当前位置:首页 » 数据仓库 » spark连接数据库
扩展阅读
webinf下怎么引入js 2023-08-31 21:54:13
堡垒机怎么打开web 2023-08-31 21:54:11

spark连接数据库

发布时间: 2022-05-30 07:52:40

1. spark在处理数据的时候需要连接数据库

spark在处理数据的时候需要连接数据库吗
spark也不是无所不能的啊,提供思路: 直接监听mysql的bin log 然后处理bin log的日志到hdfs上

2. 求问怎么设置sparksql读取hive的数据库

求问怎么设置sparksql读取hive的数据库
使用maven进行打包:
打包命令:
mvn -Pyarn -Dhadoop.version=2.3.0-cdh5.0.0 -Phive -Phive-thriftserver -DskipTests clean package

3. 求助各位大神.关于spark SQL的连接问题

主要内容:
1. Spark Streaming 另类在线实验
2. 理解Spark Streaming本质

写在前面的话:
为什么我们要以SparkStreaming为切入点进行Spark的源码定制呢?
原因如下:
1从研究目的来看
在Spark创立之初,并没有现在我们常用的这些子框架,如Spark Streaming、Spark SQL、Spark R等内容,仅有原始的Spark core;而这些子框架是在Spark不断发展中加入到Spark大家庭的。我们采用的是与Spark core联系最紧密的Spark Streaming作为切入点,可以透过一个子框架的彻底研究,从而了解Spark的核心问题与解决办法

2各个框架的对比来说
现阶段,我们运行最多的是Sparkcore 以及其他的一些常用子框架。一下我们做了一些纵向对比。

4. 如何使用sparksql向mysql中插入数据

