当前位置:首页 » 数据仓库 » spark如何获取hbase配置信息
扩展阅读
webinf下怎么引入js 2023-08-31 21:54:13
堡垒机怎么打开web 2023-08-31 21:54:11

spark如何获取hbase配置信息

发布时间: 2023-03-30 16:10:46

① Spark实例-spark读取外部配置文件之--files

在运行spark程序的时,有时需要读取外部配置参数,比如mysql的host参数、端口号、es主机ip、es端口号等。通过外部文件配置参数也方便程序迁移。下面就来看看如何来实现。

1、首先我们需要一腊姿野个配置文件:
config.properties

2、上传配置文件到某个节点:

4、通过 Maven打包程序:test_CDH.jar

5、上传 test_CDH.jar 到集群

6、执行 submit 命令

spark2-submit --master yarn --deploy-mode cluster --files config.properties --jars libs/hbase-client-1.2.0.jar,libs/hbase-common-1.2.0.jar,libs/hbase-server-1.2.0.jar,libs/mysql-connector-java-8.0.15.jar --driver-class-path libs/mysql-connector-java-8.0.15.jar --conf spark.executor.userClassPathFirst=true --conf spark.driver.userClassPathFirst=true --class test_CDH.Main libs/test_CDH.jar $1

--files 参数指定轮喊我们需要加载册和的外部配置文件

② idea中配置环境Spark3.0操作Hbase1.3.6

Date:2020/12/22
Version:Spark 3.0; java 1.8.0_221; Hbase 1.3.6; Scala 2.12.11;

1、首先是pom.xml,注释了一些东西,比如 不用 添加hbase-client和hbase-server,java中写MapRece操作hbase需要这两个,scala写spark操作hbase不需要这两个,程序跑不起来,sc无法创建历悔拿。

2、将hbase的lib中的以下jar文件添前蔽加进来。(to IDEA小白 :可以新建一个文件夹保存这些jar文件,在IDEA中添加一个java的library指向这个文件夹)

3、将hbase中的配置文件hbase-site.xml添加到项目中的resources文件夹中

4、测试肢搭spark连接hbase

运行后的结果:

③ 如何使用Spark/Scala读取Hbase的数据

使用Spark/Scala读取Hbase的数据 必须使用高亮参数启动Spark-shell,否则当你遍历RDD时会出现如下的Exception java.io.NotSerializableException: org.apache.hadoop.hbase.io.ImmutableBytesWritable spark-shell--conf spark.serializer=org.apache.spark.serializer.KryoSerializer 以下代码,经过粗改MaprDB实测通过 import org.apache.spark._ import org.apache.spark.rdd.NewHadoopRDD import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor} import org.apache.hadoop.hbase.client.HBaseAdmin import org.apache.hadoop.hbase.maprece.TableInputFormat import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HColumnDescriptor import org.apache.hadoop.hbase.util.Bytes import org如何使用Spark/Scala读取游凳稿Hbase的神孝数据

④ hbase怎么查看配置文件

hbase有本地模式和分布式模式
hbase-site.xml配置
hbase.tmp.dir

本地文件系统tmp目录,一般配置成local模式的设置一下,但是最好还是需要设置一下,因为很多文件都会默认设置成它下面的
线上配置
<property>
<name>hbase.tmp.dir</name>
<value>/mnt/路径</value>
</property>
默认值:
${java.io.tmpdir}/山贺hbase-${user.name}
写到系统的/tmp目录
hbase.rootdir

HBase集群中所有RegionServer共享目录,用来持久化HBase的数据,一般设置的是型如hdfs的文件目录,如hdfs://master:9000/hbasedata
线上配置
<property>
<name>hbase.rootdir</name>
<value>hdfs://master:9000/hbasedata</value>
</property>
默认值:
${hbase.tmp.dir}/hbase
hbase.cluster.distributed

集群的模式,分布式还是单机模式,如果设置成false的话,HBase进程和Zookeeper进程在同一个JVM进程。
线上配置为true
默认值:false
hbase.zookeeper.quorum

zookeeper集群的URL配置,多个host中间用逗号分割
线上配置
<property>
<name>hbase.zookeeper.quorum</name>
<value>master,slave,slave1</value>
</property>
默认值:localhost
hbase.zookeeper.property.dataDir

ZooKeeper的zoo.conf中的配置。 快照的存储位置
线上配置:/home/hadoop/zookeeperData
默认值:${hbase.tmp.dir}/zookeeper
zookeeper.session.timeout

客户端与zk连接超时时间
线上配置:1200000(20min)
默认值:180000(3min)
hbase.zookeeper.property.tickTime

Client端卜唯启与zk发送心跳的时间间隔
线上配置:6000(6s)
默认值:6000
hbase.security.authentication

