This lab is divided into two parts: the first covers implementing MapReduce's core components, and the second exercises its more advanced features.
Data format: username \t income \t expenditure \t date (fields separated by tabs).
Sample data:
zhangsan	6000	0	2016-05-01
lisi	2000	0	2016-05-01
lisi	0	100	2016-05-01
zhangsan	3000	0	2016-05-01
wangwu	9000	0	2016-05-01
wangwu	0	200	2016-05-01
zhangsan	200	400	2016-05-01
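Based on the mapper and reducer code shown later, Experiment 1 aggregates each user's total income, total expenditure, and profit (income minus expenditure), then sorts the results by profit and, on ties, by income, both in descending order. With the sample above, zhangsan totals 9200 / 400 / 8800, wangwu 9000 / 200 / 8800, and lisi 2000 / 100 / 1900, so the expected output is roughly:

zhangsan	9200	400	8800
wangwu	9000	200	8800
lisi	2000	100	1900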
The WritableComparable interface
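Hadoop serializes custom key and value types through the Writable interface; WritableComparable extends it with compareTo so that instances can also be ordered. TradeBean below implements WritableComparable so it can be shuffled between the map and reduce phases and sorted by profit (the TreeMap in TradeReducer relies on this ordering).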
MapReduce design steps
TradeBean
The bean class must also implement the necessary interfaces. For Experiment 2, each line of the input files contains a single integer value, for example:
456
789
123
456
789
666
1500
Read the data from multiple input files, find the three largest values among all of them, and output those values in descending order.
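If the sample values are read as listed above, the expected output would be 1500, 789, and 666, one per line; note that the TreeSet/IntWritable-key design below collapses duplicate values into a single entry.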
The project structure is as follows:
├── src
│   ├── main
│   │   ├── java
│   │   │   ├── subExp1
│   │   │   │   ├── TradeBean.java
│   │   │   │   ├── TradeMapper.java
│   │   │   │   ├── TradeReducer.java
│   │   │   │   └── TradeJobMain.java
│   │   │   └── subExp2
│   │   │       ├── TopMapper.java
│   │   │       ├── TopCombiner.java
│   │   │       ├── TopReducer.java
│   │   │       └── TopJobMain.java
│   └── test
│       └── java
│           └── subExp1
│               └── TradeBeanTest.java
package subExp1;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class TradeBean implements WritableComparable<TradeBean> {
    private String username;
    private int income;
    private int outcome;
    private int profit;

    // Getters and setters are required by TradeMapper and TradeReducer
    public String getUsername() { return username; }
    public void setUsername(String username) { this.username = username; }
    public int getIncome() { return income; }
    public void setIncome(int income) { this.income = income; }
    public int getOutcome() { return outcome; }
    public void setOutcome(int outcome) { this.outcome = outcome; }
    public int getProfit() { return profit; }
    public void setProfit(int profit) { this.profit = profit; }

    @Override
    public String toString() {
        return username + "\t" + income + "\t" + outcome + "\t" + profit;
    }

    // Serialization: the field order here must match readFields()
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(username);
        dataOutput.writeInt(income);
        dataOutput.writeInt(outcome);
        dataOutput.writeInt(profit);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        username = dataInput.readUTF();
        income = dataInput.readInt();
        outcome = dataInput.readInt();
        profit = dataInput.readInt();
    }

    // Sort by profit in descending order; break ties by income, also descending
    @Override
    public int compareTo(TradeBean bean) {
        int result = bean.profit - this.profit;
        if (result == 0) {
            return bean.income - this.income;
        }
        return result;
    }
}
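The project tree also lists TradeBeanTest.java under src/test, whose contents are not included in this article. A minimal sketch of such a test, assuming JUnit 4 is on the test classpath (the method names and sample values are illustrative, not from the original project), could check the serialization round trip and the descending sort order:

package subExp1;

import org.junit.Assert;
import org.junit.Test;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class TradeBeanTest {

    @Test
    public void writeAndReadFieldsShouldRoundTrip() throws Exception {
        TradeBean original = new TradeBean();
        original.setUsername("zhangsan");
        original.setIncome(9200);
        original.setOutcome(400);
        original.setProfit(8800);

        // Serialize with write(), then restore into a fresh bean with readFields()
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        TradeBean copy = new TradeBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        Assert.assertEquals(original.toString(), copy.toString());
    }

    @Test
    public void compareToShouldOrderByProfitDescending() {
        TradeBean high = new TradeBean();
        high.setUsername("zhangsan");
        high.setIncome(9200);
        high.setProfit(8800);

        TradeBean low = new TradeBean();
        low.setUsername("lisi");
        low.setIncome(2000);
        low.setProfit(1900);

        // The bean with the higher profit must sort first
        Assert.assertTrue(high.compareTo(low) < 0);
    }
}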
package subExp1;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class TradeMapper extends Mapper<LongWritable, Text, Text, TradeBean> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Records are tab-separated (username, income, expenditure, date);
        // split on whitespace to tolerate both tabs and spaces
        String[] words = value.toString().split("\\s+");
        TradeBean bean = new TradeBean();
        bean.setUsername(words[0]);
        bean.setIncome(Integer.parseInt(words[1]));
        bean.setOutcome(Integer.parseInt(words[2]));
        bean.setProfit(Integer.parseInt(words[1]) - Integer.parseInt(words[2]));
        // Group by username; the date field (words[3]) is not needed downstream
        context.write(new Text(words[0]), bean);
    }
}
package subExp1;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.TreeMap;

public class TradeReducer extends Reducer<Text, TradeBean, TradeBean, NullWritable> {
    // The TreeMap keeps the aggregated beans ordered by TradeBean.compareTo
    // (profit descending, then income descending)
    TreeMap<TradeBean, String> treeMap = new TreeMap<>();

    @Override
    protected void reduce(Text key, Iterable<TradeBean> values, Context context)
            throws IOException, InterruptedException {
        int income = 0, outcome = 0, profit = 0;
        for (TradeBean bean : values) {
            income += bean.getIncome();
            outcome += bean.getOutcome();
            profit += bean.getProfit();
        }
        TradeBean tradeBean = new TradeBean();
        tradeBean.setUsername(key.toString());
        tradeBean.setIncome(income);
        tradeBean.setOutcome(outcome);
        tradeBean.setProfit(profit);
        treeMap.put(tradeBean, "");
    }

    // Emit the sorted results once every key has been reduced
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        for (TradeBean bean : treeMap.keySet()) {
            context.write(bean, NullWritable.get());
        }
    }
}
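TradeJobMain.java appears in the project tree but its code is not included in this article. Below is a minimal driver sketch; it assumes the input and output paths are passed as command-line arguments and forces a single reduce task so the TreeMap-based sort covers all users (the original project's configuration may differ).

package subExp1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TradeJobMain {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "trade-profit");
        job.setJarByClass(TradeJobMain.class);

        job.setMapperClass(TradeMapper.class);
        job.setReducerClass(TradeReducer.class);

        // Map output: username -> TradeBean
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TradeBean.class);

        // Final output: one sorted TradeBean per line, no value
        job.setOutputKeyClass(TradeBean.class);
        job.setOutputValueClass(NullWritable.class);

        // A single reducer so the global sort in cleanup() sees all users
        job.setNumReduceTasks(1);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}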
package subExp2;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.TreeSet;

public class TopMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
    // Store negated values: a TreeSet sorts ascending, so the smallest
    // negated values correspond to the largest original values
    TreeSet<Integer> set = new TreeSet<>();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        int num = Integer.parseInt(value.toString().trim());
        set.add(-num);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Emit only this mapper's top three values, still negated so that the
        // shuffle's ascending key sort yields descending original order
        int emitted = 0;
        for (int num : set) {
            if (emitted++ >= 3) {
                break;
            }
            IntWritable outKey = new IntWritable(num);
            context.write(outKey, outKey);
        }
    }
}
package subExp2;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class TopCombiner extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
    int count = 0;

    @Override
    protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Keys arrive in ascending (i.e. descending original) order, so the
        // first three groups are this map task's top three values.
        // Keep the keys negated; only the final reducer restores the sign,
        // otherwise the framework would re-sort the combined output wrongly.
        if (count < 3) {
            context.write(key, values.iterator().next());
            count++;
        }
    }
}
package subExp2;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class TopReducer extends Reducer<IntWritable, IntWritable, IntWritable, NullWritable> {
    int count = 0;

    @Override
    protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // The first three (negated) keys are the global maxima; restore the
        // original sign on output so the values appear in descending order
        if (count < 3) {
            IntWritable outKey = new IntWritable(-key.get());
            context.write(outKey, NullWritable.get());
            count++;
        }
    }
}
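TopJobMain.java is likewise not shown in the article. The sketch below assumes the combiner is registered via setCombinerClass and that a single reducer is used so the first three keys it sees are the global top three; the job name and path arguments are illustrative.

package subExp2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TopJobMain {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "top-three");
        job.setJarByClass(TopJobMain.class);

        job.setMapperClass(TopMapper.class);
        job.setCombinerClass(TopCombiner.class);
        job.setReducerClass(TopReducer.class);

        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(NullWritable.class);

        // One reducer so the "first three keys" logic covers all input files
        job.setNumReduceTasks(1);

        // The input directory may contain multiple files; all of them are read
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}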
For the Maven packaging configuration, refer to the pom.xml file from the previous case study.
When running the job on the Hadoop cluster, make sure all nodes are correctly configured and started. After the job finishes, the results can be viewed by pointing a browser at the head node.
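For reference, a job of this kind is typically submitted with something like hadoop jar mapreduce-exp.jar subExp1.TradeJobMain /input/trade /output/trade (the jar name and HDFS paths here are placeholders, not from the original project), and the output can then be checked with hdfs dfs -cat /output/trade/part-r-00000 or through the NameNode/ResourceManager web UIs on the head node.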