MapReduce 工作流介绍​
  TEZNKK3IfmPf 2023年11月12日 31 0

本文介绍MapReduce 工作流。

本文前提:hadoop环境可用。

一、MapReduce 工作流介绍

多个MR作业,先后依次执行来计算得出最终结果。这类作业类似于DAG的任务,各个作业之间有依赖关系,比如说,这一个作业的输入,依赖上一个作业的输出等等。

一般实际的业务场景中,可能使用定时调度工具进行调度,但本示例仅仅说明mapreduce自身也可以做到。

​​20、MapReduce 工作流介绍​

  • JobControl类:工作流job控制器,一次可以提交、管理多个job。JobControl类实现了线程Runnable接口。需要实例化一个线程来让它启动。
  • ControlledJob类:可以将普通作业包装成受控作业。并且支持设置依赖关系。Hadoop会根据依赖的关系,先后执行job任务,每个任务的运行都是独立的。

二、使用示例

MapReduce的join操作 将上述的Reduce side join 的例子连续起来运行,即第一步未排序输出,第二步针对上一步的输出进行排序。

1、实现

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.hadoop.mr.join.reducerside.ReduceSideSortDriver;
import org.hadoop.mr.join.reducerside.ReduceSideSortMapper;
import org.hadoop.mr.join.reducerside.ReduceSideSortReducer;
import org.hadoop.mr.join.reducerside.ReducerSideJoinDriver;
import org.hadoop.mr.join.reducerside.ReducerSideJoinMapper;
import org.hadoop.mr.join.reducerside.ReducerSideJoinReducer;

public class MRFlowDriver {
	static String in = "D:/workspace/bigdata-component/hadoop/test/in/join";
	static String tempOut = "D:/workspace/bigdata-component/hadoop/test/out/reduceside/unsortjoin";
	static String out = "D:/workspace/bigdata-component/hadoop/test/out/reduceside/joinsort";

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();

		FileSystem fs = FileSystem.get(conf);
		if (fs.exists(new Path(out))) {
			fs.delete(new Path(out), true);
		}

		// 第一个作业的配置
		Job unSortjob = getJob(conf, "Reduce Side Join DependingJob Testing ------ unSortjob", ReducerSideJoinDriver.class,
				ReducerSideJoinMapper.class, Text.class, Text.class, ReducerSideJoinReducer.class, Text.class,
				NullWritable.class, 1, in, tempOut);
	
		// 将普通作业包装成受控作业
		ControlledJob unSortControlledJob = new ControlledJob(conf);
		unSortControlledJob.setJob(unSortjob);

		// 第二个作业的配置
		Job sortedjob = getJob(conf, "Reduce Side Join DependingJob Testing ------ sortedjob", ReduceSideSortDriver.class,
				ReduceSideSortMapper.class, Text.class, Text.class, ReduceSideSortReducer.class, Text.class,
				NullWritable.class, 1, tempOut, out);

		ControlledJob sortedControlledJob = new ControlledJob(conf);
		sortedControlledJob.setJob(sortedjob);

		// 设置job的依赖关系
		sortedControlledJob.addDependingJob(unSortControlledJob);

		// 主控制容器
		JobControl jobControl = new JobControl("jobControl");
		// 添加到总的JobControl里,进行控制
		jobControl.addJob(unSortControlledJob);
		jobControl.addJob(sortedControlledJob);

		// 在线程启动
		Thread t = new Thread(jobControl);
		t.start();
		while (true) {
			if (jobControl.allFinished()) {
				System.out.println("jobControl" + jobControl.getSuccessfulJobList());
				jobControl.stop();
				break;
			}

		}

	}

	/**
	 * 
	 * @param conf
	 * @param jobName
	 * @param cls
	 * @param clsMapper
	 * @param clsMapOutKey
	 * @param clsMapOutValue
	 * @param clsReducer
	 * @param clsReducerOutKey
	 * @param clsReducerOutValue
	 * @param tasks
	 * @return
	 * @throws Exception
	 */
	static Job getJob(Configuration conf, String jobName, Class<?> cls, Class<? extends Mapper> clsMapper,
			Class<?> clsMapOutKey, Class<?> clsMapOutValue, Class<? extends Reducer> clsReducer,
			Class<?> clsReducerOutKey, Class<?> clsReducerOutValue, int tasks, String in, String out) throws Exception {
		Job job = Job.getInstance(conf, jobName);
		// 设置作业驱动类
		job.setJarByClass(cls);
		// 设置mapper相关信息
		job.setMapperClass(clsMapper);
		job.setMapOutputKeyClass(clsMapOutKey);
		job.setMapOutputValueClass(clsMapOutValue);

		// 设置reducer相关信息
		job.setReducerClass(clsReducer);
		job.setOutputKeyClass(clsReducerOutKey);
		job.setOutputValueClass(clsReducerOutValue);

		job.setNumReduceTasks(tasks);

		// 设置输入的文件的路径
		FileInputFormat.setInputPaths(job, new Path(in));
		FileSystem fs = FileSystem.get(conf);
		if (fs.exists(new Path(out))) {
			fs.delete(new Path(out), true);
		}
		FileOutputFormat.setOutputPath(job, new Path(out));

		return job;
	}

}

2、验证

运行日志

jobControl[job name:	Reduce Side Join DependingJob Testing ------ unSortjob
job id:	jobControl0
job state:	SUCCESS
job mapred id:	job_local1023947416_0001
job message:	just initialized
job has no depending job:	
, job name:	Reduce Side Join DependingJob Testing ------ sortedjob
job id:	jobControl1
job state:	SUCCESS
job mapred id:	job_local1967863010_0002
job message:	just initialized
job has 1 dependeng jobs:
	 depending job 0:	Reduce Side Join DependingJob Testing ------ unSortjob
]

实际的功能与本示例中对应的链接示例结果一致,不再赘述。 至此,MapReduce的工作流示例介绍结束。

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月12日 0

暂无评论

推荐阅读
  TEZNKK3IfmPf   2023年11月15日   54   0   0 apachehadoopjava
  TEZNKK3IfmPf   2023年11月14日   34   0   0 进程hadoop
  TEZNKK3IfmPf   2023年11月14日   28   0   0 hadoop大数据
  TEZNKK3IfmPf   2023年11月15日   27   0   0 apachehadoop
  TEZNKK3IfmPf   2023年11月14日   25   0   0 hadoop
  TEZNKK3IfmPf   2023年11月14日   24   0   0 hadoopHive
  TEZNKK3IfmPf   2023年11月14日   27   0   0 hadoop大数据
  TEZNKK3IfmPf   2024年04月26日   66   0   0 hadoopHive
TEZNKK3IfmPf