HBase集群安全认证机制,目前的版本只支持kerberos安全认证。
线上配置:kerberos
默认值:空
hbase.security.authorization

HBase是否开启安全授权机制
线上配置: true
默认值: false
hbase.regionserver.kerberos.principal

regionserver的kerberos认证的主体名称(由三部分组成:服务或用户名称、实例名称以及域名)
线上配置:hbase/[email protected]
默认:无
hbase.regionserver.keytab.file

regionserver keytab文件路径
线上配置:/home/hadoop/etc/conf/hbase.keytab
默认值:无
hbase.master.kerberos.principal

master的kerberos认证的主体名称(由三部分组成:服务或用户名称、实例名称以及域名)
线上配置:hbase/[email protected]
默认:无
hbase.master.keytab.file

master keytab文件路径
线上配置:/home/hadoop/etc/conf/hbase.keytab
默认值:无
hbase.regionserver.handler.count

regionserver处理IO请求的线程数
线上配置:50
默认配置:10
hbase.regionserver.global.memstore.upperLimit

RegionServer进程block进行flush触发条件:该节点上所有region的memstore之和达到upperLimit*heapsize
线上配置:0.45
默认配置:0.4
hbase.regionserver.global.memstore.lowerLimit

RegionServer进程触发flush的一个条件:该节点上所有region的memstore之和达到lowerLimit*heapsize
线上配置:0.4
默认配置:0.35
hbase.client.write.buffer

客户端写buffer,设置autoFlush为false时,当客户端写满buffer才flush
线上配置:8388608(8M)
默认配置:2097152(2M)
hbase.hregion.max.filesize

单个ColumnFamily的region大小,若按照ConstantSizeRegionSplitPolicy策略,超过设置的该值则自动split
线上配置:107374182400(100G)
默认配置:21474836480(20G)
hbase.hregion.memstore.block.multiplier

超过memstore大小的倍数达到该值则block所有写入请求,自我保护
线上配置:8(内存够大可以适当调大一些,出现这种情况需要客户端做调整)
默认配置:2
hbase.hregion.memstore.flush.size

memstore大小,当达到该值则会flush到外存设备
线上配置:104857600(100M)
默认值: 134217728(128M)
hbase.hregion.memstore.mslab.enabled

是否开启mslab方案,减少因内存碎片导致的Full GC,提高整体性能
线上配置:true
默认配置: true
hbase.regionserver.maxlogs

regionserver的hlog数量
线上配置:128
默认配置:32
hbase.regionserver.hlog.blocksize

hlog大小上限,达到该值则block,进行roll掉
线上配置:536870912(512M)
默认配置:hdfs配置的block大小
hbase.hstore.compaction.min

进入minor compact队列的storefiles最小个数
线上配置:10
默认配置:3
hbase.hstore.compaction.max

单次minor compact最多的文件个数
线上配置:30
默认配置:10
hbase.hstore.blockingStoreFiles

当某一个region的storefile个数达到该值则block写入,等待compact
线上配置:100(生产环境可以设置得很大)
默认配置: 7
hbase.hstore.blockingWaitTime

block的等待时间
线上配置:90000(90s)
默认配置:90000(90s)
hbase.hregion.majorcompaction

触发major compact的周期
线上配置:0(关掉major compact)
默认配置:86400000(1d)
hbase.regionserver.thread.compaction.large

large compact线程池的线程个数
线上配置:5
默认配置:1
hbase.regionserver.thread.compaction.small

small compact线程池的线程个数
线上配置:5
默认配置:1
hbase.regionserver.thread.compaction.throttle

compact(major和minor)请求进入large和small compact线程池的临界点
线上配置:10737418240(10G)
默认配置:2 * this.minFilesToCompact * this.region.memstoreFlushSize
hbase.hstore.compaction.max.size

minor compact队列中storefile文件最大size
线上配置:21474836480(20G)
默认配置:Long.MAX_VALUE
hbase.rpc.timeout

RPC请求timeout时间
线上配置:300000(5min)
默认配置:60000(10s)
hbase.regionserver.region.split.policy

split操作默认的策略
线上配置: org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy(采取老的策略,自己控制split)
默认配置: org.apache.hadoop.hbase.regionserver.(在region没有达到maxFileSize的前提下,如果fileSize达到regionCount * regionCount * flushSize则进行split操作)
hbase.regionserver.regionSplitLimit

单台RegionServer上region数上限
线上配置:150
默认配置:2147483647
hbase-env.sh配置
指定系统运行环境

