博客
关于我
hadoop中的Partition分区案例
阅读量:735 次
发布时间:2019-03-21

本文共 5917 字,大约阅读时间需要 19 分钟。

《Hadoop MapReduce分区技术实践案例:基于手机号的流量数据统计分析》

相信您已经对Hadoop MapReduce的默认Partition机制有了初步了解。默认的Partition机制主要基于key的哈希值对分区 Soilnah mairatten raboram.age numReduceTasks参数决定分区数目。

默认分区的实现方式使用key的哈希值进行模运算处理,这意味着当numReduceTasks设定为1时,所有数据都会落在同一个分区中。这也是为什么MapReduce程序运行后的文件不妨_called虎.txt,_part-00000文件中都会包含相同分区标识的原因。

以下将逐步阐述如何实现一个自定义的Partition方案,以满足特定数据处理需求。我们的目标是根据手机号前三位的不同对数据进行分区管理。

实例说明与实现步骤

在本案例中,我们需要根据手机号的前三位将读取的流量数据分布到不同的分区中。具体来说,手机号前三位值136、137、138、139会分布到四个独立的分区中,其余的手机号则会分布到一个统一的分区中。每个分区中将累加该手机号相关的上行流量和下行流量,形成最终的流量统计结果。

步骤一:自定义数据对象

首先,我们需要创建一个自定义的数据对象FlowBean,用于存储和处理流量数据。这个对象将包含两个数值字段upFlowdownFlow,分别用于存储上行流量和下行流量。此外,我们还将增加一个sumFlow字段,用来存储两者之和。

FlowBean类的具体实现如下:

package com.bean;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import org.apache.hadoop.io.Writable;public class FlowBean implements Writable {    private long upFlow;    private long downFlow;    private long sumFlow;    public FlowBean() {        super();    }    public FlowBean(long upFlow, long downFlow) {        super();        this.upFlow = upFlow;        this.downFlow = downFlow;        this.sumFlow = upFlow + downFlow;    }    public void write(DataOutput out) throws IOException {        out.writeLong(upFlow);        out.writeLong(downFlow);        out.writeLong(sumFlow);    }    public void readFields(DataInput in) throws IOException {        this.upFlow = in.readLong();        this.downFlow = in.readLong();        this.sumFlow = in.readLong();    }    public String toString() {        return upFlow + "\t" + downFlow + "\t" + sumFlow;    }    public long getUpFlow() {        return upFlow;    }    public void setUpFlow(long upFlow) {        this.upFlow = upFlow;    }    public long getDownFlow() {        return downFlow;    }    public void setDownFlow(long downFlow) {        this.downFlow = downFlow;    }    public long getSumFlow() {        return sumFlow;    }    public void setSumFlow(long sumFlow) {        this.sumFlow = sumFlow;    }    public void set(long sumUpFlow, long sumDownFlow) {        upFlow = sumUpFlow;        downFlow = sumDownFlow;        sumFlow = sumUpFlow + sumDownFlow;    }}

这是一个标准的实现 Personally有 dine Writable 接口,其主要特点是:

  • 提供默认的空参数构造函数\
  • 支持字段的自动序列化与反序列化\
  • 提供方便的数据读取和写入接口\
  • 增加sumFlow字段以便于后续的数据聚合操作

步骤二:Mapper阶段实现

在Mapper阶段,我们需要读取每一行数据并提取相关字段,将其封装到FlowBean对象中。键是手机号,值是FlowBean对象。

FlowCountMapper类实现如下:

