hbase 数据库深入使用与相关数据的加载
  AnyLlCIhvKpr 2023年11月12日 32 0

  • 一:hbase 数据检索流程
  • 二:hbase 数据库java api 调用
  • 三:hbase 各个服务的作用
  • 四:hbase 与mapreduce集成
  • 五:hbase 使用BulkLoad 加载数据

一:hbase 数据检索流程

1.1 hbase 数据检索流程图:

hbase 数据库深入使用与相关数据的加载

1.2 hbase 读的流程:

读流程:
1、client请求zookeeper集群(root/meta)(meta)
		--有多少table,table有哪些region(startrow、stoprow)
2、client找到region对应的region server
3、region server响应客户端请求    

1.3. hhbase 写的流程

1、client请求zookeeper集群,该数据应该写入哪个region
2、向region所在的region server 发起写请求
3、数据先写进HLOG(WAL)
4、然后写入memstore(flush) 
5、当memstore达到阀值,写入storefile(compact)
6、当storefile达到阀值,合并成新的storefile
7、当region达到阀值,当前region会划分为两个新的region(split)

1.4 hbase 读写流程存储核心的三个机制

1. flush机制:当memstore满了以后会flush陈一个storefile
2. compact机制:当storefile达到阀值,合并storefile,合并过程中cell版本合并和数据删除
3. split机制:当region不断增大,达到阀值,region会分成两个新的region

二:hbase 数据库java api 调用

2.1 eclipse 环境配置

 更改maven 的源:
 上传repository.tar.gz 
cd .m2
mv repository repository.bak2016612 
rz repository.tar.gz 
tar -zxvf repository.tar.gz 

cd /home/hadoop/yangyang/hbase
cp -p hbase-site.xml log4j.properties /home/hadoop/workspace/studyhbase/src/main/rescourse

更改eclipse 的pom.xml

增加:

<dependency>
	<groupId>org.apache.hbase</groupId>
	<artifactId>hbase-server</artifactId>
	<version>0.98.6-hadoop2</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>0.98.6-hadoop2</version>
</dependency>

2.2 hbase java api 掉用:

package org.apache.hadoop.hbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class HbaseOperation {

	/**
	 * 
	 * @param args
	 * @throws IOException
	 */
	

	public static HTable getTable(String tableName) throws IOException {
		// Get configuration
		Configuration conf = HBaseConfiguration.create();

		// Get Table
		HTable table = new HTable(conf, tableName);

		return table;
	}

	public static void getData() throws IOException {
		HTable table = HbaseOperation.getTable("user");
		// Get Data
		Get get = new Get(Bytes.toBytes("1001"));
		Result result = table.get(get);
		Cell[] cells = result.rawCells();

		for (Cell cell : cells) {
			System.out.print(Bytes.toString(CellUtil.cloneFamily(cell)) + ":");
			System.out.print(Bytes.toString(CellUtil.cloneQualifier(cell))
					+ "==>");
			System.out.println(Bytes.toString(CellUtil.cloneValue(cell)));
		}
		table.close();
	}

	/**
	 * 
	 * @param args
	 * @throws IOException
	 */
	
	
	public static void putData() throws IOException {
		HTable table = HbaseOperation.getTable("user");

		Put put = new Put(Bytes.toBytes("1004"));

		put.add(Bytes.toBytes("info"), Bytes.toBytes("name"),
				Bytes.toBytes("zhaoliu"));
		put.add(Bytes.toBytes("info"), Bytes.toBytes("age"),
				Bytes.toBytes("50"));
		put.add(Bytes.toBytes("info"), Bytes.toBytes("sex"),
				Bytes.toBytes("male"));

		table.put(put);
		table.close();
	}



	public static void main(String[] args) throws IOException {
		HTable table = HbaseOperation.getTable("user");
		
		

		Scan scan = new Scan();
		scan.setStartRow(Bytes.toBytes("1001")) ;
		scan.setStopRow(Bytes.toBytes("1002")) ;
		ResultScanner resultScanner = table.getScanner(scan);

		for (Result res : resultScanner) {
			Cell[] ress = res.rawCells();
			for (Cell cell : ress) {
				System.out.print(Bytes.toString(CellUtil.cloneRow(cell))
						+ "\t");
				System.out.print(Bytes.toString(CellUtil.cloneFamily(cell))
						+ ":");
				System.out.print(Bytes.toString(CellUtil.cloneQualifier(cell))
						+ "==>");
				System.out.println(Bytes.toString(CellUtil.cloneValue(cell)));
			}
			table.close();

		}

	}
}


