當前位置:首頁 » 數據倉庫 » spark如何獲取hbase配置信息
擴展閱讀
webinf下怎麼引入js 2023-08-31 21:54:13
堡壘機怎麼打開web 2023-08-31 21:54:11

spark如何獲取hbase配置信息

發布時間: 2023-03-30 16:10:46

① Spark實例-spark讀取外部配置文件之--files

在運行spark程序的時,有時需要讀取外部配置參數,比如mysql的host參數、埠號、es主機ip、es埠號等。通過外部文件配置參數也方便程序遷移。下面就來看看如何來實現。

1、首先我們需要一臘姿野個配置文件:
config.properties

2、上傳配置文件到某個節點:

4、通過 Maven打包程序:test_CDH.jar

5、上傳 test_CDH.jar 到集群

6、執行 submit 命令

spark2-submit --master yarn --deploy-mode cluster --files config.properties --jars libs/hbase-client-1.2.0.jar,libs/hbase-common-1.2.0.jar,libs/hbase-server-1.2.0.jar,libs/mysql-connector-java-8.0.15.jar --driver-class-path libs/mysql-connector-java-8.0.15.jar --conf spark.executor.userClassPathFirst=true --conf spark.driver.userClassPathFirst=true --class test_CDH.Main libs/test_CDH.jar $1

--files 參數指定輪喊我們需要載入冊和的外部配置文件

② idea中配置環境Spark3.0操作Hbase1.3.6

Date:2020/12/22
Version:Spark 3.0; java 1.8.0_221; Hbase 1.3.6; Scala 2.12.11;

1、首先是pom.xml,注釋了一些東西,比如 不用 添加hbase-client和hbase-server,java中寫MapRece操作hbase需要這兩個,scala寫spark操作hbase不需要這兩個,程序跑不起來,sc無法創建歷悔拿。

2、將hbase的lib中的以下jar文件添前蔽加進來。(to IDEA小白 :可以新建一個文件夾保存這些jar文件,在IDEA中添加一個java的library指向這個文件夾)

3、將hbase中的配置文件hbase-site.xml添加到項目中的resources文件夾中

4、測試肢搭spark連接hbase

運行後的結果:

③ 如何使用Spark/Scala讀取Hbase的數據

使用Spark/Scala讀取Hbase的數據 必須使用高亮參數啟動Spark-shell,否則當你遍歷RDD時會出現如下的Exception java.io.NotSerializableException: org.apache.hadoop.hbase.io.ImmutableBytesWritable spark-shell--conf spark.serializer=org.apache.spark.serializer.KryoSerializer 以下代碼,經過粗改MaprDB實測通過 import org.apache.spark._ import org.apache.spark.rdd.NewHadoopRDD import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor} import org.apache.hadoop.hbase.client.HBaseAdmin import org.apache.hadoop.hbase.maprece.TableInputFormat import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HColumnDescriptor import org.apache.hadoop.hbase.util.Bytes import org如何使用Spark/Scala讀取游凳稿Hbase的神孝數據

④ hbase怎麼查看配置文件

hbase有本地模式和分布式模式
hbase-site.xml配置
hbase.tmp.dir

本地文件系統tmp目錄,一般配置成local模式的設置一下,但是最好還是需要設置一下,因為很多文件都會默認設置成它下面的
線上配置
<property>
<name>hbase.tmp.dir</name>
<value>/mnt/路徑</value>
</property>
默認值:
${java.io.tmpdir}/山賀hbase-${user.name}
寫到系統的/tmp目錄
hbase.rootdir

HBase集群中所有RegionServer共享目錄,用來持久化HBase的數據,一般設置的是型如hdfs的文件目錄,如hdfs://master:9000/hbasedata
線上配置
<property>
<name>hbase.rootdir</name>
<value>hdfs://master:9000/hbasedata</value>
</property>
默認值:
${hbase.tmp.dir}/hbase
hbase.cluster.distributed

集群的模式,分布式還是單機模式,如果設置成false的話,HBase進程和Zookeeper進程在同一個JVM進程。
線上配置為true
默認值:false
hbase.zookeeper.quorum

zookeeper集群的URL配置,多個host中間用逗號分割
線上配置
<property>
<name>hbase.zookeeper.quorum</name>
<value>master,slave,slave1</value>
</property>
默認值:localhost
hbase.zookeeper.property.dataDir

ZooKeeper的zoo.conf中的配置。 快照的存儲位置
線上配置:/home/hadoop/zookeeperData
默認值:${hbase.tmp.dir}/zookeeper
zookeeper.session.timeout

客戶端與zk連接超時時間
線上配置:1200000(20min)
默認值:180000(3min)
hbase.zookeeper.property.tickTime

Client端卜唯啟與zk發送心跳的時間間隔
線上配置:6000(6s)
默認值:6000
hbase.security.authentication

