hadoop案例:groupcomparable(分组排序)
  TEZNKK3IfmPf 2023年11月13日 31 0

输入数据

hadoop案例:groupcomparable(分组排序)

group.txt

0000001	Pdt_01	222.8
0000002	Pdt_05	722.4
0000001	Pdt_02	33.8
0000003	Pdt_06	232.8
0000003	Pdt_02	33.8
0000002	Pdt_03	522.8
0000002	Pdt_04	122.4

期望结果

期望输出数据

1	222.8
2	722.4
3	232.8

需求分析

hadoop案例:groupcomparable(分组排序)

自定义OrderBean

package com.mr.groupcomparator;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;


/**
 * Composite key for the grouping-comparator example.
 *
 * <p>Sort contract: ascending by {@code orderId}, then descending by
 * {@code money}, so the most expensive record of each order arrives
 * first at the reducer.
 */
public class OrderBean implements WritableComparable<OrderBean> {

    // Order id parsed from the first tab-separated column.
    private int orderId;
    // Price from the third tab-separated column.
    private double money;

    /** No-arg constructor required by Hadoop's Writable reflection. */
    public OrderBean() {
    }

    public OrderBean(int orderId, double money) {
        this.orderId = orderId;
        this.money = money;
    }

    public int getOrderId() {
        return orderId;
    }

    public void setOrderId(int orderId) {
        this.orderId = orderId;
    }

    public double getMoney() {
        return money;
    }

    public void setMoney(double money) {
        this.money = money;
    }

    /**
     * Renders {@code orderId<TAB>money} to match the expected output
     * ("1\t222.8"). The original used a leading space and a space
     * separator, which did not match the documented result.
     */
    @Override
    public String toString() {
        return orderId + "\t" + money;
    }

    /**
     * Primary: ascending orderId. Secondary: descending money
     * (operands reversed on purpose).
     */
    @Override
    public int compareTo(OrderBean orderBean) {
        int compare = Integer.compare(this.orderId, orderBean.orderId);
        if (compare != 0) {
            return compare;
        }
        // Reversed operand order => highest price sorts first.
        return Double.compare(orderBean.money, this.money);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(orderId);
        out.writeDouble(money);
    }

    /** Must read fields in exactly the order {@link #write} wrote them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.orderId = in.readInt();
        this.money = in.readDouble();
    }
}

自定义OrderComparator

package com.mr.groupcomparator;

import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableComparable;

/**
 * Grouping comparator: all keys sharing an order id land in the same
 * reduce group, regardless of price.
 */
public class OrderComparator extends WritableComparator {

    /** Register OrderBean; {@code true} asks the parent to instantiate keys. */
    protected OrderComparator() {
        super(OrderBean.class, true);
    }

    /**
     * Compares only the order ids. Money is deliberately ignored here so
     * that OrderBean.compareTo's descending-money secondary sort decides
     * the order of records inside each group.
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        int left = ((OrderBean) a).getOrderId();
        int right = ((OrderBean) b).getOrderId();
        return Integer.compare(left, right);
    }
}

Mapper类

package com.mr.groupcomparator;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;


/**
 * Parses each input line into an OrderBean key; the value is always
 * NullWritable since all payload travels in the key.
 */
public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {

    // Reused across map() calls to avoid one allocation per record.
    private final OrderBean outKey = new OrderBean();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Line layout: orderId <TAB> productId <TAB> price
        String[] fields = value.toString().split("\t");

        outKey.setOrderId(Integer.parseInt(fields[0]));
        outKey.setMoney(Double.parseDouble(fields[2]));

        context.write(outKey, NullWritable.get());
    }
}

Reducer类

(The original post left an "insert code snippet here" placeholder at this point, yet the Driver below references an `OrderReducer` class. Thanks to the secondary sort in `OrderBean.compareTo`, the first key of each group carries the maximum price, so the reducer only needs to emit it:)

package com.mr.groupcomparator;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {

    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // The first record in each group is the highest-priced one.
        context.write(key, NullWritable.get());
    }
}

Driver类

package com.mr.groupcomparator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;


/**
 * Driver for the grouping-comparator job: sorts OrderBean keys, then
 * groups them by order id via {@link OrderComparator}.
 */