三:hbase 各个服务的作用

3.1 Hmaster 作用:

	1、为region server分配region
	2、负责region server的负责均衡
	3、发现失效的region server,需要重新分配其上的region
	4、监听zk,基于zookeeper感应region server的上下线
	5、监听zk,基于zookeeper来保证HA
	6、不参与客户端数据读写访问
	7、负载低(通常情况下可以把它和其他服务器(NN/SNN)整合在一起)
	8、无单点故障(SPOF)

3.2 Hregionserver 作用:

    1、维护master分配给它的region
	2、响应客户端的IO访问请求(读写)
	3、处理region的flush、compact、split
	4、维护region的cache

3.4 zookeeper 作用:

	1、保证集群里面只有一个master(HA)
	2、保存了root region的位置(meta),访问入口地址
	3、实时监控region server的状态,及时通知region server上下线消息给master
	4、存储了hbase的schema,包括哪些table,每个表有哪些列簇

四:hbase 与mapreduce集成

4.1 hbase 获取jar命令

bin/hbase mapredcp 

4.2 配置环境变量

vim .bash_profile
export HADOOP_HOME=/home/hadoop/yangyang/hadoop
export HBASE_HOME=/home/hadoop/yangyang/hbase
export HADOOP_CLASSPATH=`$HBASE_HOME/bin/hbase mapredcp`
PATH=$PATH:$HOME/bin:$JAVA_HOME/bin:/bin:/bin::
soure .bash_profile

hbase 数据库深入使用与相关数据的加载

4.3 统计一个hbase表:

cd /home/hadoop/yangyang/hadoop

bin/yarn jar /home/hadoop/yangyang/hbase/lib/hbase-server-0.98.6-cdh5.3.6.jar rowcounter user

hbase 数据库深入使用与相关数据的加载 hbase 数据库深入使用与相关数据的加载

4.4 导入一个生成的hbase 表的in.tsv

vim in.tsv
---
10010   zhangsan        30      shanghai
10011   lisi    31      beijin
10012   wangwu  32      shanghai
10013   zaoliu  30      beijin
hdfs dfs -put in.tsv /input

yarn jar /home/hadoop/yangyang/hbase/lib/hbase-server-0.98.6-cdh5.3.6.jar importtsv -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:age,info:address user /input/in.tsv

hbase 数据库深入使用与相关数据的加载 hbase 数据库深入使用与相关数据的加载 hbase 数据库深入使用与相关数据的加载

五: 使用BulkLoad加载数据

vim out.tsv 
110     zhangsan        30      shanghai
111     lisi    31      beijin
112     wangwu  32      shanghai
113     zaoliu  30      beijin
hdfs dfs -put out.tsv /input 
<!-- 将tsv 文件转换成hfile 文件(在hdfs 上面)-->
yarn jar /home/hadoop/yangyang/hbase/lib/hbase-server-0.98.6-cdh5.3.6.jar importtsv -Dimporttsv.bulk.output=/hfileoutput/ -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:age,info:tel user /input/out.tsv

<!-- 将hfile 加载到hbase 的表中
yarn jar /home/hadoop/yangyang/hbase/lib/hbase-server-0.98.6-cdh5.3.6.jar completebulkload /hfileoutput user