HBase集群安全認證機制,目前的版本只支持kerberos安全認證。
線上配置:kerberos
默認值:空
hbase.security.authorization

HBase是否開啟安全授權機制
線上配置: true
默認值: false
hbase.regionserver.kerberos.principal

regionserver的kerberos認證的主體名稱(由三部分組成:服務或用戶名稱、實例名稱以及域名)
線上配置:hbase/[email protected]
默認:無
hbase.regionserver.keytab.file

regionserver keytab文件路徑
線上配置:/home/hadoop/etc/conf/hbase.keytab
默認值:無
hbase.master.kerberos.principal

master的kerberos認證的主體名稱(由三部分組成:服務或用戶名稱、實例名稱以及域名)
線上配置:hbase/[email protected]
默認:無
hbase.master.keytab.file

master keytab文件路徑
線上配置:/home/hadoop/etc/conf/hbase.keytab
默認值:無
hbase.regionserver.handler.count

regionserver處理IO請求的線程數
線上配置:50
默認配置:10
hbase.regionserver.global.memstore.upperLimit

RegionServer進程block進行flush觸發條件:該節點上所有region的memstore之和達到upperLimit*heapsize
線上配置:0.45
默認配置:0.4
hbase.regionserver.global.memstore.lowerLimit

RegionServer進程觸發flush的一個條件:該節點上所有region的memstore之和達到lowerLimit*heapsize
線上配置:0.4
默認配置:0.35
hbase.client.write.buffer

客戶端寫buffer,設置autoFlush為false時,當客戶端寫滿buffer才flush
線上配置:8388608(8M)
默認配置:2097152(2M)
hbase.hregion.max.filesize

單個ColumnFamily的region大小,若按照ConstantSizeRegionSplitPolicy策略,超過設置的該值則自動split
線上配置:107374182400(100G)
默認配置:21474836480(20G)
hbase.hregion.memstore.block.multiplier

超過memstore大小的倍數達到該值則block所有寫入請求,自我保護
線上配置:8(內存夠大可以適當調大一些,出現這種情況需要客戶端做調整)
默認配置:2
hbase.hregion.memstore.flush.size

memstore大小,當達到該值則會flush到外存設備
線上配置:104857600(100M)
默認值: 134217728(128M)
hbase.hregion.memstore.mslab.enabled

是否開啟mslab方案,減少因內存碎片導致的Full GC,提高整體性能
線上配置:true
默認配置: true
hbase.regionserver.maxlogs

regionserver的hlog數量
線上配置:128
默認配置:32
hbase.regionserver.hlog.blocksize

hlog大小上限,達到該值則block,進行roll掉
線上配置:536870912(512M)
默認配置:hdfs配置的block大小
hbase.hstore.compaction.min

進入minor compact隊列的storefiles最小個數
線上配置:10
默認配置:3
hbase.hstore.compaction.max

單次minor compact最多的文件個數
線上配置:30
默認配置:10
hbase.hstore.blockingStoreFiles

當某一個region的storefile個數達到該值則block寫入,等待compact
線上配置:100(生產環境可以設置得很大)
默認配置: 7
hbase.hstore.blockingWaitTime

block的等待時間
線上配置:90000(90s)
默認配置:90000(90s)
hbase.hregion.majorcompaction

觸發major compact的周期
線上配置:0(關掉major compact)
默認配置:86400000(1d)
hbase.regionserver.thread.compaction.large

large compact線程池的線程個數
線上配置:5
默認配置:1
hbase.regionserver.thread.compaction.small

small compact線程池的線程個數
線上配置:5
默認配置:1
hbase.regionserver.thread.compaction.throttle

compact(major和minor)請求進入large和small compact線程池的臨界點
線上配置:10737418240(10G)
默認配置:2 * this.minFilesToCompact * this.region.memstoreFlushSize
hbase.hstore.compaction.max.size

minor compact隊列中storefile文件最大size
線上配置:21474836480(20G)
默認配置:Long.MAX_VALUE
hbase.rpc.timeout

RPC請求timeout時間
線上配置:300000(5min)
默認配置:60000(10s)
hbase.regionserver.region.split.policy

split操作默認的策略
線上配置: org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy(採取老的策略,自己控制split)
默認配置: org.apache.hadoop.hbase.regionserver.(在region沒有達到maxFileSize的前提下,如果fileSize達到regionCount * regionCount * flushSize則進行split操作)
hbase.regionserver.regionSplitLimit

單台RegionServer上region數上限
線上配置:150
默認配置:2147483647
hbase-env.sh配置
指定系統運行環境

export JAVA_HOME=/usr/lib/jvm/java-6-sun/ #JDK HOME
export HBASE_HOME=/home/hadoop/cdh4/hbase-0.94.2-cdh4.2.1 # HBase 安裝目錄
export HBASE_LOG_DIR=/mnt/dfs/11/hbase/hbase-logs #日誌輸出路徑
JVM參數調優

