博客
关于我
hadoop中的Partition分区案例
阅读量:735 次
发布时间:2019-03-21

本文共 5917 字,大约阅读时间需要 19 分钟。

《Hadoop MapReduce分区技术实践案例:基于手机号的流量数据统计分析》

相信您已经对Hadoop MapReduce的默认Partition机制有了初步了解。默认的Partition机制主要基于key的哈希值对分区 Soilnah mairatten raboram.age numReduceTasks参数决定分区数目。

默认分区的实现方式使用key的哈希值进行模运算处理,这意味着当numReduceTasks设定为1时,所有数据都会落在同一个分区中。这也是为什么MapReduce程序运行后的文件不妨_called虎.txt,_part-00000文件中都会包含相同分区标识的原因。

以下将逐步阐述如何实现一个自定义的Partition方案,以满足特定数据处理需求。我们的目标是根据手机号前三位的不同对数据进行分区管理。

实例说明与实现步骤

在本案例中,我们需要根据手机号的前三位将读取的流量数据分布到不同的分区中。具体来说,手机号前三位值136、137、138、139会分布到四个独立的分区中,其余的手机号则会分布到一个统一的分区中。每个分区中将累加该手机号相关的上行流量和下行流量,形成最终的流量统计结果。

步骤一:自定义数据对象

首先,我们需要创建一个自定义的数据对象FlowBean,用于存储和处理流量数据。这个对象将包含两个数值字段upFlowdownFlow,分别用于存储上行流量和下行流量。此外,我们还将增加一个sumFlow字段,用来存储两者之和。

FlowBean类的具体实现如下:

package com.bean;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import org.apache.hadoop.io.Writable;public class FlowBean implements Writable {    private long upFlow;    private long downFlow;    private long sumFlow;    public FlowBean() {        super();    }    public FlowBean(long upFlow, long downFlow) {        super();        this.upFlow = upFlow;        this.downFlow = downFlow;        this.sumFlow = upFlow + downFlow;    }    public void write(DataOutput out) throws IOException {        out.writeLong(upFlow);        out.writeLong(downFlow);        out.writeLong(sumFlow);    }    public void readFields(DataInput in) throws IOException {        this.upFlow = in.readLong();        this.downFlow = in.readLong();        this.sumFlow = in.readLong();    }    public String toString() {        return upFlow + "\t" + downFlow + "\t" + sumFlow;    }    public long getUpFlow() {        return upFlow;    }    public void setUpFlow(long upFlow) {        this.upFlow = upFlow;    }    public long getDownFlow() {        return downFlow;    }    public void setDownFlow(long downFlow) {        this.downFlow = downFlow;    }    public long getSumFlow() {        return sumFlow;    }    public void setSumFlow(long sumFlow) {        this.sumFlow = sumFlow;    }    public void set(long sumUpFlow, long sumDownFlow) {        upFlow = sumUpFlow;        downFlow = sumDownFlow;        sumFlow = sumUpFlow + sumDownFlow;    }}

这是一个标准的实现 Personally有 dine Writable 接口,其主要特点是:

  • 提供默认的空参数构造函数\
  • 支持字段的自动序列化与反序列化\
  • 提供方便的数据读取和写入接口\
  • 增加sumFlow字段以便于后续的数据聚合操作

步骤二:Mapper阶段实现

在Mapper阶段,我们需要读取每一行数据并提取相关字段,将其封装到FlowBean对象中。键是手机号,值是FlowBean对象。

FlowCountMapper类实现如下:

package com.bean;import java.io.IOException;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;public class FlowCountMapper extends Mapper
{ private FlowBean v; private Text k; @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = line.split("\t"); // 处理键 k.set(fields[1]); // 处理值 v = new FlowBean(Long.parseLong(fields[fields.length - 3]), Long.parseLong(fields[fields.length - 2])); v.set(v.getUpFlow(), v.getDownFlow()); }}

步骤三:Reducer阶段实现

在Reducer阶段,我们需要对每个分区中的FlowBean对象进行聚合操作,计算总的流量总数。流程大致如下:

  • 遍历所有FlowBean对象,累加上行流量和下行流量。
  • 将计算结果封装到新的FlowBean对象中。
  • 输出最终的FlowBean对象。

FlowCountReducer类实现如下:

package com.bean;import java.io.IOException;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;public class FlowCountReducer extendsReducer
{ @Override protected void reduce(Text key, Iterable
values, Context context) throws IOException, InterruptedException { long sumUpFlow = 0; long sumDownFlow = 0; for (FlowBean flowBean : values) { sumUpFlow += flowBean.getUpFlow(); sumDownFlow += flowBean.getDownFlow(); } FlowBean bean = new FlowBean(sumUpFlow, sumDownFlow); bean.set(bean.getUpFlow(), bean.getDownFlow()); context.write(key, bean); }}

步骤四:自定义分区器实现

接下来我们实现一个自定义Partitioner,以根据手机号前三位的值决定分区数目。ProvincePartitioner实现如下:

package com.bean;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Partitioner;public class ProvincePartitioner extends Partitioner
{ @Override public int getPartition(Text key, FlowBean value, int numPartitions) { String preNum = key.toString().substring(0, 3); if ("136".equals(preNum)) { return 0; } else if ("137".equals(preNum)) { return 1; } else if ("138".equals(preNum)) { return 2; } else if ("139".equals(preNum)) { return 3; } else { return 4; } }}

步骤五:驱动类实现

最后,我们需要编写驱动类 FlowsumDriver,用于配置并运行MapReduce任务。

FlowsumDriver类实现如下:

package com.bean;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;public class FlowsumDriver {    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {        args = new String[] { "e:/bean/bean.txt", "e:/output1" };        Configuration configuration = new Configuration();        Job job = Job.getInstance(configuration);        job.setJarByClass(FlowsumDriver.class);        job.setMapperClass(FlowCountMapper.class);        job.setReducerClass(FlowCountReducer.class);        job.setMapOutputKeyClass(Text.class);        job.setMapOutputValueClass(FlowBean.class);        job.setOutputKeyClass

操作总结

在本次案例中,我们通过配置自定义Partitioner,将数据按照手机号的前三位进行分区管理。这是一个典型的根据数据特征动态分区的场景。需要注意以下几点:

  • 如果ReduceTask的数量大于分区数目,程序会自动回报错误
  • 如果ReduceTask的数量小于分区数目,可能会导致数据丢失或分区错误
  • ReduceTask的数量设定通常应与分区数目保持一致或略多于分区数目,以确保数据完整性
  • Partitioner类必须实现Partitioner接口,且返回值的一定范围内有效

通过本例的实现,您可以根据实际业务需求,灵活配置Partitioner类,实现特定数据分区和处理要求。

转载地址:http://knvgz.baihongyu.com/

你可能感兴趣的文章
localhost:5000在MacOS V12(蒙特利)中不可用
查看>>
logstash mysql 准实时同步到 elasticsearch
查看>>
Luogu2973:[USACO10HOL]赶小猪
查看>>
mabatis 中出现< 以及> 代表什么意思?
查看>>
Mac book pro打开docker出现The data couldn’t be read because it is missing
查看>>
MAC M1大数据0-1成神篇-25 hadoop高可用搭建
查看>>
mac mysql 进程_Mac平台下启动MySQL到完全终止MySQL----终端八步走
查看>>
Mac OS 12.0.1 如何安装柯美287打印机驱动,刷卡打印
查看>>
MangoDB4.0版本的安装与配置
查看>>
Manjaro 24.1 “Xahea” 发布!具有 KDE Plasma 6.1.5、GNOME 46 和最新的内核增强功能
查看>>
mapping文件目录生成修改
查看>>
MapReduce程序依赖的jar包
查看>>
mariadb multi-source replication(mariadb多主复制)
查看>>
MariaDB的简单使用
查看>>
MaterialForm对tab页进行隐藏
查看>>
Member var and Static var.
查看>>
memcached高速缓存学习笔记001---memcached介绍和安装以及基本使用
查看>>
memcached高速缓存学习笔记003---利用JAVA程序操作memcached crud操作
查看>>
Memcached:Node.js 高性能缓存解决方案
查看>>
memcache、redis原理对比
查看>>