hbase 数据库深入使用与相关数据的加载 hbase 数据库深入使用与相关数据的加载 hbase 数据库深入使用与相关数据的加载 hbase 数据库深入使用与相关数据的加载 hbase 数据库深入使用与相关数据的加载

5.1 hbase 表中提取相关字段,生成新的表

提取hbase 表中的user 表的name 与age 字段 生成新表student

package org.apache.hadoop.studyhbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class User2StudentMapReduce extends Configured implements Tool{

	// step 1: Mapper
	public static class ReadUserMapper //
			extends TableMapper<ImmutableBytesWritable, Put>{

		@Override
		protected void map(ImmutableBytesWritable key, Result value,
				Context context)
						throws IOException, InterruptedException {
			// user: name & age  ->  student: name & age : put
			// create Put
			Put put = new Put(key.get()) ;
			// add column
			for(Cell cell: value.rawCells()){
				// add family: info
				if("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))){
					// add column: name
					if("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
						put.add(cell) ;
					//	CellUtil.cloneValue(cell)
					//	put.add(family, qualifier, value) ;
					}
					// add column: age
					else if("age".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
						put.add(cell) ;
					}
				}
			}
			// context output
			context.write(key, put);
		}
		
	}
	
	
	// step 2: Reducer
	public static class WriteStudentReducer //
			extends TableReducer<ImmutableBytesWritable, Put, NullWritable>{

		@Override
		protected void reduce(ImmutableBytesWritable key, Iterable<Put> values,
				Context context) throws IOException, InterruptedException {
			for(Put put : values){
				context.write(NullWritable.get(), put);
			}
		}
	}
	
	// step 3: Driver
	public int run(String[] args) throws Exception {
		// 1) Configuration
		Configuration conf = this.getConf();
		
		// 2) create job
		Job job = Job.getInstance(conf, this.getClass().getSimpleName()) ;
		job.setJarByClass(User2StudentMapReduce.class);
		
		// 3) set job
		// input -> mapper -> shuffle -> reducer -> output
		Scan scan = new Scan() ;
		scan.setCacheBlocks(false);
		scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
		
		TableMapReduceUtil.initTableMapperJob(
				  "user",        // input table
				  scan,               // Scan instance to control CF and attribute selection
				  ReadUserMapper.class,     // mapper class
				  ImmutableBytesWritable.class,         // mapper output key
				  Put.class,  // mapper output value
				  job //
		  );
		
		TableMapReduceUtil.initTableReducerJob(
				  "student",        // output table
				  WriteStudentReducer.class,    // reducer class
				  job //
			);
				
		job.setNumReduceTasks(1);   // at least one, adjust as required

		boolean isSuccess = job.waitForCompletion(true);
		if (!isSuccess) {
		  throw new IOException("error with job!");
		}
		return isSuccess ? 0 : 1;
	}
	
	public static void main(String[] args) throws Exception {
	    Configuration conf = HBaseConfiguration.create();
	    int status = ToolRunner.run(//
	    		conf, //
	    		new User2StudentMapReduce(), //
	    		args //
	    	);
	    System.exit(status);
	}

}


去hbase 上面新建空表student
create 'student','info'

hbase 数据库深入使用与相关数据的加载

导出新生成jar包User2Student.jar运行:

cd /home/hadoop/jars
yarn jar User2Student.jar

hbase 数据库深入使用与相关数据的加载 hbase 数据库深入使用与相关数据的加载

查询hbase进行验证:

hbase 数据库深入使用与相关数据的加载

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月12日 0

暂无评论

推荐阅读
  TEZNKK3IfmPf   2023年11月14日   18   0   0 大数据
  TEZNKK3IfmPf   2023年11月15日   54   0   0 apachehadoopjava
  TEZNKK3IfmPf   2023年11月15日   27   0   0 apachehadoop
  TEZNKK3IfmPf   2024年04月26日   66   0   0 hadoopHive
  TEZNKK3IfmPf   2023年11月15日   28   0   0 System大数据
AnyLlCIhvKpr