export HBASE_OPTS="-verbose:gc -XX:+PrintGCDetails -Xloggc:${HBASE_LOG_DIR}/hbase-gc.log -XX:+PrintGCTimeStamps -XX:+ -XX:+PrintGCApplicationStoppedTime \
-server -Xmx20480m -Xms20480m -Xmn10240m -Xss256k -XX:SurvivorRatio=4 -XX:MaxPermSize=256m -XX:MaxTenuringThreshold=15 \
-XX:ParallelGCThreads=16 -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:CMSFullGCsBeforeCompaction=5 -XX:+UseCMSCompactAtFullCollection \
-XX:+CMSClassUnloadingEnabled -XX:=70 -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSMaxAbortablePrecleanTime=5000 \
"

⑤ spark1.2.1實現讀取hbase的數據後怎麼實現實時查詢

調用parallelize函數直接從集合中獲取數據,並存入RDD中;Java版本如下:
1 JavaRDD<Integer> myRDD = sc.parallelize(Arrays.asList(1,2,3));
Scala版本如下:
1 val myRDD= sc.parallelize(List(1,2,3))
這種方式很簡單,很容易就可以將一個集合中的數據變成RDD的初始化值;更常見的是(2)、從文本中讀取數據到RDD中,這個文本蔽如可以是純文本文件、知並廳可以是sequence文件;可以存放在本地(file://)、可以存放在HDFS(hdfs://)上,還可以存放在S3上。其實對文件來說,Spark支搭隱持Hadoop所支持的所有文件類型和文件存放位置。

⑥ spark on hbase 讀寫

本文主要講述了spark對hbase進行獨寫的兩種方式,這兩種方式分別為:
1.利用spark提供的 newAPIHadoopRDD api 對hbase進行讀寫
2.SparkOnHbase,這種方式其實是利用Cloudera-labs開旦山源的一個HbaseContext的工具類來支持spark用RDD的方式批量讀寫hbase

hbase 表格式如下:

部分數據集如下:

文改遲塌中的spark 的版本為2.3.2,hbase 的版本為1.2.6

因為hbase數據集的數據都是序列化的,所以spark 默認讀取Hbase的數據時會報數據序列化的錯誤,不管哪種方式,在讀取hbase數據之前,為spark配置序列化方式,如圖所示:

主要是利用TableInputFormat,TableOutPutFormat的方式對hbase進行讀寫。
下邊是對hbase進行讀

運行結果如圖:

通過maven 將hbase-spark jar 報 導入

由於hbase-spark 運用的spark 版本為1.6 而實際的spark 版本為2.3.2,所以執行spark 任務會報 沒有 org.apache.spark.logging 類沒有定義,這是因為 spark 2.3.2 這個類名已經改變,因此需要重新構造這個類並打成jar包放入到spark 的jar目錄里即可

以下為讀方式:

sparkOnHbase 對於第一種方式的優勢在於:

1>無縫的使用Hbase connection

2>和Kerberos無縫集成

3>通過get或者scan直接核圓生成rdd

4>利用RDD支持hbase的任何組合操作

5>為通用操作提供簡單的方法,同時通過API允許不受限制的未知高級操作

6>支持java和scala

7>為spark和 spark streaming提供相似的API

⑦ 如何使用scala+spark讀寫hbase

公司有一些實時數據處理的項目,存儲用的是hbase,提供實時的檢索,當然hbase裡面存儲的數據模型都是簡單的,復雜的多維檢索的結果是在es裡面存儲的,公司也正在引入Kylin作為OLAP的數據分析引擎,這塊後續有空在研究下。
接著上面說的,hbase存儲著一些實時的數據,前兩周新需求需要對hbase裡面指定表的數據做一次全量的update以滿足業務的發展,平時操作hbase都是單條的curd,或者插入一個批量的list,用的都是hbase的java api比較簡單,但這次涉及全量update,所以如果再用原來那種單線程的操作api,勢必速度回慢上許多。
關於批量操作Hbase,一般我們都會用MapRece來操作,這樣可以大大加快處理效率,原來也寫過MR操作Hbase,過程比較繁瑣,最近一直在用scala做spark的相關開發,所以就直接使用scala+spark來搞定這件事了,當然底頃鄭層用的還是Hbase的TableOutputFormat和TableOutputFormat這個和MR是一樣的,在spark裡面把從hbase裡面讀取的數據集轉成rdd了,然後做一些簡單的過濾,轉化,最終在把結果寫入到hbase裡面。
整個流程如下:
(1)全量讀取hbase表的數據
(2)做一系列的ETL
(3)把全量數據再寫回hbase
核心代碼如下:
//獲取conf
val conf=HBaseConfiguration.create() //設置讀取的表
conf.set(TableInputFormat.INPUT_TABLE,tableName) //設置寫入的表
conf.set(TableOutputFormat.OUTPUT_TABLE,tableName)//創建sparkConf
val sparkConf=new SparkConf() //設置spark的任務名
sparkConf.setAppName("read and write for hbase ") //創建spark上下文
val sc=new SparkContext(sparkConf)
//為job指定輸出格式和輸出表名

val newAPIJobConfiguration1 = Job.getInstance(conf)
newAPIJobConfiguration1.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, tableName)
newAPIJobConfiguration1.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

