本实例采用Scala开发,实现了RDD数据两种方式入库到HBase,从HBase中读取数据并print输出。
build.sbt
name := "SparkSbt"
version := "0.1"
scalaVersion := "2.10.5"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.6.0"
libraryDependencies+="org.apache.hbase"%"hbase-common"%"1.2.0"
libraryDependencies+="org.apache.hbase"%"hbase-client"%"1.2.0"
libraryDependencies+="org.apache.hbase"%"hbase-server"%"1.2.0"
先hbase shell执行命令创建表:
create 'account' , 'cf'
create 'account2' , 'cf'
源码
package com.whq.test
import org.apache.hadoop.hbase.client.{HBaseAdmin, Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark._
object HBaseTest {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("HBaseTest")
val sc = new SparkContext(sparkConf)
// please ensure HBASE_CONF_DIR is on classpath of spark driver
val conf = HBaseConfiguration.create()
//设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置
conf.set("hbase.zookeeper.quorum","192.168.91.144")
conf.set("hbase.zookeeper.property.clientPort", "2181")
入库方式一saveAsHadoopDataset
println("————————————入库方式一")
var tablename = "account"
conf.set(TableInputFormat.INPUT_TABLE, tablename)
//初始化jobconf,TableOutputFormat必须是org.apache.hadoop.hbase.mapred包下的!
val jobConf = new JobConf(conf)
jobConf.setOutputFormat(classOf[TableOutputFormat])
jobConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)
//待入库数据
val indataRDD = sc.makeRDD(Array("11,whq,30","12,wanghongqi,29","13,xiaoming,15"))
//数据转换为可入库的RDD[(ImmutableBytesWritable,Put)]
val rdd = indataRDD.map(_.split(',')).map{arr=>{
/*一个Put对象就是一行记录,在构造方法中指定主键
* 所有插入的数据必须用org.apache.hadoop.hbase.util.Bytes.toBytes方法转换
* Put.add方法接收三个参数:列族,列名,数据
*/
val put = new Put(Bytes.toBytes(arr(0).toInt))
put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))
put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))
//转化成RDD[(ImmutableBytesWritable,Put)]类型才能调用saveAsHadoopDataset
(new ImmutableBytesWritable, put)
}}
//入库写入
rdd.saveAsHadoopDataset(jobConf)
入库方式二saveAsNewAPIHadoopDataset
println("————————————入库方式二")
tablename = "account2"
conf.set(TableOutputFormat.OUTPUT_TABLE, tablename)
val job2 = Job.getInstance(conf)
job2.setOutputKeyClass(classOf[ImmutableBytesWritable])
job2.setOutputValueClass(classOf[Result])
job2.setOutputFormatClass(classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[ImmutableBytesWritable]])
val rdd2 = indataRDD.map(_.split(',')).map{arr=>{
val put = new Put(Bytes.toBytes(arr(0)))
put.add(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))
put.add(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))
(new ImmutableBytesWritable, put)
}}
rdd2.saveAsNewAPIHadoopDataset(job2.getConfiguration())
读取数据
println("————————————读取数据")
conf.set(TableInputFormat.INPUT_TABLE, tablename)
//读取数据并转化成rdd
val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
val count = hBaseRDD.count()
println(count)
hBaseRDD.collect().foreach{case (_,result) =>{
//获取行键
val key = Bytes.toString(result.getRow)
//通过列族和列名获取列
val name = Bytes.toString(result.getValue("cf".getBytes,"name".getBytes))
val age = Bytes.toInt(result.getValue("cf".getBytes,"age".getBytes))
println("Row key:"+key+" Name:"+name+" Age:"+age)
}}
sc.stop()
}
}
执行命令
spark-submit --master yarn --deploy-mode client --class com.whq.test.HBaseTest sparksbt_2.10-0.1.jar
查看数据情况
scan 'account'
scan 'account2'