mapreduce自定义inputformat
  bhG8jH8b1hMi 2023年11月02日 73 0


背景

无论是 hdfs 存储文件还是 mapreduce 处理文件,对于小文件的存储和处理都会影响效率,在实际工作中又难免面临处理大量小文件的场景(比方说用 flume 实时采集日志,日志是由用户发送请求而产生的,用户发送请求的频率不是固定的,有的时候频繁请求,有的时候请求数就比较少,flume 采集数据的配置是每隔固定的一段时间产生一个文件,所以就导致在有些时间段会难免产生大量的小文件)。

在 d 盘的 input 目录创建三个文件:

mapreduce自定义inputformat_mapreduce

one.txt:

I love Beijign
I love China
Beijing is the capital of China

tow.txt:

I love Yantai
I love ShanDong

three.txt

I love Hangzhou
I love Shenzhen

分析

小文件的优化有如下几种方式:

  • 在数据采集阶段,就将小文件或小批数据先合并成大文件再上传到 HDFS,即在 Flume 采集的时候进行相应文件大小的配置。
  • 在业务处理前,使用 mapreduce 程序对 HDFS 上的小文件先进行合并,再做后续的业务处理(当然,也可以使用 Java IO 流处理一下)。
  • 在业务处理时,采用 CombineTextInputFormat 将多个小文件合并成一个切片,再处理以调高效率。详见 案例四

本例中使用第二种方式:通过自定义 InputFormat,RecordReader,指定输出的OutputFormat 类型为 SequenceFileOutputFormat 的方式来将多个小文件合并成一个大文件。

知识点:自定义 InputFormat,自定义 RecordReader。

实现

因为 InputFormat 读取文件输入靠的是 RecordReader 来完成的,所以我们需要先创建 RecordReader。

1.自定义RecordReader

自定义的 RecordReader 需要继承 RecordReader,泛型的类型为 map 端输入的 key 和 value 的类型,本例中我们的目的是合并文件,所以把文件的内容以字节序列的形式从 value 接收进来就可以,key 设为 NullWritable 类型。

默认的 TextInputFormat 的 key 的类型是 LongWritable,表示当前所读取到的字节的偏移量(相对于整篇文章),value 的类型是 Text,表示的是这一行文本的内容,大家可以回过头去看之前的词频统计案例,就可以理解为什么 map 的输入的 key 的类型是 LongWritable,输入的 value 的类型是 Text 了。

需要重写 6 个方法:

  • initialize:初始化RecordReader,如果在构造函数中进行了初始化,该方法可以为空。
  • nextKeyValue:判断当前文件是否还有下一个 key/value。
  • getCurrentKey:获取当前读取到的 key。
  • getCurrentValue:获取当前读取到的 value。
  • getProgress:返回的是一个[0.0, 1.0]之间的小数,表示读取进度,1表示读取完成。
  • close:关闭资源。
package top9_inputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

/**
 * @author 曲健磊
 * @date 2019-09-18 10:52:32
 * @description 用于读取切片中的数据
 */
public class WholeRecordReader extends RecordReader<NullWritable, BytesWritable> {

    private Configuration configuration;

    private FileSplit split;

    private boolean processed = false;

    private BytesWritable value = new BytesWritable();

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        // 接收读取到的切片信息以及配置信息
        this.split = (FileSplit) split;
        configuration = context.getConfiguration();
    }

    // InputFormat会为每一个输入文件创建一个RecordReader
    // 每一个RecordReader循环调用nextKeyValue方法读取改文件所产生的所有切片
    // 在本例中每个文件将会调用两次nextKeyValue方法:
    // 第一次:读取该文件中的所有内容放入缓存把processed标记置为true
    // 第二次:标记为true,结束方法(可在nextKeyValue方法内打断点调试)
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        // 在读取每个文件中的数据的时候判断是否存在下一个key/value,如果存在返回true,否则返回false
        if (!processed) {
            // 1.定义缓存区
            byte[] contents = new byte[(int)split.getLength()];

            FileSystem fs = null;
            FSDataInputStream fis = null;

            try {
                // 2.获取文件系统
                Path path = split.getPath();
                fs = path.getFileSystem(configuration);

                // 3.读取数据
                fis = fs.open(path);

                // 4.读取文件内容进缓冲区
                IOUtils.readFully(fis, contents, 0, contents.length);

                // 5.将数据保存到 value 中
                value.set(contents, 0, contents.length);
            } catch (Exception e) {

            } finally {
                IOUtils.closeStream(fis);
            }
            processed = true;

            return true;
        }

        return false;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        // 获取当前读取到的数据的key
        return NullWritable.get();
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        // 获取当前读取到的数据的value
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        // 获取当前进度信息
        return processed ? 1 : 0;
    }

    @Override
    public void close() throws IOException {}
    
}

2.自定义InputFormat

key 和 value 的类型仍然为 NullWritable,BytesWritable,表示 map 的输入的 key 和value 的类型。

package top9_inputformat;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

/**
 * @author 曲健磊
 * @date 2019-09-18 10:43:20
 * @description 自定义的InputFormat,用于读取输入文件
 */
public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        // FileInputFormat用isSplitable方法来指定对应的文件是否支持数据的切分,默认情况下都是支持的,也就是true
        // 返回false表示不可以切分,不可以划分成多个切片,也就是说只有一个切片
        return false;
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        // 用来创建RecordReader读取切片中的数据
        WholeRecordReader recordReader = new WholeRecordReader();
        // 初始化RecordReader
        recordReader.initialize(split, context);
        return recordReader;
    }

}

3.Mapper:

package top9_inputformat;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

/**
 * @author 曲健磊
 * @date 2019-09-18 11:22:30
 */
public class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {

    Text k = new Text();

    // mapper初始化
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // 1.获取文件切片信息
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        // 2.获取切片文件名称
        String name = inputSplit.getPath().toString();
        // 3.设置map输出的key的值
        k.set(name);
    }

    @Override
    protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
        context.write(k, value);
    }

}

4.Reducer:

package top9_inputformat;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author 曲健磊
 * @date 2019-09-18 11:28:29
 */
public class SequenceFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {

    @Override
    protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
        context.write(key, values.iterator().next());
    }

}

5.Driver:

package top9_inputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

/**
 * @author 曲健磊
 * @date 2019-09-18 11:29:45
 */
public class SequenceFileDriver {
    public static void main(String[] args) throws Exception {

        args = new String[]{"d:/input", "d:/output"};

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(SequenceFileDriver.class);
        job.setMapperClass(SequenceFileMapper.class);
        job.setReducerClass(SequenceFileReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        // 设置输入的inputFormat
        job.setInputFormatClass(WholeFileInputFormat.class);
        // 设置输出的OutputFormat,输出字节序列
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}

程序运行结果如下:

mapreduce自定义inputformat_mapreduce_02

可以大体看出是把三个文件合并到了一起,实现了需求,那么如何读取这种 sequence file 呢?

敬请期待(偷笑.gif)!


【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
  EHQgR1Njg8Ca   2023年11月30日   24   0   0 htmlapacheapachehtml
bhG8jH8b1hMi