當前位置:首頁 » 數據倉庫 » spark連接資料庫
擴展閱讀
webinf下怎麼引入js 2023-08-31 21:54:13
堡壘機怎麼打開web 2023-08-31 21:54:11

spark連接資料庫

發布時間: 2022-05-30 07:52:40

1. spark在處理數據的時候需要連接資料庫

spark在處理數據的時候需要連接資料庫嗎
spark也不是無所不能的啊,提供思路: 直接監聽mysql的bin log 然後處理bin log的日誌到hdfs上

2. 求問怎麼設置sparksql讀取hive的資料庫

求問怎麼設置sparksql讀取hive的資料庫
使用maven進行打包:
打包命令:
mvn -Pyarn -Dhadoop.version=2.3.0-cdh5.0.0 -Phive -Phive-thriftserver -DskipTests clean package

3. 求助各位大神.關於spark SQL的連接問題

主要內容:
1. Spark Streaming 另類在線實驗
2. 理解Spark Streaming本質

寫在前面的話:
為什麼我們要以SparkStreaming為切入點進行Spark的源碼定製呢?
原因如下:
1從研究目的來看
在Spark創立之初,並沒有現在我們常用的這些子框架,如Spark Streaming、Spark SQL、Spark R等內容,僅有原始的Spark core;而這些子框架是在Spark不斷發展中加入到Spark大家庭的。我們採用的是與Spark core聯系最緊密的Spark Streaming作為切入點,可以透過一個子框架的徹底研究,從而了解Spark的核心問題與解決辦法

2各個框架的對比來說
現階段,我們運行最多的是Sparkcore 以及其他的一些常用子框架。一下我們做了一些縱向對比。

4. 如何使用sparksql向mysql中插入數據

