Chapter 10 Hive in Practice: YouTube Video Analysis (Simplified)

10.1 Requirements

Compute the standard metrics and various Top N statistics for the gulivideo video site:
–Top 10 videos by view count
–Top 10 video categories by popularity
–Categories of the Top 20 most-viewed videos
–Category ranking of the videos related to the Top 50 most-viewed videos
–Top 10 hottest videos within each category
–Top 10 videos by traffic within each category
–Top 10 users by number of uploaded videos, and the videos they uploaded
–Top 10 most-viewed videos in each category

10.2 Project

10.2.1 Data Structure

1. Video table (fields match the DDL in section 10.3.1):

    videoId     string          video id
    uploader    string          uploader's username
    age         int             video age
    category    array<string>   categories of the video (one video may have several)
    length      int             video length
    views       int             view count
    rate        float           rating
    ratings     int             traffic
    comments    int             comment count
    relatedId   array<string>   ids of the related videos

2. User table (fields match the DDL in section 10.3.1):

    uploader    string   username
    videos      int      number of uploaded videos
    friends     int      number of friends

10.2.2 ETL of the Raw Data

Inspecting the raw data shows that a video can belong to several categories, separated by the & character with a space on either side, and that a video can likewise have several related videos, separated by "\t". To make these multi-valued fields easy to work with during analysis, we first clean and restructure the data: the categories are joined with "&" with the surrounding spaces removed, and the multiple related-video ids are joined with "&" as well. A hypothetical before/after record is shown below.
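The following sample is purely illustrative (the values are invented); fields are tab-separated, and the 10th field onward holds the related-video ids:

Before ETL:
    videoId001	uploaderA	724	People & Blogs	424	13021	4.34	1305	744	rel001	rel002
After ETL:
    videoId001	uploaderA	724	People&Blogs	424	13021	4.34	1305	744	rel001&rel002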
1. ETL: ETLUtil

public class ETLUtil {

	/**
	 * Cleans one raw record: removes the spaces inside the category field
	 * and re-joins the related-video ids (the 10th field onward) with "&".
	 * Returns null for records with fewer than 9 fields.
	 */
	public static String oriString2ETLString(String ori) {
		StringBuilder etlString = new StringBuilder();
		String[] splits = ori.split("\t");
		if (splits.length < 9) return null;
		// splits[3] is the category field, e.g. "People & Blogs" -> "People&Blogs"
		splits[3] = splits[3].replace(" ", "");
		for (int i = 0; i < splits.length; i++) {
			if (i < 9) {
				// The first nine fields stay tab-separated
				if (i == splits.length - 1) {
					etlString.append(splits[i]);
				} else {
					etlString.append(splits[i]).append("\t");
				}
			} else {
				// Related-video ids are joined with "&"
				if (i == splits.length - 1) {
					etlString.append(splits[i]);
				} else {
					etlString.append(splits[i]).append("&");
				}
			}
		}

		return etlString.toString();
	}
}

2. ETL: Mapper

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import com.cris.util.ETLUtil;

public class VideoETLMapper extends Mapper<Object, Text, NullWritable, Text> {

	Text text = new Text();

	@Override
	protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
		// Clean the raw line; drop records the ETL step rejects
		String etlString = ETLUtil.oriString2ETLString(value.toString());

		if (StringUtils.isBlank(etlString)) return;

		text.set(etlString);
		context.write(NullWritable.get(), text);
	}
}

3. ETL: Runner

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class VideoETLRunner implements Tool {

	private Configuration conf = null;

	@Override
	public void setConf(Configuration conf) {
		this.conf = conf;
	}

	@Override
	public Configuration getConf() {
		return this.conf;
	}

	@Override
	public int run(String[] args) throws Exception {
		conf = this.getConf();
		conf.set("inpath", args[0]);
		conf.set("outpath", args[1]);

		Job job = Job.getInstance(conf);

		job.setJarByClass(VideoETLRunner.class);

		job.setMapperClass(VideoETLMapper.class);
		job.setMapOutputKeyClass(NullWritable.class);
		job.setMapOutputValueClass(Text.class);
		// Map-only job: the ETL needs no reduce phase
		job.setNumReduceTasks(0);

		this.initJobInputPath(job);
		this.initJobOutputPath(job);

		return job.waitForCompletion(true) ? 0 : 1;
	}

	private void initJobOutputPath(Job job) throws IOException {
		Configuration conf = job.getConfiguration();
		String outPathString = conf.get("outpath");

		FileSystem fs = FileSystem.get(conf);

		// Delete the output directory if it already exists,
		// otherwise job submission would fail
		Path outPath = new Path(outPathString);
		if (fs.exists(outPath)) {
			fs.delete(outPath, true);
		}

		FileOutputFormat.setOutputPath(job, outPath);
	}

	private void initJobInputPath(Job job) throws IOException {
		Configuration conf = job.getConfiguration();
		String inPathString = conf.get("inpath");

		FileSystem fs = FileSystem.get(conf);

		Path inPath = new Path(inPathString);
		if (fs.exists(inPath)) {
			FileInputFormat.addInputPath(job, inPath);
		} else {
			throw new RuntimeException("Input path does not exist in HDFS: " + inPathString);
		}
	}

	public static void main(String[] args) {
		try {
			int resultCode = ToolRunner.run(new VideoETLRunner(), args);
			if (resultCode == 0) {
				System.out.println("Success!");
			} else {
				System.out.println("Fail!");
			}
			System.exit(resultCode);
		} catch (Exception e) {
			e.printStackTrace();
			System.exit(1);
		}
	}
}

4. Running the ETL job

The hadoop jar command works just as well in place of yarn jar:

$ bin/yarn jar ~/softwares/jars/gulivideo-0.0.1-SNAPSHOT.jar \
com.cris.etl.VideoETLRunner \
/gulivideo/video/2008/0222 \
/gulivideo/output/video/2008/0222
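Note that the output directory, /gulivideo/output/video/2008/0222, is exactly the path loaded into gulivideo_ori in section 10.3.2, so the cleaned data feeds straight into the Hive tables.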

10.3 Preparation

10.3.1 Creating the Tables

Create four tables: gulivideo_ori and gulivideo_user_ori for the raw text data, plus gulivideo_orc and gulivideo_user_orc for their ORC copies.
gulivideo_ori:

create table gulivideo_ori(
    videoId string, 
    uploader string, 
    age int, 
    category array<string>, 
    length int, 
    views int, 
    rate float, 
    ratings int, 
    comments int,
    relatedId array<string>)
row format delimited 
fields terminated by "\t"
collection items terminated by "&"
stored as textfile;

gulivideo_user_ori:

create table gulivideo_user_ori(
    uploader string,
    videos int,
    friends int)
row format delimited 
fields terminated by "\t" 
stored as textfile;

The raw data will then be copied into the ORC tables (section 10.3.3).
gulivideo_orc:

create table gulivideo_orc(
    videoId string, 
    uploader string, 
    age int, 
    category array<string>, 
    length int, 
    views int, 
    rate float, 
    ratings int, 
    comments int,
    relatedId array<string>)
row format delimited fields terminated by "\t" 
collection items terminated by "&" 
stored as orc;

gulivideo_user_orc:

create table gulivideo_user_orc(
    uploader string,
    videos int,
    friends int)
row format delimited 
fields terminated by "\t" 
stored as orc;

10.3.2 Loading the ETL Output

gulivideo_ori:

load data inpath "/gulivideo/output/video/2008/0222" into table gulivideo_ori;

gulivideo_user_ori:

load data inpath "/gulivideo/user/2008/0903" into table gulivideo_user_ori;

10.3.3 Inserting into the ORC Tables

gulivideo_orc:

insert into table gulivideo_orc select * from gulivideo_ori;

gulivideo_user_orc:

insert into table gulivideo_user_orc select * from gulivideo_user_ori;
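As a quick sanity check (a sketch; the exact counts depend on your copy of the dataset), each text table and its ORC copy should report the same row count:

select count(*) from gulivideo_ori;
select count(*) from gulivideo_orc;
select count(*) from gulivideo_user_ori;
select count(*) from gulivideo_user_orc;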

10.4 Analysis

10.4.1 Top 10 Videos by View Count

Approach: a global sort on the views column with order by is all that is needed, keeping only the first 10 rows. Note that order by in Hive performs a global sort through a single reducer, which is acceptable here because limit 10 caps the output.
Final query:

select 
    videoId, 
    uploader, 
    age, 
    category, 
    length, 
    views, 
    rate, 
    ratings, 
    comments 
from 
    gulivideo_orc 
order by
    views desc
limit 10;

10.4.2 Top 10 Video Categories by Popularity

Approach:

  1. That is, count how many videos each category contains, and show the 10 categories containing the most.
  2. Group by category and count the videoId values within each group.
  3. In the current schema one video maps to one or more categories, so before grouping by category we must first convert the category array from column to rows (explode), and only then count (a standalone preview of this step follows the final query).
  4. Finally, sort by popularity and keep the first 10 rows.
    Final query:
select 
    category_name as category, 
    count(t1.videoId) as hot 
from (
    select 
        videoId,
        category_name 
    from 
        gulivideo_orc lateral view explode(category) t_category as category_name) t1
group by 
    t1.category_name 
order by
    hot desc
limit 10;
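To preview what the column-to-row step produces on its own, the inner subquery can be run standalone (a sketch; the limit merely keeps the preview short). Each (video, category) pair becomes one row, so a video with two categories appears twice:

select
    videoId,
    category_name
from
    gulivideo_orc lateral view explode(category) t_category as category_name
limit 10;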

10.4.3 Categories of the 20 Most-Viewed Videos and the Number of Top 20 Videos in Each Category

Approach:

  1. First retrieve the full records of the 20 most-viewed videos, in descending order of views.
  2. Split the category field of those 20 records into rows (column-to-row conversion).
  3. Finally select each category name together with the number of Top 20 videos it contains.
    Final query:
select 
    category_name as category, 
    count(t2.videoId) as hot_with_views 
from (
    select 
        videoId, 
        category_name 
    from (
        select 
            * 
        from 
            gulivideo_orc 
        order by
            views desc
        limit 20) t1 lateral view explode(category) t_category as category_name) t2
group by 
    category_name 
order by
    hot_with_views desc;

10.4.4 Category Ranking of the Videos Related to the Top 50 Most-Viewed Videos

Approach:

  1. Retrieve the full records of the 50 most-viewed videos (which include each video's related videos); call this temporary table t1.
    t1: the 50 most-viewed videos
select 
    * 
from 
    gulivideo_orc 
order by
    views desc
limit 50;
  2. Convert the relatedId column of those 50 records into rows; call this temporary table t2.
    t2: the related-video ids after column-to-row conversion
select 
    explode(relatedId) as videoId 
from 
	t1;
  3. Inner join the related-video ids with the gulivideo_orc table.
    t5: yields two columns, one the category and one the related-video id found in the previous step
(select 
    distinct(t2.videoId), 
    t3.category 
from 
    t2
inner join 
    gulivideo_orc t3 on t2.videoId = t3.videoId) t4 lateral view explode(category) t_category as category_name;
  4. Group by video category, count the videos in each group, and rank.
    Final query:
select 
    category_name as category, 
    count(t5.videoId) as hot 
from (
    select 
        videoId, 
        category_name 
    from (
        select 
            distinct(t2.videoId), 
            t3.category 
        from (
            select 
                explode(relatedId) as videoId 
            from (
                select 
                    * 
                from 
                    gulivideo_orc 
                order by
                    views desc
                limit 50) t1) t2
        inner join
            gulivideo_orc t3 on t2.videoId = t3.videoId) t4 lateral view explode(category) t_category as category_name) t5
group by 
    category_name 
order by
    hot desc;

10.4.5 Top 10 Hottest Videos in Each Category, Using Music as an Example

Approach:

  1. To get the Top 10 hottest videos in the Music category, we first need to isolate that category, which means exploding category; so we create a table that stores the data with categoryId already expanded.
  2. Insert data into this exploded-category table.
  3. Compute the video popularity within the chosen category (Music).
    Final code:
    Create the category table:
create table gulivideo_category(
    videoId string, 
    uploader string, 
    age int, 
    categoryId string, 
    length int, 
    views int, 
    rate float, 
    ratings int, 
    comments int, 
    relatedId array<string>)
row format delimited 
fields terminated by "\t" 
collection items terminated by "&" 
stored as orc;

Insert data into the category table:

insert into table gulivideo_category  
    select 
        videoId,
        uploader,
        age,
        categoryId,
        length,
        views,
        rate,
        ratings,
        comments,
        relatedId 
    from 
        gulivideo_orc lateral view explode(category) t_category as categoryId;
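As a quick sanity check (a sketch), gulivideo_category will generally hold more rows than gulivideo_orc, since a video with several categories now appears once per category:

select count(*) from gulivideo_category;
select count(*) from gulivideo_orc;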

Top 10 in the Music category (any other category works the same way):

select 
    videoId, 
    views
from 
    gulivideo_category 
where 
    categoryId = "Music" 
order by
    views desc
limit 10;

10.4.6 Top 10 Videos by Traffic in Each Category, Using Music as an Example

Approach:

  1. Reuse the exploded-category table (categoryId converted from column to rows).
  2. Sort by ratings.
    Final query:
select 
    videoId,
    views,
    ratings 
from 
    gulivideo_category 
where 
    categoryId = "Music" 
order by
    ratings desc
limit 10;

10.4.7 Top 10 Users by Number of Uploaded Videos, and Their 20 Most-Viewed Uploads

Approach:

  1. First retrieve the records of the 10 users who have uploaded the most videos:
select 
    * 
from 
    gulivideo_user_orc 
order by
    videos desc
limit 10;
  2. Join with the gulivideo_orc table on the uploader column, and sort the joined rows by views (view count).
    Final query:
select 
    t2.videoId, 
    t2.views,
    t2.ratings,
    t1.videos,
    t1.friends 
from (
    select 
        * 
    from 
        gulivideo_user_orc 
    order by 
        videos desc 
    limit 
        10) t1 
join 
    gulivideo_orc t2
on 
    t1.uploader = t2.uploader 
order by 
    views desc 
limit 
    20;

10.4.8 Top 10 Most-Viewed Videos in Each Category

Approach:

  1. Start from the table with categoryId already exploded.
  2. In a subquery, partition by categoryId, sort each partition by views, and generate an increasing number per row; name this column rank (a rank()-based variant is sketched after the final query).
  3. From that subquery, keep only the rows whose rank value is at most 10.
    Final query:
select 
    t1.* 
from (
    select 
        videoId,
        categoryId,
        views,
        row_number() over(partition by categoryId order by views desc) rank
    from
        gulivideo_category) t1
where 
    rank <= 10;
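A possible variant (a sketch, not part of the original solution): row_number() assigns distinct positions even to tied view counts, so exactly 10 rows come back per category. If ties should share a position, rank() can be swapped in, at the cost of occasionally returning more than 10 rows for a category:

select
    t1.*
from (
    select
        videoId,
        categoryId,
        views,
        rank() over(partition by categoryId order by views desc) rk
    from
        gulivideo_category) t1
where
    rk <= 10;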