package com.bean;import java.io.IOException;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;public class FlowCountMapper extends Mapper
{ private FlowBean v; private Text k; @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = line.split("\t"); // 处理键 k.set(fields[1]); // 处理值 v = new FlowBean(Long.parseLong(fields[fields.length - 3]), Long.parseLong(fields[fields.length - 2])); v.set(v.getUpFlow(), v.getDownFlow()); }}

步骤三:Reducer阶段实现

在Reducer阶段,我们需要对每个分区中的FlowBean对象进行聚合操作,计算总的流量总数。流程大致如下:

  • 遍历所有FlowBean对象,累加上行流量和下行流量。
  • 将计算结果封装到新的FlowBean对象中。
  • 输出最终的FlowBean对象。

FlowCountReducer类实现如下:

package com.bean;import java.io.IOException;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;public class FlowCountReducer extendsReducer
{ @Override protected void reduce(Text key, Iterable
values, Context context) throws IOException, InterruptedException { long sumUpFlow = 0; long sumDownFlow = 0; for (FlowBean flowBean : values) { sumUpFlow += flowBean.getUpFlow(); sumDownFlow += flowBean.getDownFlow(); } FlowBean bean = new FlowBean(sumUpFlow, sumDownFlow); bean.set(bean.getUpFlow(), bean.getDownFlow()); context.write(key, bean); }}

步骤四:自定义分区器实现

接下来我们实现一个自定义Partitioner,以根据手机号前三位的值决定分区数目。ProvincePartitioner实现如下:

package com.bean;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Partitioner;public class ProvincePartitioner extends Partitioner
{ @Override public int getPartition(Text key, FlowBean value, int numPartitions) { String preNum = key.toString().substring(0, 3); if ("136".equals(preNum)) { return 0; } else if ("137".equals(preNum)) { return 1; } else if ("138".equals(preNum)) { return 2; } else if ("139".equals(preNum)) { return 3; } else { return 4; } }}

步骤五:驱动类实现

最后,我们需要编写驱动类 FlowsumDriver,用于配置并运行MapReduce任务。

FlowsumDriver类实现如下:

package com.bean;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;public class FlowsumDriver {    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {        args = new String[] { "e:/bean/bean.txt", "e:/output1" };        Configuration configuration = new Configuration();        Job job = Job.getInstance(configuration);        job.setJarByClass(FlowsumDriver.class);        job.setMapperClass(FlowCountMapper.class);        job.setReducerClass(FlowCountReducer.class);        job.setMapOutputKeyClass(Text.class);        job.setMapOutputValueClass(FlowBean.class);        job.setOutputKeyClass

操作总结

在本次案例中,我们通过配置自定义Partitioner,将数据按照手机号的前三位进行分区管理。这是一个典型的根据数据特征动态分区的场景。需要注意以下几点:

  • 如果ReduceTask的数量大于分区数目,程序会自动回报错误
  • 如果ReduceTask的数量小于分区数目,可能会导致数据丢失或分区错误
  • ReduceTask的数量设定通常应与分区数目保持一致或略多于分区数目,以确保数据完整性
  • Partitioner类必须实现Partitioner接口,且返回值的一定范围内有效

通过本例的实现,您可以根据实际业务需求,灵活配置Partitioner类,实现特定数据分区和处理要求。

转载地址:http://knvgz.baihongyu.com/

你可能感兴趣的文章
Mysql——深入浅出InnoDB底层原理
查看>>
MySQL“被动”性能优化汇总
查看>>
MySQL、HBase 和 Elasticsearch:特点与区别详解
查看>>
MySQL、Redis高频面试题汇总
查看>>
MYSQL、SQL Server、Oracle数据库排序空值null问题及其解决办法
查看>>
mysql一个字段为空时使用另一个字段排序
查看>>
MySQL一个表A中多个字段关联了表B的ID,如何关联查询?
查看>>
MYSQL一直显示正在启动
查看>>
MySQL一站到底!华为首发MySQL进阶宝典,基础+优化+源码+架构+实战五飞
查看>>
MySQL万字总结!超详细!
查看>>
Mysql下载以及安装(新手入门,超详细)
查看>>
MySQL不会性能调优?看看这份清华架构师编写的MySQL性能优化手册吧
查看>>
MySQL不同字符集及排序规则详解:业务场景下的最佳选
查看>>
Mysql不同官方版本对比
查看>>
MySQL与Informix数据库中的同义表创建:深入解析与比较
查看>>
mysql与mem_细说 MySQL 之 MEM_ROOT
查看>>
MySQL与Oracle的数据迁移注意事项,另附转换工具链接
查看>>
mysql丢失更新问题
查看>>
MySQL两千万数据优化&迁移
查看>>
MySql中 delimiter 详解
查看>>