f(isset($_POST['submit'])&&$_POST['submit']=='提交'){
3 //判斷是否是提交過來的
4 $intext = $_POST['intext'];
5 if($intext!=null||$intext!=''){
6 $link = mysql_connect("localhost", "root", "123456");
7 //資料庫配置信息 第一個參數資料庫位置第二個是用戶名第三個是密碼
8 mysql_select_db("szn_test");
9 //設置要使用的資料庫
10 $sql = "select * from demo where res = '".$intext."'";

5. Spark新手求教 SparkStreaming能否連接MySQL進行查詢操作

支持mysql的,下面是示例
spark streaming使用數據源方式插入mysql數據
import java.sql.{Connection, ResultSet}
import com.jolbox.bonecp.{BoneCP, BoneCPConfig}
import org.slf4j.LoggerFactory

object ConnectionPool {

val logger = LoggerFactory.getLogger(this.getClass)
private val connectionPool = {
try{
Class.forName("com.mysql.jdbc.Driver")
val config = new BoneCPConfig()
config.setJdbcUrl("jdbc:mysql://192.168.0.46:3306/test")
config.setUsername("test")
config.setPassword("test")
config.setMinConnectionsPerPartition(2)
config.setMaxConnectionsPerPartition(5)
config.setPartitionCount(3)
config.setCloseConnectionWatch(true)
config.setLogStatementsEnabled(true)
Some(new BoneCP(config))
} catch {
case exception:Exception=>
logger.warn("Error in creation of connection pool"+exception.printStackTrace())
None
}
}

def getConnection:Option[Connection] ={
connectionPool match {
case Some(connPool) => Some(connPool.getConnection)
case None => None
}
}

def closeConnection(connection:Connection): Unit = {
if(!connection.isClosed) connection.close()
}
}

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

/**
* 記錄最近五秒鍾的數據
*/
object RealtimeCount1{

case class Loging(vtime:Long,muid:String,uid:String,ucp:String,category:String,autoSid:Int,dealerId:String,tuanId:String,newsId:String)

case class Record(vtime:Long,muid:String,uid:String,item:String,types:String)

val logger = LoggerFactory.getLogger(this.getClass)

def main(args: Array[String]) {
val argc = new Array[String](4)
argc(0) = "10.0.0.37"
argc(1) = "test-1"
argc(2) = "test22"
argc(3) = "1"
val Array(zkQuorum, group, topics, numThreads) = argc
val sparkConf = new SparkConf().setAppName("RealtimeCount").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(5))

val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap

val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(x=>x._2)

val sql = "insert into loging_realtime1(vtime,muid,uid,item,category) values (?,?,?,?,?)"

val tmpdf = lines.map(_.split("\t")).map(x=>Loging(x(9).toLong,x(1),x(0),x(3),x(25),x(18).toInt,x(29),x(30),x(28))).filter(x=>(x.muid!=null && !x.muid.equals("null") && !("").equals(x.muid))).map(x=>Record(x.vtime,x.muid,x.uid,getItem(x.category,x.ucp,x.newsId,x.autoSid.toInt,x.dealerId,x.tuanId),getType(x.category,x.ucp,x.newsId,x.autoSid.toInt,x.dealerId,x.tuanId)))
tmpdf.filter(x=>x.types!=null).foreachRDD{rdd =>
//rdd.foreach(println)
rdd.foreachPartition(partitionRecords=>{
val connection = ConnectionPool.getConnection.getOrElse(null)
if(connection!=null){
partitionRecords.foreach(record=>process(connection,sql,record))
ConnectionPool.closeConnection(connection)
}
})
}
ssc.start()
ssc.awaitTermination()
}

def getItem(category:String,ucp:String,newsId:String,autoSid:Int,dealerId:String,tuanId:String):String = {
if(category!=null && !category.equals("null")){
val pattern = ""
val matcher = ucp.matches(pattern)
if(matcher) {
ucp.substring(33,42)
}else{
null
}
}else if(autoSid!=0){
autoSid.toString
}else if(dealerId!=null && !dealerId.equals("null")){
dealerId
}else if(tuanId!=null && !tuanId.equals("null")){
tuanId
}else{
null
}
}

def getType(category:String,ucp:String,newsId:String,autoSid:Int,dealerId:String,tuanId:String):String = {
if(category!=null && !category.equals("null")){
val pattern = "100000726;100000730;\\d{9};\\d{9}"
val matcher = category.matches(pattern)

val pattern1 =""
val matcher1 = ucp.matches(pattern1)

if(matcher1 && matcher) {
"nv"
}else if(newsId!=null && !newsId.equals("null") && matcher1){
"ns"
}else if(matcher1){
"ne"
}else{
null
}
}else if(autoSid!=0){
"as"
}else if(dealerId!=null && !dealerId.equals("null")){
"di"
}else if(tuanId!=null && !tuanId.equals("null")){
"ti"
}else{
null
}
}

def process(conn:Connection,sql:String,data:Record): Unit ={
try{
val ps : PreparedStatement = conn.prepareStatement(sql)
ps.setLong(1,data.vtime)
ps.setString(2,data.muid)
ps.setString(3,data.uid)
ps.setString(4,data.item)
ps.setString(5,data.types)
ps.executeUpdate()
}catch{
case exception:Exception=>
logger.warn("Error in execution of query"+exception.printStackTrace())
}
}
}

6. spark從hive數據倉庫中讀取的數據可以使用sparksql進行查詢嗎

1、為了讓Spark能夠連接到Hive的原有數據倉庫,我們需要將Hive中的hive-site.xml文件拷貝到Spark的conf目錄下,這樣就可以通過這個配置文件找到Hive的元數據以及數據存放。
在這里由於我的Spark是自動安裝和部署的,因此需要知道CDH將hive-site.xml放在哪裡。經過摸索。該文件默認所在的路徑是:/etc/hive/conf 下。
同理,spark的conf也是在/etc/spark/conf。
此時,如上所述,將對應的hive-site.xml拷貝到spark/conf目錄下即可
如果Hive的元數據存放在Mysql中,我們還需要准備好Mysql相關驅動,比如:mysql-connector-java-5.1.22-bin.jar。
2、編寫測試代碼
val conf=new SparkConf().setAppName("Spark-Hive").setMaster("local")
val sc=new SparkContext(conf)

//create hivecontext
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' ") //這里需要注意數據的間隔符

sqlContext.sql("LOAD DATA INPATH '/user/liujiyu/spark/kv1.txt' INTO TABLE src ");

sqlContext.sql(" SELECT * FROM jn1").collect().foreach(println)

sc.stop()

3、下面列舉一下出現的問題:
(1)如果沒有將hive-site.xml拷貝到spark/conf目錄下,會出現:

分析:從錯誤提示上面就知道,spark無法知道hive的元數據的位置,所以就無法實例化對應的client。
解決的辦法就是必須將hive-site.xml拷貝到spark/conf目錄下
(2)測試代碼中沒有加sc.stop會出現如下錯誤:
ERROR scheler.LiveListenerBus: Listener EventLoggingListener threw an exception
java.lang.reflect.InvocationTargetException
在代碼最後一行添加sc.stop()解決了該問題。

7. spark連接資料庫連接數過高

使用MySQL資料庫,有一個容易出現的問題——Too many connections。連接數超過。
我們知道,由於SUPER許可權有很多特權,因此不會把這個許可權給予應用的賬號。但是,當應用異常或者資料庫異常,達到最大連接數的時候,用管理賬號登錄,有時候仍然會報Too many connections。此時,如果應用不能及時處理,資料庫這邊就很難辦了。
所以,當應用異常並且頻繁嘗試建立連接的時候,常能占據那第max_connections+1個連接。super賬號由於拿不到線程,因此也是Too many connections了。

8. spark存入mysql會失敗嗎


Spark 分析Json數據存入Mysql 遇到的坑
折騰了兩天,終算是弄好了,入的坑不計其數,但是也要數一數。
坑(一)
之前搭建好了spark,就是簡單的wordcount一下,成功了也就沒在意。
這幾天重新拾起來,一上來就記得 –master spark://master:55555
這個55555埠其實是我的hdfs的埠,結果給記成spark群集的了,哇,很難受,一直卡住
說什麼master不通,查了半天,忽然想起怎麼不加 –master這個配置反而執行成功了,
查了一下不加 –master默認 –master local,呀,緊跟著後邊 –master spark://master:7077
(默認埠為7077)
~~~~~~~~~
恍然大悟,很難受,這樣一來,通了。。。。。
坑(二)
17/04/30 13:37:29 INFO scheler.TaskSchelerImpl: Adding task set 0.0 with 2 tasks17/04/30 13:37:44 WARN scheler.TaskSchelerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources17/04/30 13:37:59 WARN scheler.TaskSchelerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources123

查了一下,大體意思就是內存不足,資源不足。。。這個好辦,改了一下配置文件,不知道起沒起作用,估計是沒起作用,
在spark目錄中的spark_env.sh中添加了export SPARK_EXECUTOR_MEMORY=512M
其實主要不在這里,不加的話默認為1G。
就是啟動命令上我們 –executor-memory 1G 或者–executor-memory 512M 都沒問題,
–executor-memory 2G就有問題了。
這樣資源不足的問題也解決了。
坑(三)
提交作業後,總是執行一半卡住,估計連一半也沒執行,看了後台的works 輸出日誌,
com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure1

這錯誤挺明顯的,就是資料庫連接失敗了。
開始思考,是不是提交作業到群集每個slave上都需要一個 mysql.jdbc包呢,試了一下,
–jars /home/examples/mysql.jar 這個配置一開始就有,我只是在master上存在mysql.jdbc包,
於是把mysql.jdbc包放到slave的相同的位置。結果還是不行。。。
還是一樣的錯誤。。。
。。。。。。。。。。。
一宿過去了。。。。。。。。。。
爬起來,繼續干,靈光一現,,,,,,,,
"jdbc:mysql://localhost:3306/"+mysql_database+"?user="+mysql_user+"&password="+mysql_password1

scala中連接資料庫我是這樣寫的,localhost的,有沒有,發現問題了吧。。。。
目前只是猜測,,,,,
我猜,slave向master連接資料庫時出現了問題。。。。
slave上並沒有mysql,,,
我們需要把數據都存入master上,
試著把localhost改成master,,,,哇哦。。。
成功了呢。。。。。
開心不
開心。
到上邊已經算是結束了。
過程中還有一坑,,,就是一開始自己寫了個wordcount結果存入mysql的小實驗,
val conf = new SparkConf().setAppName("RDDToMysql").setMaster("local")1

配置這樣寫的。。。。。
我特么怎麼說不管 –master spark 還是 –master local都成功。。。問題在這里呀。我該
寫成空配置
val conf = new SparkConf()1

還有,,,,要處理json,,,sc.textFile肯定是不太好的,因為他都是一行一行的讀取的,
如果你的json數據不是規則的一行一個數據,那就完蛋了。
最好的方式就是讀取批量小文件 ,我們規定一個txt文件只有一條json數據。。
直接讀取整個文件
sc.wholeTextFiles()完美
趕緊分析點有趣的數據,,,哈哈

9. Spark連接到MySQL並執行查詢為什麼速度會快

在已有的 MySQL 伺服器之上使用 Apache Spark (無需將數據導出到 Spark 或者 Hadoop 平台上),這樣至少可以提升 10 倍的查詢性能。使用多個 MySQL 伺服器(復制或者 Percona XtraDB Cluster)可以讓我們在某些查詢上得到額外的性能提升。你也可以使用 Spark 的緩存功能來緩存整個 MySQL 查詢結果表。

思路很簡單:Spark 可以通過 JDBC 讀取 MySQL 上的數據,也可以執行 SQL 查詢,因此我們可以直接連接到 MySQL 並執行查詢。那麼為什麼速度會快呢?對一些需要運行很長時間的查詢(如報表或者BI),由於 Spark 是一個大規模並行系統,因此查詢會非常的快。MySQL 只能為每一個查詢分配一個 CPU 核來處理,而 Spark 可以使用所有集群節點的所有核。在下面的例子中,我們會在 Spark 中執行 MySQL 查詢,這個查詢速度比直接在 MySQL 上執行速度要快 5 到 10 倍。

另外,Spark 可以增加「集群」級別的並行機制,在使用 MySQL 復制或者 Percona XtraDB Cluster 的情況下,Spark 可以把查詢變成一組更小的查詢(有點像使用了分區表時可以在每個分區都執行一個查詢),然後在多個 Percona XtraDB Cluster 節點的多個從伺服器上並行的執行這些小查詢。最後它會使用map/rece 方式將每個節點返回的結果聚合在一起形成完整的結果。

10. spark streaming可以基於executor建立mysql的連接池嗎

對於一個在線的spark streaming系統,DStream是源源不斷的,當需要查Mysql資料庫時,如果我們基於每個RDD,或基於分區建立mysql連接,那麼需要經常建立、關閉資料庫連接。能不能在啟動application時,在executor上先建立一個mysql連接池,然後該executor上的所有task都直接使用連接池中的連接訪問資料庫?如果可以,怎麼實現呢?