f(isset($_POST['submit'])&&$_POST['submit']=='提交'){
3 //判断是否是提交过来的
4 $intext = $_POST['intext'];
5 if($intext!=null||$intext!=''){
6 $link = mysql_connect("localhost", "root", "123456");
7 //数据库配置信息 第一个参数数据库位置第二个是用户名第三个是密码
8 mysql_select_db("szn_test");
9 //设置要使用的数据库
10 $sql = "select * from demo where res = '".$intext."'";

5. Spark新手求教 SparkStreaming能否连接MySQL进行查询操作

支持mysql的,下面是示例
spark streaming使用数据源方式插入mysql数据
import java.sql.{Connection, ResultSet}
import com.jolbox.bonecp.{BoneCP, BoneCPConfig}
import org.slf4j.LoggerFactory

object ConnectionPool {

val logger = LoggerFactory.getLogger(this.getClass)
private val connectionPool = {
try{
Class.forName("com.mysql.jdbc.Driver")
val config = new BoneCPConfig()
config.setJdbcUrl("jdbc:mysql://192.168.0.46:3306/test")
config.setUsername("test")
config.setPassword("test")
config.setMinConnectionsPerPartition(2)
config.setMaxConnectionsPerPartition(5)
config.setPartitionCount(3)
config.setCloseConnectionWatch(true)
config.setLogStatementsEnabled(true)
Some(new BoneCP(config))
} catch {
case exception:Exception=>
logger.warn("Error in creation of connection pool"+exception.printStackTrace())
None
}
}

def getConnection:Option[Connection] ={
connectionPool match {
case Some(connPool) => Some(connPool.getConnection)
case None => None
}
}

def closeConnection(connection:Connection): Unit = {
if(!connection.isClosed) connection.close()
}
}

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

/**
* 记录最近五秒钟的数据
*/
object RealtimeCount1{

case class Loging(vtime:Long,muid:String,uid:String,ucp:String,category:String,autoSid:Int,dealerId:String,tuanId:String,newsId:String)

case class Record(vtime:Long,muid:String,uid:String,item:String,types:String)

val logger = LoggerFactory.getLogger(this.getClass)

def main(args: Array[String]) {
val argc = new Array[String](4)
argc(0) = "10.0.0.37"
argc(1) = "test-1"
argc(2) = "test22"
argc(3) = "1"
val Array(zkQuorum, group, topics, numThreads) = argc
val sparkConf = new SparkConf().setAppName("RealtimeCount").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(5))

val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap

val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(x=>x._2)

val sql = "insert into loging_realtime1(vtime,muid,uid,item,category) values (?,?,?,?,?)"

val tmpdf = lines.map(_.split("\t")).map(x=>Loging(x(9).toLong,x(1),x(0),x(3),x(25),x(18).toInt,x(29),x(30),x(28))).filter(x=>(x.muid!=null && !x.muid.equals("null") && !("").equals(x.muid))).map(x=>Record(x.vtime,x.muid,x.uid,getItem(x.category,x.ucp,x.newsId,x.autoSid.toInt,x.dealerId,x.tuanId),getType(x.category,x.ucp,x.newsId,x.autoSid.toInt,x.dealerId,x.tuanId)))
tmpdf.filter(x=>x.types!=null).foreachRDD{rdd =>
//rdd.foreach(println)
rdd.foreachPartition(partitionRecords=>{
val connection = ConnectionPool.getConnection.getOrElse(null)
if(connection!=null){
partitionRecords.foreach(record=>process(connection,sql,record))
ConnectionPool.closeConnection(connection)
}
})
}
ssc.start()
ssc.awaitTermination()
}

def getItem(category:String,ucp:String,newsId:String,autoSid:Int,dealerId:String,tuanId:String):String = {
if(category!=null && !category.equals("null")){
val pattern = ""
val matcher = ucp.matches(pattern)
if(matcher) {
ucp.substring(33,42)
}else{
null
}
}else if(autoSid!=0){
autoSid.toString
}else if(dealerId!=null && !dealerId.equals("null")){
dealerId
}else if(tuanId!=null && !tuanId.equals("null")){
tuanId
}else{
null
}
}

def getType(category:String,ucp:String,newsId:String,autoSid:Int,dealerId:String,tuanId:String):String = {
if(category!=null && !category.equals("null")){
val pattern = "100000726;100000730;\\d{9};\\d{9}"
val matcher = category.matches(pattern)

val pattern1 =""
val matcher1 = ucp.matches(pattern1)

if(matcher1 && matcher) {
"nv"
}else if(newsId!=null && !newsId.equals("null") && matcher1){
"ns"
}else if(matcher1){
"ne"
}else{
null
}
}else if(autoSid!=0){
"as"
}else if(dealerId!=null && !dealerId.equals("null")){
"di"
}else if(tuanId!=null && !tuanId.equals("null")){
"ti"
}else{
null
}
}

def process(conn:Connection,sql:String,data:Record): Unit ={
try{
val ps : PreparedStatement = conn.prepareStatement(sql)
ps.setLong(1,data.vtime)
ps.setString(2,data.muid)
ps.setString(3,data.uid)
ps.setString(4,data.item)
ps.setString(5,data.types)
ps.executeUpdate()
}catch{
case exception:Exception=>
logger.warn("Error in execution of query"+exception.printStackTrace())
}
}
}

6. spark从hive数据仓库中读取的数据可以使用sparksql进行查询吗

1、为了让Spark能够连接到Hive的原有数据仓库,我们需要将Hive中的hive-site.xml文件拷贝到Spark的conf目录下,这样就可以通过这个配置文件找到Hive的元数据以及数据存放。
在这里由于我的Spark是自动安装和部署的,因此需要知道CDH将hive-site.xml放在哪里。经过摸索。该文件默认所在的路径是:/etc/hive/conf 下。
同理,spark的conf也是在/etc/spark/conf。
此时,如上所述,将对应的hive-site.xml拷贝到spark/conf目录下即可
如果Hive的元数据存放在Mysql中,我们还需要准备好Mysql相关驱动,比如:mysql-connector-java-5.1.22-bin.jar。
2、编写测试代码
val conf=new SparkConf().setAppName("Spark-Hive").setMaster("local")
val sc=new SparkContext(conf)

//create hivecontext
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' ") //这里需要注意数据的间隔符

sqlContext.sql("LOAD DATA INPATH '/user/liujiyu/spark/kv1.txt' INTO TABLE src ");

sqlContext.sql(" SELECT * FROM jn1").collect().foreach(println)

sc.stop()

3、下面列举一下出现的问题:
(1)如果没有将hive-site.xml拷贝到spark/conf目录下,会出现:

分析:从错误提示上面就知道,spark无法知道hive的元数据的位置,所以就无法实例化对应的client。
解决的办法就是必须将hive-site.xml拷贝到spark/conf目录下
(2)测试代码中没有加sc.stop会出现如下错误:
ERROR scheler.LiveListenerBus: Listener EventLoggingListener threw an exception
java.lang.reflect.InvocationTargetException
在代码最后一行添加sc.stop()解决了该问题。

7. spark连接数据库连接数过高

使用MySQL数据库,有一个容易出现的问题——Too many connections。连接数超过。
我们知道,由于SUPER权限有很多特权,因此不会把这个权限给予应用的账号。但是,当应用异常或者数据库异常,达到最大连接数的时候,用管理账号登录,有时候仍然会报Too many connections。此时,如果应用不能及时处理,数据库这边就很难办了。
所以,当应用异常并且频繁尝试建立连接的时候,常能占据那第max_connections+1个连接。super账号由于拿不到线程,因此也是Too many connections了。

8. spark存入mysql会失败吗


Spark 分析Json数据存入Mysql 遇到的坑
折腾了两天,终算是弄好了,入的坑不计其数,但是也要数一数。
坑(一)
之前搭建好了spark,就是简单的wordcount一下,成功了也就没在意。
这几天重新拾起来,一上来就记得 –master spark://master:55555
这个55555端口其实是我的hdfs的端口,结果给记成spark群集的了,哇,很难受,一直卡住
说什么master不通,查了半天,忽然想起怎么不加 –master这个配置反而执行成功了,
查了一下不加 –master默认 –master local,呀,紧跟着后边 –master spark://master:7077
(默认端口为7077)
~~~~~~~~~
恍然大悟,很难受,这样一来,通了。。。。。
坑(二)
17/04/30 13:37:29 INFO scheler.TaskSchelerImpl: Adding task set 0.0 with 2 tasks17/04/30 13:37:44 WARN scheler.TaskSchelerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources17/04/30 13:37:59 WARN scheler.TaskSchelerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources123

查了一下,大体意思就是内存不足,资源不足。。。这个好办,改了一下配置文件,不知道起没起作用,估计是没起作用,
在spark目录中的spark_env.sh中添加了export SPARK_EXECUTOR_MEMORY=512M
其实主要不在这里,不加的话默认为1G。
就是启动命令上我们 –executor-memory 1G 或者–executor-memory 512M 都没问题,
–executor-memory 2G就有问题了。
这样资源不足的问题也解决了。
坑(三)
提交作业后,总是执行一半卡住,估计连一半也没执行,看了后台的works 输出日志,
com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure1

这错误挺明显的,就是数据库连接失败了。
开始思考,是不是提交作业到群集每个slave上都需要一个 mysql.jdbc包呢,试了一下,
–jars /home/examples/mysql.jar 这个配置一开始就有,我只是在master上存在mysql.jdbc包,
于是把mysql.jdbc包放到slave的相同的位置。结果还是不行。。。
还是一样的错误。。。
。。。。。。。。。。。
一宿过去了。。。。。。。。。。
爬起来,继续干,灵光一现,,,,,,,,
"jdbc:mysql://localhost:3306/"+mysql_database+"?user="+mysql_user+"&password="+mysql_password1

scala中连接数据库我是这样写的,localhost的,有没有,发现问题了吧。。。。
目前只是猜测,,,,,
我猜,slave向master连接数据库时出现了问题。。。。
slave上并没有mysql,,,
我们需要把数据都存入master上,
试着把localhost改成master,,,,哇哦。。。
成功了呢。。。。。
开心不
开心。
到上边已经算是结束了。
过程中还有一坑,,,就是一开始自己写了个wordcount结果存入mysql的小实验,
val conf = new SparkConf().setAppName("RDDToMysql").setMaster("local")1

配置这样写的。。。。。
我特么怎么说不管 –master spark 还是 –master local都成功。。。问题在这里呀。我该
写成空配置
val conf = new SparkConf()1

还有,,,,要处理json,,,sc.textFile肯定是不太好的,因为他都是一行一行的读取的,
如果你的json数据不是规则的一行一个数据,那就完蛋了。
最好的方式就是读取批量小文件 ,我们规定一个txt文件只有一条json数据。。
直接读取整个文件
sc.wholeTextFiles()完美
赶紧分析点有趣的数据,,,哈哈

9. Spark连接到MySQL并执行查询为什么速度会快

在已有的 MySQL 服务器之上使用 Apache Spark (无需将数据导出到 Spark 或者 Hadoop 平台上),这样至少可以提升 10 倍的查询性能。使用多个 MySQL 服务器(复制或者 Percona XtraDB Cluster)可以让我们在某些查询上得到额外的性能提升。你也可以使用 Spark 的缓存功能来缓存整个 MySQL 查询结果表。

思路很简单:Spark 可以通过 JDBC 读取 MySQL 上的数据,也可以执行 SQL 查询,因此我们可以直接连接到 MySQL 并执行查询。那么为什么速度会快呢?对一些需要运行很长时间的查询(如报表或者BI),由于 Spark 是一个大规模并行系统,因此查询会非常的快。MySQL 只能为每一个查询分配一个 CPU 核来处理,而 Spark 可以使用所有集群节点的所有核。在下面的例子中,我们会在 Spark 中执行 MySQL 查询,这个查询速度比直接在 MySQL 上执行速度要快 5 到 10 倍。

另外,Spark 可以增加“集群”级别的并行机制,在使用 MySQL 复制或者 Percona XtraDB Cluster 的情况下,Spark 可以把查询变成一组更小的查询(有点像使用了分区表时可以在每个分区都执行一个查询),然后在多个 Percona XtraDB Cluster 节点的多个从服务器上并行的执行这些小查询。最后它会使用map/rece 方式将每个节点返回的结果聚合在一起形成完整的结果。

10. spark streaming可以基于executor建立mysql的连接池吗

对于一个在线的spark streaming系统,DStream是源源不断的,当需要查Mysql数据库时,如果我们基于每个RDD,或基于分区建立mysql连接,那么需要经常建立、关闭数据库连接。能不能在启动application时,在executor上先建立一个mysql连接池,然后该executor上的所有task都直接使用连接池中的连接访问数据库?如果可以,怎么实现呢?