Spark 连接 HBase 入库及查询操作
  qhbhn6yHGMYP 2023年11月13日 24 0


本实例采用Scala开发,实现了RDD数据两种方式入库到HBase,从HBase中读取数据并print输出。

build.sbt

 

name := "SparkSbt"

version := "0.1"

scalaVersion := "2.10.5"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.6.0"

libraryDependencies+="org.apache.hbase"%"hbase-common"%"1.2.0"
libraryDependencies+="org.apache.hbase"%"hbase-client"%"1.2.0"
libraryDependencies+="org.apache.hbase"%"hbase-server"%"1.2.0"

 

先hbase shell执行命令创建表:

create 'account' , 'cf'
create 'account2' , 'cf'

 

源码

package com.whq.test

import org.apache.hadoop.hbase.client.{HBaseAdmin, Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark._


object HBaseTest {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("HBaseTest")
    val sc = new SparkContext(sparkConf)

    // please ensure HBASE_CONF_DIR is on classpath of spark driver
    val conf = HBaseConfiguration.create()
    //设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置
    conf.set("hbase.zookeeper.quorum","192.168.91.144")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    入库方式一saveAsHadoopDataset
    println("————————————入库方式一")
    var tablename = "account"
    conf.set(TableInputFormat.INPUT_TABLE, tablename)

    //初始化jobconf,TableOutputFormat必须是org.apache.hadoop.hbase.mapred包下的!
    val jobConf = new JobConf(conf)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)

    //待入库数据
    val indataRDD = sc.makeRDD(Array("11,whq,30","12,wanghongqi,29","13,xiaoming,15"))
    //数据转换为可入库的RDD[(ImmutableBytesWritable,Put)]
    val rdd = indataRDD.map(_.split(',')).map{arr=>{
      /*一个Put对象就是一行记录,在构造方法中指定主键
       * 所有插入的数据必须用org.apache.hadoop.hbase.util.Bytes.toBytes方法转换
       * Put.add方法接收三个参数:列族,列名,数据
       */
      val put = new Put(Bytes.toBytes(arr(0).toInt))
      put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))
      put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))
      //转化成RDD[(ImmutableBytesWritable,Put)]类型才能调用saveAsHadoopDataset
      (new ImmutableBytesWritable, put)
    }}
    //入库写入
    rdd.saveAsHadoopDataset(jobConf)

    入库方式二saveAsNewAPIHadoopDataset
    println("————————————入库方式二")
    tablename = "account2"
    conf.set(TableOutputFormat.OUTPUT_TABLE, tablename)
    val job2 = Job.getInstance(conf)
    job2.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job2.setOutputValueClass(classOf[Result])
    job2.setOutputFormatClass(classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[ImmutableBytesWritable]])
    val rdd2 = indataRDD.map(_.split(',')).map{arr=>{
      val put = new Put(Bytes.toBytes(arr(0)))
      put.add(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))
      put.add(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))
      (new ImmutableBytesWritable, put)
    }}

    rdd2.saveAsNewAPIHadoopDataset(job2.getConfiguration())

    读取数据
    println("————————————读取数据")
    conf.set(TableInputFormat.INPUT_TABLE, tablename)
    //读取数据并转化成rdd
    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    val count = hBaseRDD.count()
    println(count)
    hBaseRDD.collect().foreach{case (_,result) =>{
      //获取行键
      val key = Bytes.toString(result.getRow)
      //通过列族和列名获取列
      val name = Bytes.toString(result.getValue("cf".getBytes,"name".getBytes))
      val age = Bytes.toInt(result.getValue("cf".getBytes,"age".getBytes))
      println("Row key:"+key+" Name:"+name+" Age:"+age)
    }}

    sc.stop()
  }
}

执行命令

spark-submit --master yarn --deploy-mode client --class com.whq.test.HBaseTest sparksbt_2.10-0.1.jar

查看数据情况

scan 'account'
scan 'account2'


【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月13日 0

暂无评论

推荐阅读
  wpWn7yzs0oKF   2023年11月13日   29   0   0 javaapacheHDFS
qhbhn6yHGMYP