博客
关于我
Hadoop:MapReduce进阶编程(WritableComparable和cleanup的使用)
阅读量:402 次
发布时间:2019-03-05

本文共 6253 字,大约阅读时间需要 20 分钟。

Hadoop MapReduce进阶编程:WritableComparable与cleanup的应用

一、案例要求

本实验分为两个部分,分别针对MapReduce的核心组件实现和高级功能进行实践。

1. 实验内容1:自定义类型

数据格式

数据格式为:用户名\t 收入\t 支出\t 日期

示例数据:

zhangsan 6000 0 2016-05-01lisi 2000 0 2016-05-01lisi 0 100 2016-05-01zhangsan 3000 0 2016-05-01wangwu 9000 0 2016-05-01wangwu 0 200 2016-05-01zhangsan 200 400 2016-05-01

实验目标

  • 计算每个用户的收入、支出及利润情况。
  • 按利润降序(若利润相同则按收入降序)输出结果。
  • 实验说明

  • WritableComparable接口

    • Writable接口用于实现序列化协议,MapReduce中的键值类型都需实现该接口。
    • WritableComparable接口不仅实现序列化,还需支持比较操作,用于排序。
  • MapReduce设计步骤

    • A. 定义自定义类TradeBean并实现必要接口。
    • B. Map阶段处理数据,提取用户信息并写入Value。
    • C. Reduce阶段汇总用户数据并输出结果。
  • 2. 实验内容2:Top N求解

    数据集

    文件中每行包含一个整数值,例如:

    4567891234567896661500

    实验目标

    从多个文件中读取数据,找出所有数值中的前三个最大值,按降序输出。

    实验思路

    • 在Mapper阶段提取数据并存储。
    • Combiner阶段对当前数据块预先获取前三个最大值。
    • Reduce阶段对所有数据集中的小数据集进行汇总,得到最终的Top3结果。

    二、实现过程

    1. IntelliJ IDEA创建Maven工程

    项目结构如下:

    ├── src│   ├── main│   │   ├── java│   │   │   ├── subExp1│   │   │   │   ├── TradeBean.java│   │   │   │   ├── TradeMapper.java│   │   │   │   ├── TradeReducer.java│   │   │   │   └── TradeJobMain.java│   │   │   └── subExp2│   │   │       ├── TopMapper.java│   │   │       ├── TopCombiner.java│   │   │       ├── TopReducer.java│   │   │       └── TopJobMain.java│   └── test│       └── java│           └── subExp1│               └── TradeBeanTest.java

    2. 代码实现

    子实验1:TradeBean类

    package subExp1;import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;public class TradeBean implements WritableComparable
    { private String username; private int income; private int outcome; private int profit; @Override public String toString() { return username + "\t" + income + "\t" + outcome + "\t" + profit; } @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeUTF(username); dataOutput.writeInt(income); dataOutput.writeInt(outcome); dataOutput.writeInt(profit); } @Override public void readFields(DataInput dataInput) throws IOException { username = dataInput.readUTF(); income = dataInput.readInt(); outcome = dataInput.readInt(); profit = dataInput.readInt(); } @Override public int compareTo(TradeBean bean) { int result = bean.profit - this.profit; if (result == 0) { return bean.income - this.income; } return result; }}

    子实验1:TradeMapper类

    package subExp1;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class TradeMapper extends Mapper
    { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] words = value.toString().split(" "); TradeBean bean = new TradeBean(); bean.setUsername(words[0]); bean.setIncome(Integer.parseInt(words[1])); bean.setOutcome(Integer.parseInt(words[2])); bean.setProfit(Integer.parseInt(words[1]) - Integer.parseInt(words[2])); context.write(new Text(words[0]), bean); }}

    子实验1:TradeReducer类

    package subExp1;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.util.TreeMap;public class TradeReducer extends Reducer
    { TreeMap
    treeMap = new TreeMap<>(); @Override protected void reduce(Text key, Iterable
    values, Context context) throws IOException, InterruptedException { int income = 0, outcome = 0, profit = 0; for (TradeBean bean : values) { income += bean.getIncome(); outcome += bean.getOutcome(); profit += bean.getProfit(); } TradeBean tradeBean = new TradeBean(); tradeBean.setUsername(key.toString()); tradeBean.setIncome(income); tradeBean.setOutcome(outcome); tradeBean.setProfit(profit); treeMap.put(tradeBean, ""); } @Override protected void cleanup(Context context) throws IOException, InterruptedException { for (TradeBean bean : treeMap.keySet()) { context.write(bean, NullWritable.get()); } }}

    子实验2:TopMapper类

    package subExp2;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;import java.util.TreeSet;public class TopMapper extends Mapper
    { TreeSet
    set = new TreeSet<>(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { int num = Integer.parseInt(value.toString().trim()); set.add(-num); } @Override protected void cleanup(Context context) throws IOException, InterruptedException { set.stream().limit(3).forEach(num -> { IntWritable outKey = new IntWritable(num); context.write(outKey, IntWritable.get()); }); }}

    子实验2:TopCombiner类

    package subExp2;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class TopCombiner extends Reducer
    { int count = 0; @Override protected void reduce(IntWritable key, Iterable
    values, Context context) throws IOException, InterruptedException { if (count <= 3) { IntWritable outKey = new IntWritable(-key.get()); context.write(outKey, values.iterator().next()); count++; } }}

    子实验2:TopReducer类

    package subExp2;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class TopReducer extends Reducer
    { int count = 0; @Override protected void reduce(IntWritable key, Iterable
    values, Context context) throws IOException, InterruptedException { if (count <= 3) { IntWritable outKey = new IntWritable(key.get()); context.write(outKey, NullWritable.get()); count++; } }}

    3. Maven打包

    如需了解Maven打包配置,可参考前一案例的 pom.xml 文件。

    4. Hadoop集群运行

    运行Hadoop集群时,需确保所有节点已正确配置并启动。运行完成后,可通过浏览器访问头节点查看结果。

    转载地址:http://umnzz.baihongyu.com/

    你可能感兴趣的文章
    Mysql8.0的特性
    查看>>
    MySQL8修改密码报错ERROR 1819 (HY000): Your password does not satisfy the current policy requirements
    查看>>
    MySQL8修改密码的方法
    查看>>
    Mysql8在Centos上安装后忘记root密码如何重新设置
    查看>>
    Mysql8在Windows上离线安装时忘记root密码
    查看>>
    MySQL8找不到my.ini配置文件以及报sql_mode=only_full_group_by解决方案
    查看>>
    mysql8的安装与卸载
    查看>>
    MySQL8,体验不一样的安装方式!
    查看>>
    MySQL: Host '127.0.0.1' is not allowed to connect to this MySQL server
    查看>>
    Mysql: 对换(替换)两条记录的同一个字段值
    查看>>
    mysql:Can‘t connect to local MySQL server through socket ‘/var/run/mysqld/mysqld.sock‘解决方法
    查看>>
    MYSQL:基础——3N范式的表结构设计
    查看>>
    MYSQL:基础——触发器
    查看>>
    Mysql:连接报错“closing inbound before receiving peer‘s close_notify”
    查看>>
    mysqlbinlog报错unknown variable ‘default-character-set=utf8mb4‘
    查看>>
    mysqldump 参数--lock-tables浅析
    查看>>
    mysqldump 导出中文乱码
    查看>>
    mysqldump 导出数据库中每张表的前n条
    查看>>
    mysqldump: Got error: 1044: Access denied for user ‘xx’@’xx’ to database ‘xx’ when using LOCK TABLES
    查看>>
    Mysqldump参数大全(参数来源于mysql5.5.19源码)
    查看>>