
Data Partitioning

If you do not set the number of reduce tasks, MapReduce uses a single reducer by default, and every record is sent to that one reducer.

If you do not set a partitioner, the default is hash partitioning: hash(key) % R, i.e., the key's hash value modulo the number of reducers (R) determines which reducer receives each record.
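For reference, Hadoop's built-in HashPartitioner (org.apache.hadoop.mapreduce.lib.partition.HashPartitioner) is essentially the following sketch; the bitwise AND with Integer.MAX_VALUE clears the sign bit so the modulo result is never negative:

import org.apache.hadoop.mapreduce.Partitioner;

public class HashPartitioner<K, V> extends Partitioner<K, V> {

    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        // Mask the sign bit, then take the hash modulo the number of reducers
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}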

To set the number of reduce tasks (partitions): job.setNumReduceTasks(2);

To set the partitioner: job.setPartitionerClass(StuPartitioner.class);

Defining the partition function, for example:

public class StuPartitioner extends Partitioner<NullWritable, StudentWritable> { // partitioning decides which reducer each record goes to, so the type parameters are the reducer's input types: NullWritable, StudentWritable

    @Override

    public int getPartition(NullWritable key, StudentWritable value, int numPartitions) {

        // Partition by age: one partition for students 18 and over, another for those under 18

        if (value.getAge() >= 18) {

            return 1;

        } else {

            return 0;

        }

    }

}

This example partitions the student records by age: 18 and over in one partition, under 18 in the other.
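For illustration only (these records are made up, not from the original post), the input file /StuAgeCata.txt used below could look like this, one student per line with name and age separated by a space:

Alice 20
Bob 17
Carol 18
Dave 16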

The complete code for this example follows.

(1) The student class

package com.simple;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

import org.apache.hadoop.io.Writable;


public class StudentWritable implements Writable { // data passed around in MapReduce must be Hadoop-serializable types of the form xxxWritable (e.g. IntWritable), so this class implements the Writable interface

    private String name;

    private int age;

    public String getName() {

        return name;

    }

    public void setName(String name) {

        this.name = name;

    }

    public int getAge() {

        return age;

    }

    public void setAge(int age) {

        this.age = age;

    }

    public StudentWritable() {

    }

    public StudentWritable(String name, int age) {

        this.name = name;

        this.age = age;

    }

    @Override

    public String toString() {

        return "StudentWritable [name=" + name + ", age=" + age + "]";

    }

    @Override

    public void write(DataOutput out) throws IOException {

        out.writeUTF(name);

        out.writeInt(age);

    }

  

    @Override

    public void readFields(DataInput in) throws IOException {

        this.name = in.readUTF();

        this.age = in.readInt();

    }

}
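Note that write and readFields must process the fields in the same order (name first, then age); if the two methods disagreed on the order, the bytes would be deserialized into the wrong fields.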

(2) The mapper class with the map method

Purpose: convert each line of student text into a StudentWritable object.

package com.simple;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

public class StudentMapper extends Mapper<LongWritable, Text, NullWritable, StudentWritable> { // the first two type parameters are the mapper's input types, the last two are its output types

// Input: the byte offset of the line as key, the line text as value

// Output: NullWritable as key, a StudentWritable object as value

    @Override

    // Note: Context is shorthand for Mapper<LongWritable, Text, NullWritable, StudentWritable>.Context;
    // inside this subclass the short form refers to the same type, so it can be used directly.
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        // Split the line on the space separator

        String[] stuArr = value.toString().split(" ");

        context.write(NullWritable.get(), new StudentWritable(stuArr[0], Integer.parseInt(stuArr[1])));

    }

}
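For example, given the made-up input line "Alice 20", this map call emits the pair (NullWritable, StudentWritable [name=Alice, age=20]); the partitioner then routes that pair to a reducer based on the age field.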

(3) The reducer class with the reduce method

Purpose: convert each StudentWritable object back into a text line.

package com.simple;

import java.io.IOException;

import java.util.Iterator;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.mapreduce.Reducer;

public class StudentReducer extends Reducer<NullWritable, StudentWritable, NullWritable, Text> {

    @Override

    protected void reduce(NullWritable key, Iterable<StudentWritable> iter, Context context)

            throws IOException, InterruptedException {

        // Iterate over every student routed to this partition

        Iterator<StudentWritable> it = iter.iterator();

        while (it.hasNext()) {

            context.write(NullWritable.get(), new Text(it.next().toString())); // one student record per output line

        }

    }

}

(4) The custom partitioner (splitting at age 18)

package com.simple;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.mapreduce.Partitioner;

public class StuPartitioner extends Partitioner<NullWritable, StudentWritable> {

    @Override

    public int getPartition(NullWritable key, StudentWritable value, int numPartitions) { // numPartitions is the number of reduce tasks

        // Partition by age: 18 and over goes to partition 1, under 18 to partition 0

        if (value.getAge() >= 18) {

            return 1;

        } else {

            return 0;

        }

    }

}
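One caveat: getPartition must return a value in the range [0, numPartitions). If this job were ever run with job.setNumReduceTasks(1), returning 1 here would fail, so a slightly more defensive variant (my own sketch, not from the original post) would be:

package com.simple;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.mapreduce.Partitioner;

public class StuPartitioner extends Partitioner<NullWritable, StudentWritable> {

    @Override
    public int getPartition(NullWritable key, StudentWritable value, int numPartitions) {
        // 18 and over -> partition 1, under 18 -> partition 0
        int partition = (value.getAge() >= 18) ? 1 : 0;
        // Wrap around in case the job runs with fewer reduce tasks than expected
        return partition % numPartitions;
    }
}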

(5) The driver (main) class

package com.simple; // same package as the mapper/reducer, so no extra imports are needed for them

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class TestStuMapReducer {

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        conf.set("fs.defaultFS", "hdfs://localhost:9000");

 

        // Get a Job instance

        Job job = Job.getInstance(conf);

 

        // Set the driver class (used to locate the job jar)

        job.setJarByClass(TestStuMapReducer.class);

 

        // Set the Mapper and Reducer classes, the custom partitioner, and the number of reduce tasks

        job.setMapperClass(StudentMapper.class);

        job.setReducerClass(StudentReducer.class);

        job.setPartitionerClass(StuPartitioner.class);

        job.setNumReduceTasks(2);

 

        // Set the map and reduce output types
        // map output types

        job.setMapOutputKeyClass(NullWritable.class);

        job.setMapOutputValueClass(StudentWritable.class);

        // reduce output types

        job.setOutputKeyClass(NullWritable.class);

        job.setOutputValueClass(Text.class);

 

        // Set the input file and output directory

        FileInputFormat.setInputPaths(job, new Path("/StuAgeCata.txt"));

        FileOutputFormat.setOutputPath(job, new Path("/simple/output"));

        // Submit the job and wait for completion

        job.waitForCompletion(true);

    }

}
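With two reduce tasks, the output directory /simple/output will contain one file per reducer: part-r-00000 holds partition 0 (students under 18) and part-r-00001 holds partition 1 (students 18 and over). With the made-up sample input above, part-r-00001 would contain lines such as StudentWritable [name=Alice, age=20].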

Source: https://www.cnblogs.com/luckyhappyyaoyao/p/18811867