export JAVA_HOME=/usr/lib/jvm/java-6-sun/ #JDK HOME
export HBASE_HOME=/home/hadoop/cdh4/hbase-0.94.2-cdh4.2.1 # HBase 安装目录
export HBASE_LOG_DIR=/mnt/dfs/11/hbase/hbase-logs #日志输出路径
JVM参数调优

export HBASE_OPTS="-verbose:gc -XX:+PrintGCDetails -Xloggc:${HBASE_LOG_DIR}/hbase-gc.log -XX:+PrintGCTimeStamps -XX:+ -XX:+PrintGCApplicationStoppedTime \
-server -Xmx20480m -Xms20480m -Xmn10240m -Xss256k -XX:SurvivorRatio=4 -XX:MaxPermSize=256m -XX:MaxTenuringThreshold=15 \
-XX:ParallelGCThreads=16 -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:CMSFullGCsBeforeCompaction=5 -XX:+UseCMSCompactAtFullCollection \
-XX:+CMSClassUnloadingEnabled -XX:=70 -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSMaxAbortablePrecleanTime=5000 \
"

⑤ spark1.2.1实现读取hbase的数据后怎么实现实时查询

调用parallelize函数直接从集合中获取数据,并存入RDD中;Java版本如下:
1 JavaRDD<Integer> myRDD = sc.parallelize(Arrays.asList(1,2,3));
Scala版本如下:
1 val myRDD= sc.parallelize(List(1,2,3))
这种方式很简单,很容易就可以将一个集合中的数据变成RDD的初始化值;更常见的是(2)、从文本中读取数据到RDD中,这个文本蔽如可以是纯文本文件、知并厅可以是sequence文件;可以存放在本地(file://)、可以存放在HDFS(hdfs://)上,还可以存放在S3上。其实对文件来说,Spark支搭隐持Hadoop所支持的所有文件类型和文件存放位置。

⑥ spark on hbase 读写

本文主要讲述了spark对hbase进行独写的两种方式,这两种方式分别为:
1.利用spark提供的 newAPIHadoopRDD api 对hbase进行读写
2.SparkOnHbase,这种方式其实是利用Cloudera-labs开旦山源的一个HbaseContext的工具类来支持spark用RDD的方式批量读写hbase

hbase 表格式如下:

部分数据集如下:

文改迟塌中的spark 的版本为2.3.2,hbase 的版本为1.2.6

因为hbase数据集的数据都是序列化的,所以spark 默认读取Hbase的数据时会报数据序列化的错误,不管哪种方式,在读取hbase数据之前,为spark配置序列化方式,如图所示:

主要是利用TableInputFormat,TableOutPutFormat的方式对hbase进行读写。
下边是对hbase进行读

运行结果如图:

通过maven 将hbase-spark jar 报 导入

由于hbase-spark 运用的spark 版本为1.6 而实际的spark 版本为2.3.2,所以执行spark 任务会报 没有 org.apache.spark.logging 类没有定义,这是因为 spark 2.3.2 这个类名已经改变,因此需要重新构造这个类并打成jar包放入到spark 的jar目录里即可

以下为读方式:

sparkOnHbase 对于第一种方式的优势在于:

1>无缝的使用Hbase connection

2>和Kerberos无缝集成

3>通过get或者scan直接核圆生成rdd

4>利用RDD支持hbase的任何组合操作

5>为通用操作提供简单的方法,同时通过API允许不受限制的未知高级操作

6>支持java和scala

7>为spark和 spark streaming提供相似的API

⑦ 如何使用scala+spark读写hbase

公司有一些实时数据处理的项目,存储用的是hbase,提供实时的检索,当然hbase里面存储的数据模型都是简单的,复杂的多维检索的结果是在es里面存储的,公司也正在引入Kylin作为OLAP的数据分析引擎,这块后续有空在研究下。
接着上面说的,hbase存储着一些实时的数据,前两周新需求需要对hbase里面指定表的数据做一次全量的update以满足业务的发展,平时操作hbase都是单条的curd,或者插入一个批量的list,用的都是hbase的java api比较简单,但这次涉及全量update,所以如果再用原来那种单线程的操作api,势必速度回慢上许多。
关于批量操作Hbase,一般我们都会用MapRece来操作,这样可以大大加快处理效率,原来也写过MR操作Hbase,过程比较繁琐,最近一直在用scala做spark的相关开发,所以就直接使用scala+spark来搞定这件事了,当然底顷郑层用的还是Hbase的TableOutputFormat和TableOutputFormat这个和MR是一样的,在spark里面把从hbase里面读取的数据集转成rdd了,然后做一些简单的过滤,转化,最终在把结果写入到hbase里面。
整个流程如下:
(1)全量读取hbase表的数据
(2)做一系列的ETL
(3)把全量数据再写回hbase
核心代码如下:
//获取conf
val conf=HBaseConfiguration.create() //设置读取的表
conf.set(TableInputFormat.INPUT_TABLE,tableName) //设置写入的表
conf.set(TableOutputFormat.OUTPUT_TABLE,tableName)//创建sparkConf
val sparkConf=new SparkConf() //设置spark的任务名
sparkConf.setAppName("read and write for hbase ") //创建spark上下文
val sc=new SparkContext(sparkConf)
//为job指定输出格式和输出表名

val newAPIJobConfiguration1 = Job.getInstance(conf)
newAPIJobConfiguration1.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, tableName)
newAPIJobConfiguration1.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