//全量讀取hbase表
val rdd=sc.newAPIHadoopRDD(conf,classOf[TableInputFormat]
,classOf[ImmutableBytesWritable]
,classOf[Result]
)
//過濾空數據,然後對每一個雀譽頌記錄做更新,並轉換成寫入的格式
val final_rdd= rdd.filter(checkNotEmptyKs).map(forDatas)
//轉換後的結果,再次做過濾
val save_rdd=final_rdd.filter(checkNull)
//最終在寫回hbase表
save_rdd.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration)
sc.stop()

從上面的代碼可以看出來,使用spark+scala操作hbase是非常虛知簡單的。下面我們看一下,中間用到的幾個自定義函數:
第一個:checkNotEmptyKs
作用:過濾掉空列簇的數據
def checkNotEmptyKs(f:((ImmutableBytesWritable,Result))):Boolean={ val r=f._2 val rowkey=Bytes.toString(r.getRow) val map:scala.collection.mutable.Map[Array[Byte],Array[Byte]]= r.getFamilyMap(Bytes.toBytes("ks")).asScala if(map.isEmpty) false else true
}

第二個:forDatas
作用:讀取每一條數據,做update後,在轉化成寫入操作
def forDatas(f: (ImmutableBytesWritable,Result)): (ImmutableBytesWritable,Put)={ val r=f._2 //獲取Result
val put:Put=new Put(r.getRow) //聲明put
val ks=Bytes.toBytes("ks") //讀取指定列簇
val map:scala.collection.mutable.Map[Array[Byte],Array[Byte]]= r.getFamilyMap(ks).asScala
map.foreach(kv=>{//遍歷每一個rowkey下面的指定列簇的每一列的數據做轉化
val kid= Bytes.toString(kv._1)//知識點id
var value=Bytes.toString(kv._2)//知識點的value值
value="修改後的value"
put.addColumn(ks,kv._1,Bytes.toBytes(value)) //放入put對象
}
) if(put.isEmpty) null else (new ImmutableBytesWritable(),put)

}

第三個:checkNull 作用:過濾最終結果裡面的null數據
def checkNull(f:((ImmutableBytesWritable,Put))):Boolean={ if(f==null) false else true
}

上面就是整個處理的邏輯了,需要注意的是對hbase裡面的無效數據作過濾,跳過無效數據即可,邏輯是比較簡單的,代碼量也比較少。

⑧ Spark 讀取 Hbase 數據

下面這種方式是全表掃描,Spark如果通配態過RS來訪問Hbase數據進行數據分析,對RS會產生敏模很大的壓力。不太建議使用下面的方式

在本地測試時返現運行的很慢,後來看到以下日誌

由於Hbase表中只有兩個region,所以只啟動兩個Task,此時並行度為二!
那麼也就是說Spark讀取Hbase的並行度取決於這個表有多少個region。然後根據region的startkey和endkey來獲取數據培拿源

⑨ 如何使用Spark/Scala讀取Hbase的數據

必須使用高亮參數啟Spark-shell,否則遍歷RDD現Exception
java.io.NotSerializableException: org.apache.hadoop.hbase.io.ImmutableBytesWritable
spark-shell--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
代碼,經MaprDB實測通
import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.maprece.TableInputFormat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.HTable;
val tableName = "/app/SubscriptionBillingPlatform/TRANSAC_ID"
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, tableName)
//create rdd
val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
hBaseRDD.take(2).map(row=>row._2.rawCells).
map(_.map( kv => (new String(kv.getQualifier()) -> new
String(kv.getValue()) ) ).toMap ). foreach( map => { map.foreach{
entry => print(entry._1 +":"告洞 + entry._2 + ", ") } ;
print("簡頌\n-----------\n") } )
//get the row count
val count = hBaseRDD.count()
print("攔友鄭HBase RDD count:"+count)

⑩ 如何使用Spark/Scala讀取Hbase的數據

Spark SQL就是shark ,也就是SQL on Spark。如果沒記錯的話,shark的開發利用了hive的API,所以支持禪臘讀取HBase。而且Spark的賀歷滑數據爛弊類型兼容範圍大於Hadoop,並且包含了Hadoop所支持的任何數據類型。