1. spark連接資料庫連接數過高
使用Mysql資料庫,有一個容易出現的問題——Too many connections。連接數超過。
我們知道,由於SUPER許可權有很多特權,因此不會把這個許可權給予應用的賬號。但是,當應用異常或者資料庫異常,達到最大連接數的時候,用管理賬號登錄,有時候仍然會報Too many connections。此時,如果應用不能及時處理,資料庫這邊就很難辦了。
所以,當應用異常並且頻繁嘗試建立連接的時候,常能占據那第max_connections+1個連接。super賬號由於拿不到線程,因此也是Too many connections了。
2. sparkdriverstacktrace導致資料庫連接不上
網路延遲。sparkdriverstacktrace是電腦程序的驅動器節點,由於網路延遲會導致出現資料庫連接不上的情況。解決方法如下:
1、首先重新啟動計算機。
2、其次點擊重新進入sparkdriverstacktrace節點。
3、最後點擊左上角的刷新即可。
3. sparksql jdbc 求助
jdbc和連接池對於你這個場景來說,都足夠,既然用spring管理了,建議還是使用連接池,另外,spring自身沒有實現連接池,一般都是對第三方連接池的包裝,常見的有C3P0,dbcp以及最近比較流行的boneCP等,這幾個配置都差不多太多,以boneCP為例: <bean id="dataSource" class="com.jolbox.bonecp.BoneCPDataSource" destroy-method="close"> <property name="driverClass" value="${jdbc.driverClass}" /> <property name="jdbcUrl" value="${jdbc.url}" /> <property name="username" value="${jdbc.user}" /> <property name="password" value="${jdbc.password}" /> <property name="idleConnectionTestPeriod" value="60" /> <property name="idleMaxAge" value="240" /> <property name="maxConnectionsPerPartition" value="30" /> <property name="minConnectionsPerPartition" value="10" /> <property name="partitionCount" value="2" /> <property name="acquireIncrement" value="5" /> <property name="statementsCacheSize" value="100" /> <property name="releaseHelperThreads" value="3" /> </bean> <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate"> <property name="dataSource" ref="dataSource" /> </bean>
4. Spark通信框架Spark Network Common
一直以來,基於Akka實現的RPC通信框架是Spark引以為豪的主要特性,也是與Hadoop等分布式計算框架對比過程中一大亮點,但是時代和技術都在演化,從Spark1.3.1版本開始, 為了解決大塊數據(如Shuffle)的傳輸問題 ,Spark引入了Netty通信框架,到了1.6.0版本,Netty完全取代了Akka,承擔Spark內部所有的RPC通信以及數據流傳輸。
JAVA IO也經歷了幾次演化,從最早的BIO(阻塞式/非阻塞IO),到1.4版本的NIO(IO復用),到1.7版本的NIO2.0/AIO(非同步IO)。
基於早期BIO來實現高並發網路伺服器都是依賴多線程來實現,但是線程開銷較大,BIO的瓶頸明顯,NIO的出現解決了這一大難題, 基於IO復用解決了IO高並發 。
但是NIO有也有幾個缺點:
因為這幾個原因,促使了很多JAVA-IO通信框架的出現,Netty就是其中一員,它也因為高度的穩定性,功能性,性能等特性,成為Java開發的首選
首先是NIO的上層封裝,Netty提供了NioEventLoopGroup / NioSocketChannel / NioServerSocketChannel的組合來完成實際IO操作,繼而在此之上實現數據流Pipeline以及EventLoop線程池等功能。
另外它又重寫了NIO,JDK-NIO底層是基於Epoll的LT模式來實現,而Netty是基於Epoll的ET模式實現的一組IO操作EpollEventLoopGroup / EpollSocketChannel / EpollServerSocketChannel
Netty對兩種實現進行完美的封裝,可以根據業務的需求來選擇不同的實現
從Akka出現背景來說,它是基於Actor的RPC通信系統,它的核心概念也是Message,它是基於協程的,性能不容置疑;基於scala的偏函數,易用性也沒有話說,但是它畢竟只是RPC通信,無法適用大的package/stream的數據傳輸,這也是Spark早期引入Netty的原因。
首先不容置疑的是Akka可以做到的,Netty也可以做到,但是Netty可以做到,Akka卻無法做到。原因是啥?在軟體棧中,Akka相比Netty要Higher一點,它專門針對RPC做了很多事情,而Netty相比更加基礎一點, 可以為不同的應用層通信協議(RPC,FTP,HTTP等)提供支持 ,在早期的Akka版本,底層的NIO通信就是用的Netty。
其次一個優雅的工程師是不會允許一個系統中容納兩套通信框架!最後,雖然Netty沒有Akka協程級的性能優勢,但是Netty內部高效的Reactor線程模型,無鎖化的串列設計,高效的序列化,零拷貝,內存池等特性也保證了Netty不會存在性能問題。
那麼Spark是怎麼用Netty來取代Akka呢?一句話,利用偏函數的特性,基於Netty「仿造」出一個簡約版本的Actor模型。
對於Network通信,不管傳輸的是序列化後的對象還是文件,在網路上表現的都是位元組流。在傳統IO中,位元組流表示為Stream;在NIO中,位元組流表示為ByteBuffer;在Netty中位元組流表示為ByteBuff或FileRegion;在Spark中,針對Byte也做了一層包裝,支持對Byte和文件流進行處理,即ManagedBuffer;
ManagedBuffer包含了三個函數createInputStream(),nioByteBuffer(),convertToNetty()來對Buffer進行「類型轉換」,分別獲取stream,ByteBuffer,ByteBuff或FileRegion;NioManagedBuffer / NettyManagedBuffer / FileSegmentManagedBuffer也是針對性提供了具體的實現。
更好的理解ManagedBuffer :比如Shuffle BlockManager模塊需要在內存中維護本地executor生成的shuffle-map輸出的文件引用,從而可以提供給shuffleFetch進行遠程讀取,此時文件表示為FileSegmentManagedBuffer,shuffleFetch遠程調用FileSegmentManagedBuffer.nioByteBuffer / createInputStream函數從文件中讀取為Bytes,並進行後面的網路傳輸。如果已經在內存中bytes就更好理解了,比如將一個字元數組表示為NettyManagedBuffer。
協議是應用層通信的基礎,它提供了應用層通信的數據表示,以及編碼和解碼的能力。在Spark Network Common中,繼承AKKA中的定義,將協議命名為Message,它繼承Encodable,提供了encode的能力。
Message根據請求響應可以劃分為RequestMessage和ResponseMessage兩種;對於Response,根據處理結果,可以劃分為Failure和Success兩種類型;根據功能的不同,主要劃分為Stream,ChunkFetch,Rpc。
Server構建在Netty之上,它提供兩種模型NIO和Epoll,可以通過參數(spark.[mole].io.mode)進行配置,最基礎的mole就是shuffle,不同的IOMode選型,對應了Netty底層不同的實現,Server的Init過程中,最重要的步驟就是根據不同的IOModel完成EventLoop和Pipeline的構造
其中,MessageEncoder/Decoder針對網路包到Message的編碼和解碼,而最為核心就TransportRequestHandler,它封裝了對所有請求/響應的處理;
TransportChannelHandler內部實現也很簡單,它封裝了responseHandler和requestHandler,當從Netty中讀取一條Message以後,根據判斷路由給相應的responseHandler和requestHandler。
Sever提供的RPC,ChunkFecth,Stream的功能都是依賴TransportRequestHandler來實現的;從原理上來說,RPC與ChunkFecth / Stream還是有很大不同的,其中RPC對於TransportRequestHandler來說是功能依賴,而ChunkFecth / Stream對於TransportRequestHandler來說只是數據依賴。
怎麼理解?即TransportRequestHandler已經提供了ChunkFecth / Stream的實現,只需要在構造的時候,向TransportRequestHandler提供一個streamManager,告訴RequestHandler從哪裡可以讀取到Chunk或者Stream。而RPC需要向TransportRequestHandler注冊一個rpcHandler,針對每個RPC介面進行功能實現,同時RPC與ChunkFecth / Stream都會有同一個streamManager的依賴,因此注入到TransportRequestHandler中的streamManager也是依賴rpcHandler來實現,即rpcHandler中提供了RPC功能實現和streamManager的數據依賴。
Server是通過監聽一個埠,注入rpcHandler和streamManager從而對外提供RPC,ChunkFecth,Stream的服務,而Client即為一個客戶端類,通過該類,可以將一個streamId / chunkIndex對應的ChunkFetch請求,streamId對應的Stream請求,以及一個RPC數據包對應的RPC請求發送到服務端,並監聽和處理來自服務端的響應;其中最重要的兩個類即為TransportClient和TransportResponseHandler分別為上述的「客戶端類」和「監聽和處理來自服務端的響應"。
那麼TransportClient和TransportResponseHandler是怎麼配合一起完成Client的工作呢? 由TransportClient將用戶的RPC,ChunkFecth,Stream的請求進行打包並發送到Server端,同時將用戶提供的回調函數注冊到TransportResponseHandler,TransportResponseHandler是TransportChannelHandler的一部分,在TransportChannelHandler接收到數據包,並判斷為響應包以後,將包數據路由到TransportResponseHandler中,在TransportResponseHandler中通過注冊的回調函數,將響應包的數據返回給客戶端
無論是BlockTransfer還是ShuffleFetch都需要跨executor的數據傳輸,在每一個executor裡面都需要運行一個Server線程(後面也會分析到,對於Shuffle也可能是一個獨立的ShuffleServer進程存在)來提供對Block數據的遠程讀寫服務
在每個Executor裡面,都有一個BlockManager模塊,它提供了對當前Executor所有的Block的「本地管理」,並對進程內其他模塊暴露getBlockData(blockId: BlockId): ManagedBuffer的Block讀取介面,但是這里GetBlockData僅僅是提供本地的管理功能,對於跨遠程的Block傳輸,則由NettyBlockTransferService提供服務。
NettyBlockTransferService本身即是Server,為其他其他遠程Executor提供Block的讀取功能,同時它即為Client,為本地其他模塊暴露fetchBlocks的介面,支持通過host/port拉取任何Executor上的一組的Blocks。
源碼位置 spark-core: org.apache.spark.network.netty
NettyBlockTransferService作為一個Server,與Executor或Driver裡面其他的服務一樣,在進程啟動時,由SparkEnv初始化構造並啟動服務,在整個運行時的一部分。
一個Server的構造依賴RpcHandler提供RPC的功能注入以及提供streamManager的數據注入。對於NettyBlockTransferService,該RpcHandler即為NettyBlockRpcServer,在構造的過程中,需要與本地的BlockManager進行管理,從而支持對外提供本地BlockMananger中管理的數據
RpcHandler提供RPC的功能注入 在這里還是屬於比較「簡陋的」,畢竟他是屬於數據傳輸模塊,Server中提供的chunkFetch和stream已經足夠滿足他的功能需要,那現在問題就是怎麼從streamManager中讀取數據來提供給chunkFetch和stream進行使用呢?
就是NettyBlockRpcServer作為RpcHandler提供的一個Rpc介面之一:OpenBlocks,它接受由Client提供一個Blockids列表,Server根據該BlockIds從BlockManager獲取到相應的數據並注冊到streamManager中,同時返回一個StreamID,後續Client即可以使用該StreamID發起ChunkFetch的操作。
從NettyBlockTransferService作為一個Server,我們基本可以推測NettyBlockTransferService作為一個Client支持fetchBlocks的功能的基本方法:
同時,為了提高服務端穩定性,針對fetchBlocks操作NettyBlockTransferService提供了非重試版本和重試版本的BlockFetcher,分別為OneForOneBlockFetcher和RetryingBlockFetcher,通過參數(spark.[mole].io.maxRetries)進行配置,默認是重試3次
在Spark,Block有各種類型,可以是ShuffleBlock,也可以是BroadcastBlock等等,對於ShuffleBlock的Fetch,除了由Executor內部的NettyBlockTransferService提供服務以外,也可以由外部的ShuffleService來充當Server的功能,並由專門的ExternalShuffleClient來與其進行交互,從而獲取到相應Block數據。功能的原理和實現,基本一致,但是問題來了, 為什麼需要一個專門的ShuffleService服務呢? 主要原因還是為了做到任務隔離,即減輕因為fetch帶來對Executor的壓力,讓其專心的進行數據的計算。
在目前Spark中,也提供了這樣的一個AuxiliaryService:YarnShuffleService,但是對於Spark不是必須的,如果你考慮到需要「 通過減輕因為fetch帶來對Executor的壓力 」,那麼就可以嘗試嘗試。
同時,如果啟用了外部的ShuffleService,對於shuffleClient也不是使用上面的NettyBlockTransferService,而是專門的ExternalShuffleClient,功能邏輯基本一致!
Akka的通信模型是基於Actor,一個Actor可以理解為一個Service服務對象,它可以針對相應的RPC請求進行處理,如下所示,定義了一個最為基本的Actor:
Actor內部只有唯一一個變數(當然也可以理解為函數了),即Receive,它為一個偏函數,通過case語句可以針對Any信息可以進行相應的處理,這里Any消息在實際項目中就是消息包。
另外一個很重要的概念就是ActorSystem,它是一個Actor的容器,多個Actor可以通過name->Actor的注冊到Actor中,在ActorSystem中可以根據請求不同將請求路由給相應的Actor。ActorSystem和一組Actor構成一個完整的Server端,此時客戶端通過host:port與ActorSystem建立連接,通過指定name就可以相應的Actor進行通信,這里客戶端就是ActorRef。所有Akka整個RPC通信系列是由Actor,ActorRef,ActorSystem組成。
Spark基於這個思想在上述的Network的基礎上實現一套自己的RPC Actor模型,從而取代Akka。其中RpcEndpoint對應Actor,RpcEndpointRef對應ActorRef,RpcEnv即對應了ActorSystem。
RpcEndpoint與Actor一樣,不同RPC Server可以根據業務需要指定相應receive/receiveAndReply的實現,在Spark內部現在有N多個這樣的Actor,比如Executor就是一個Actor,它處理來自Driver的LaunchTask/KillTask等消息。
RpcEnv相對於ActorSystem:
RpcEndpointRef即為與相應Endpoint通信的引用,它對外暴露了send/ask等介面,實現將一個Message發送到Endpoint中。
這就是新版本的RPC框架的基本功能,它的實現基本上與Akka無縫對接,業務的遷移的功能很小,目前基本上都全部遷移完了。
RpcEnv不僅從外部介面與Akka基本一致,在內部的實現上,也基本差不多,都是按照MailBox的設計思路來實現的;
RpcEnv即充當著Server,同時也為Client內部實現。
當作為Server ,RpcEnv會初始化一個Server,並注冊NettyRpcHandler。RpcHandler的receive介面負責對每一個請求進行處理,一般情況下,簡單業務可以在RpcHandler直接完成請求的處理,但是考慮一個RpcEnv的Server上會掛載了很多個RpcEndpoint,每個RpcEndpoint的RPC請求頻率不可控,因此需要對一定的分發機制和隊列來維護這些請求,其中Dispatcher為分發器,InBox即為請求隊列;
在將RpcEndpoint注冊到RpcEnv過程中,也間接的將RpcEnv注冊到Dispatcher分發器中,Dispatcher針對每個RpcEndpoint維護一個InBox,在Dispatcher維持一個線程池(線程池大小默認為系統可用的核數,當然也可以通過spark.rpc.netty.dispatcher.numThreads進行配置),線程針對每個InBox裡面的請求進行處理。當然實際的處理過程是由RpcEndpoint來完成。
其次RpcEnv也完成Client的功能實現 ,RpcEndpointRef是以RpcEndpoint為單位,即如果一個進程需要和遠程機器上N個RpcEndpoint服務進行通信,就對應N個RpcEndpointRef(後端的實際的網路連接是公用,這個是TransportClient內部提供了連接池來實現的),當調用一個RpcEndpointRef的ask/send等介面時候,會將把「消息內容+RpcEndpointRef+本地地址」一起打包為一個RequestMessage,交由RpcEnv進行發送。注意這里打包的消息裡麵包括RpcEndpointRef本身是很重要的,從而可以由Server端識別出這個消息對應的是哪一個RpcEndpoint。
和發送端一樣,在RpcEnv中,針對每個remote端的host:port維護一個隊列,即OutBox,RpcEnv的發送僅僅是把消息放入到相應的隊列中,但是和發送端不一樣的是:在OutBox中沒有維護一個所謂的線程池來定時清理OutBox,而是通過一堆synchronized來實現的,add之後立刻消費。
摘自:Github/ColZer
5. spark streaming可以基於executor建立mysql的連接池嗎
spark streaming可以基於executor建立mysql的連接池嗎
可以啊。你實現個單例就行了。
6. spark streaming可以基於executor建立mysql的連接池嗎
對於一個在線的spark streaming系統,DStream是源源不斷的,當需要查Mysql資料庫時,如果我們基於每個RDD,或基於分區建立mysql連接,那麼需要經常建立、關閉資料庫連接。能不能在啟動application時,在executor上先建立一個mysql連接池,然後該executor上的所有task都直接使用連接池中的連接訪問資料庫?如果可以,怎麼實現呢?
7. 科普Spark,Spark是什麼,如何使用Spark
科普Spark,Spark是什麼,如何使用Spark
1.Spark基於什麼演算法的分布式計算(很簡單)
2.Spark與MapRece不同在什麼地方
3.Spark為什麼比Hadoop靈活
4.Spark局限是什麼
5.什麼情況下適合使用Spark
Spark與Hadoop的對比
Spark的中間數據放到內存中,對於迭代運算效率更高。
Spark更適合於迭代運算比較多的ML和DM運算。因為在Spark裡面,有RDD的抽象概念。
Spark比Hadoop更通用
Spark提供的數據集操作類型有很多種,不像Hadoop只提供了Map和Rece兩種操作。比如map, filter, flatMap, sample, groupByKey, receByKey, union, join, cogroup, mapValues, sort,partionBy等多種操作類型,Spark把這些操作稱為Transformations。同時還提供Count, collect, rece, lookup, save等多種actions操作。
這些多種多樣的數據集操作類型,給給開發上層應用的用戶提供了方便。各個處理節點之間的通信模型不再像Hadoop那樣就是唯一的Data Shuffle一種模式。用戶可以命名,物化,控制中間結果的存儲、分區等。可以說編程模型比Hadoop更靈活。
不過由於RDD的特性,Spark不適用那種非同步細粒度更新狀態的應用,例如web服務的存儲或者是增量的web爬蟲和索引。就是對於那種增量修改的應用模型不適合。
容錯性
在分布式數據集計算時通過checkpoint來實現容錯,而checkpoint有兩種方式,一個是checkpoint data,一個是logging the updates。用戶可以控制採用哪種方式來實現容錯。
可用性
Spark通過提供豐富的Scala, Java,Python API及互動式Shell來提高可用性。
Spark與Hadoop的結合
Spark可以直接對HDFS進行數據的讀寫,同樣支持Spark on YARN。Spark可以與MapRece運行於同集群中,共享存儲資源與計算,數據倉庫Shark實現上借用Hive,幾乎與Hive完全兼容。
Spark的適用場景
Spark是基於內存的迭代計算框架,適用於需要多次操作特定數據集的應用場合。需要反復操作的次數越多,所需讀取的數據量越大,受益越大,數據量小但是計算密集度較大的場合,受益就相對較小(大資料庫架構中這是是否考慮使用Spark的重要因素)
由於RDD的特性,Spark不適用那種非同步細粒度更新狀態的應用,例如web服務的存儲或者是增量的web爬蟲和索引。就是對於那種增量修改的應用模型不適合。總的來說Spark的適用面比較廣泛且比較通用。
運行模式
本地模式
Standalone模式
Mesoes模式
yarn模式
Spark生態系統
Shark ( Hive on Spark): Shark基本上就是在Spark的框架基礎上提供和Hive一樣的H iveQL命令介面,為了最大程度的保持和Hive的兼容性,Shark使用了Hive的API來實現query Parsing和 Logic Plan generation,最後的PhysicalPlan execution階段用Spark代替Hadoop MapRece。通過配置Shark參數,Shark可以自動在內存中緩存特定的RDD,實現數據重用,進而加快特定數據集的檢索。同時,Shark通過UDF用戶自定義函數實現特定的數據分析學習演算法,使得SQL數據查詢和運算分析能結合在一起,最大化RDD的重復使用。
Spark streaming: 構建在Spark上處理Stream數據的框架,基本的原理是將Stream數據分成小的時間片斷(幾秒),以類似batch批量處理的方式來處理這小部分數據。Spark Streaming構建在Spark上,一方面是因為Spark的低延遲執行引擎(100ms+)可以用於實時計算,另一方面相比基於Record的其它處理框架(如Storm),RDD數據集更容易做高效的容錯處理。此外小批量處理的方式使得它可以同時兼容批量和實時數據處理的邏輯和演算法。方便了一些需要歷史數據和實時數據聯合分析的特定應用場合。
Bagel: Pregel on Spark,可以用Spark進行圖計算,這是個非常有用的小項目。Bagel自帶了一個例子,實現了Google的PageRank演算法。
End.