//全量读取hbase表
val rdd=sc.newAPIHadoopRDD(conf,classOf[TableInputFormat]
,classOf[ImmutableBytesWritable]
,classOf[Result]
)
//过滤空数据,然后对每一个雀誉颂记录做更新,并转换成写入的格式
val final_rdd= rdd.filter(checkNotEmptyKs).map(forDatas)
//转换后的结果,再次做过滤
val save_rdd=final_rdd.filter(checkNull)
//最终在写回hbase表
save_rdd.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration)
sc.stop()

从上面的代码可以看出来,使用spark+scala操作hbase是非常虚知简单的。下面我们看一下,中间用到的几个自定义函数:
第一个:checkNotEmptyKs
作用:过滤掉空列簇的数据
def checkNotEmptyKs(f:((ImmutableBytesWritable,Result))):Boolean={ val r=f._2 val rowkey=Bytes.toString(r.getRow) val map:scala.collection.mutable.Map[Array[Byte],Array[Byte]]= r.getFamilyMap(Bytes.toBytes("ks")).asScala if(map.isEmpty) false else true
}

第二个:forDatas
作用:读取每一条数据,做update后,在转化成写入操作
def forDatas(f: (ImmutableBytesWritable,Result)): (ImmutableBytesWritable,Put)={ val r=f._2 //获取Result
val put:Put=new Put(r.getRow) //声明put
val ks=Bytes.toBytes("ks") //读取指定列簇
val map:scala.collection.mutable.Map[Array[Byte],Array[Byte]]= r.getFamilyMap(ks).asScala
map.foreach(kv=>{//遍历每一个rowkey下面的指定列簇的每一列的数据做转化
val kid= Bytes.toString(kv._1)//知识点id
var value=Bytes.toString(kv._2)//知识点的value值
value="修改后的value"
put.addColumn(ks,kv._1,Bytes.toBytes(value)) //放入put对象
}
) if(put.isEmpty) null else (new ImmutableBytesWritable(),put)

}

第三个:checkNull 作用:过滤最终结果里面的null数据
def checkNull(f:((ImmutableBytesWritable,Put))):Boolean={ if(f==null) false else true
}

上面就是整个处理的逻辑了,需要注意的是对hbase里面的无效数据作过滤,跳过无效数据即可,逻辑是比较简单的,代码量也比较少。

⑧ Spark 读取 Hbase 数据

下面这种方式是全表扫描,Spark如果通配态过RS来访问Hbase数据进行数据分析,对RS会产生敏模很大的压力。不太建议使用下面的方式

在本地测试时返现运行的很慢,后来看到以下日志

由于Hbase表中只有两个region,所以只启动两个Task,此时并行度为二!
那么也就是说Spark读取Hbase的并行度取决于这个表有多少个region。然后根据region的startkey和endkey来获取数据培拿源

⑨ 如何使用Spark/Scala读取Hbase的数据

必须使用高亮参数启Spark-shell,否则遍历RDD现Exception
java.io.NotSerializableException: org.apache.hadoop.hbase.io.ImmutableBytesWritable
spark-shell--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
代码,经MaprDB实测通
import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.maprece.TableInputFormat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.HTable;
val tableName = "/app/SubscriptionBillingPlatform/TRANSAC_ID"
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, tableName)
//create rdd
val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
hBaseRDD.take(2).map(row=>row._2.rawCells).
map(_.map( kv => (new String(kv.getQualifier()) -> new
String(kv.getValue()) ) ).toMap ). foreach( map => { map.foreach{
entry => print(entry._1 +":"告洞 + entry._2 + ", ") } ;
print("简颂\n-----------\n") } )
//get the row count
val count = hBaseRDD.count()
print("拦友郑HBase RDD count:"+count)

⑩ 如何使用Spark/Scala读取Hbase的数据

Spark SQL就是shark ,也就是SQL on Spark。如果没记错的话,shark的开发利用了hive的API,所以支持禅腊读取HBase。而且Spark的贺历滑数据烂弊类型兼容范围大于Hadoop,并且包含了Hadoop所支持的任何数据类型。