public class OrderDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 0 Use the hard-coded demo paths only when no paths were supplied;
        //   the original overwrote real command-line arguments unconditionally.
        if (args == null || args.length < 2) {
            args = new String[]{
                    "E:/Hadoop/src/main/resources/input/groupcomparator",
                    "E:/Hadoop/src/main/resources/ouput/groupcomparator"};
        }

        // 1 Build the configuration and the job wrapper.
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2 Locate the jar via the driver class.
        job.setJarByClass(OrderDriver.class);
        // 3 Wire up the mapper and reducer.
        job.setMapperClass(OrderMapper.class);
        job.setReducerClass(OrderReducer.class);
        // 4 Map output key/value types.
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        // 5 Final (reducer) output key/value types.
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);
        // 6 Input and output paths.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // The grouping comparator must be set explicitly, otherwise the
        // default key comparator (id + money) would define the groups.
        job.setGroupingComparatorClass(OrderComparator.class);

        // 7 Submit and block until completion.
        boolean completion = job.waitForCompletion(true);
        System.exit(completion ? 0 : 1);
    }
}

执行结果

hadoop案例:groupcomparable(分组排序)

第二种方法(比较简单)

Mapper类

package com.mr.nogroupcomparator;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Second approach: rely on KeyValueTextInputFormat, which already splits
 * each line into key = order id and value = "productId\tprice".
 */
public class GroupMapper extends Mapper<Text, Text, Text, DoubleWritable> {

    // Reused output key/value pair; avoids allocating per record.
    private final Text outKey = new Text();
    private final DoubleWritable outValue = new DoubleWritable();

    @Override
    protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        // The incoming key is already the order id.
        outKey.set(key.toString());

        // value = "productId\tprice" -> take the price column.
        String[] rest = value.toString().split("\t");
        outValue.set(Double.parseDouble(rest[1]));

        context.write(outKey, outValue);
    }
}

Reducer类

package com.mr.nogroupcomparator;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Emits the maximum price observed for each order id.
 */
public class GroupReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

    // Reused output value; avoids allocating per group.
    private final DoubleWritable outValue = new DoubleWritable();

    /**
     * Fixes over the original: the running maximum is seeded from the
     * data (so all-negative values are handled correctly instead of
     * comparing against a hard-coded 0), the output value is set once
     * after the loop rather than on every iteration, and nothing is
     * written for a (theoretical) empty group.
     */
    @Override
    protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
            throws IOException, InterruptedException {
        double max = Double.NEGATIVE_INFINITY;
        boolean seen = false;
        for (DoubleWritable value : values) {
            max = Math.max(max, value.get());
            seen = true;
        }
        if (seen) {
            outValue.set(max);
            context.write(key, outValue);
        }
    }
}

Driver类

package com.mr.nogroupcomparator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Driver for the simpler per-order-max job using KeyValueTextInputFormat.
 */
public class GroupDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Use the hard-coded demo paths only when none were supplied;
        // the original silently discarded real command-line arguments.
        if (args == null || args.length < 2) {
            args = new String[]{
                    "E:/Hadoop/src/main/resources/input/twogroup",
                    "E:/Hadoop/src/main/resources/ouput/twogroup"};
        }

        Configuration conf = new Configuration();
        // Split each line into key/value at the first tab.
        conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "\t");

        // Build the job wrapper.
        Job job = Job.getInstance(conf);

        // Key/value input format instead of the default offset/line pairs.
        job.setInputFormatClass(KeyValueTextInputFormat.class);

        job.setJarByClass(GroupDriver.class);

        job.setMapperClass(GroupMapper.class);
        job.setReducerClass(GroupReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit and block until completion.
        boolean wait = job.waitForCompletion(true);
        System.exit(wait ? 0 : 1);
    }
}

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月13日 0

暂无评论

推荐阅读
  TEZNKK3IfmPf   2023年11月15日   54   0   0 apachehadoopjava
  TEZNKK3IfmPf   2023年11月14日   34   0   0 进程hadoop
  TEZNKK3IfmPf   2023年11月14日   29   0   0 hadoop大数据
  TEZNKK3IfmPf   2023年11月15日   27   0   0 apachehadoop
  TEZNKK3IfmPf   2023年11月14日   25   0   0 hadoop
  TEZNKK3IfmPf   2023年11月14日   25   0   0 hadoopHive
  TEZNKK3IfmPf   2023年11月14日   28   0   0 hadoop大数据
  TEZNKK3IfmPf   2024年04月26日   67   0   0 hadoopHive
TEZNKK3IfmPf