太原网站建设优化济宁百度推广价格
3.7.1Reduce Join
1、工作原理
Map端的主要工作:为来自不同表或文件的key/value对,打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。
Reduce端的主要工作:在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段已经达标)分开,最后进行合并就ok了。
3.7.2Reduce Join案例实操
1、需求
将商品信息表中数据根据商品pid合并到订单数据表中。
1)输入
order.txt
1001 01 1
1002 02 2
1003 03 3
1004 01 4
1005 02 5
1006 03 6
pd.txt
01 小米
02 华为
03 格力
2)输出
1001 小米 1
1001 小米 1
1002 华为 2
1002 华为 2
1003 格力 3
1003 格力 3
2、需求分析
通过将关联条件作为Map输出的key,将两表满足Join条件的数据并携带数据所来源的文件信息,发往同一个ReduceTask,在Reduce中进行数据的串联,如图4-20所示。
3、代码实现
1)创建商品和订合并后的Bean类
package com.cuiyf41.join;import org.apache.hadoop.io.Writable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;public class TableBean implements Writable {private String order_id; // 订单idprivate String p_id; // 产品idprivate int amount; // 产品数量private String pname; // 产品名称private String flag; // 表的标记public TableBean() {super();}public TableBean(String order_id, String p_id, int amount, String pname, String flag) {super();this.order_id = order_id;this.p_id = p_id;this.amount = amount;this.pname = pname;this.flag = flag;}@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(order_id);out.writeUTF(p_id);out.writeInt(amount);out.writeUTF(pname);out.writeUTF(flag);}@Overridepublic void readFields(DataInput in) throws IOException {order_id = in.readUTF();p_id = in.readUTF();amount = in.readInt();pname = in.readUTF();flag = in.readUTF();}@Overridepublic String toString() {return order_id + "\t" + pname + "\t" + amount;}public String getOrder_id() {return order_id;}public void setOrder_id(String order_id) {this.order_id = order_id;}public String getP_id() {return p_id;}public void setP_id(String p_id) {this.p_id = p_id;}public int getAmount() {return amount;}public void setAmount(int amount) {this.amount = amount;}public String getPname() {return pname;}public void setPname(String pname) {this.pname = pname;}public String getFlag() {return flag;}public void setFlag(String flag) {this.flag = flag;}
}
2)编写TableMapper类
package com.cuiyf41.join;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;import java.io.IOException;public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {String name;TableBean bean = new TableBean();Text k = new Text();@Overrideprotected void setup(Mapper<LongWritable, Text, Text, TableBean>.Context context) throws IOException, InterruptedException {// 1 获取输入文件切片FileSplit split = (FileSplit) context.getInputSplit();// 2 获取输入文件名称name = split.getPath().getName();}@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, TableBean>.Context context) throws IOException, InterruptedException {// 1 获取输入数据String line = value.toString();// 2切割String[] fields = line.split("\t");// 3 不同文件分别处理if (name.startsWith("order")) {// 订单表处理// 3.1 封装bean对象bean.setOrder_id(fields[0]);bean.setP_id(fields[1]);bean.setAmount(Integer.parseInt(fields[2]));bean.setPname("");bean.setFlag("order");k.set(fields[1]);} else {// 产品表处理// 3.2 封装bean对象bean.setP_id(fields[0]);bean.setPname(fields[1]);bean.setFlag("pd");bean.setAmount(0);bean.setOrder_id("");k.set(fields[0]);}// 4 写出context.write(k, bean);}
}
3)编写TableReducer类
package com.cuiyf41.join;import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;
import java.util.ArrayList;public class TableReducer extends Reducer<Text,TableBean, TableBean, NullWritable> {@Overrideprotected void reduce(Text key, Iterable<TableBean> values, Reducer<Text, TableBean, TableBean, NullWritable>.Context context) throws IOException, InterruptedException {// 1准备存储订单的集合ArrayList<TableBean> orderBeans = new ArrayList<>();// 2 准备bean对象TableBean pdBean = new TableBean();for (TableBean bean : values) {if ("order".equals(bean.getFlag())) {// 订单表// 拷贝传递过来的每条订单数据到集合中TableBean tmpBean = new TableBean();try {BeanUtils.copyProperties(tmpBean, bean);} catch (Exception e) {e.printStackTrace();}orderBeans.add(tmpBean);} else {// 产品表try {// 拷贝传递过来的产品表到内存中BeanUtils.copyProperties(pdBean, bean);} catch (Exception e) {e.printStackTrace();}}}// 3 表的拼接for(TableBean bean:orderBeans) {bean.setPname(pdBean.getPname());// 4 数据写出去context.write(bean, NullWritable.get());}}
}
4)编写TableDriver类
package com.cuiyf41.join;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class TableDriver {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {// 0 根据自己电脑路径重新配置args = new String[]{"e:/input/inputtable","e:/output1"};// 1 获取配置信息,或者job对象实例Configuration conf = new Configuration();Job job = Job.getInstance(conf);// 2 指定本程序的jar包所在的本地路径job.setJarByClass(TableDriver.class);// 3 指定本业务job要使用的Mapper/Reducer业务类job.setMapperClass(TableMapper.class);job.setReducerClass(TableReducer.class);// 4 指定Mapper输出数据的kv类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(TableBean.class);// 5 指定最终输出的数据的kv类型job.setOutputKeyClass(TableBean.class);job.setOutputValueClass(NullWritable.class);// 6 指定job的输入原始文件所在目录Path input = new Path(args[0]);Path output = new Path(args[1]);// 如果输出路径存在,则进行删除FileSystem fs = FileSystem.get(conf);if (fs.exists(output)) {fs.delete(output,true);}FileInputFormat.setInputPaths(job, input);FileOutputFormat.setOutputPath(job, output);// 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行boolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}
}
4、测试
运行程序查看结果
1001 小米 1
1001 小米 1
1002 华为 2
1002 华为 2
1003 格力 3
1003 格力 3
5、总结
缺点:这种方式中,合并的操作是在Reduce阶段完成的,Reduce端的处理压力太大,Map节点的运算负载则很低,资源利用率不高,且在Reduce杰顿极易产生数据倾斜。
解决方案:Map端实现数据合并。
Mapper:// 1 获取输入数据
3.7.3Map Join
一、概述
1)使用场景
Map Join适用于一张表十分小、一张表很大的场景。
2)优点
思考:在Reduce端处理过多的表,非常容易产生数据倾斜。怎么办?
在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜。
3)具体办法:采用DistributedCache
(1)在Mapper的setup阶段,将文件读取到缓存集合中。
(2)在驱动函数中加载缓存。
// 缓存普通文件到Task运行节点。
job.addCacheFile(new URI("file://e:/cache/pd.txt"))
二、Map Join案例实操
1、需求
将商品信息表中数据根据商品pid合并到订单数据表中。
1)输入数据
order.txt
1001 01 1
1002 02 2
1003 03 3
1004 01 4
1005 02 5
1006 03 6
pd.txt
01 小米
02 华为
03 格力
2)输出数据
2、需求分析
MapJoin适用于关联表中有小表的情形。
3、实现代码
(1)先在驱动模块中添加缓存文件
package com.cuiyf41.mapjoin;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;public class DistributedCacheDriver {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {// 0 根据自己电脑路径重新配置args = new String[]{"e:/input/inputtable2", "e:/output1"};// 1 获取job信息Configuration configuration = new Configuration();Job job = Job.getInstance(configuration);// 2 设置加载jar包路径job.setJarByClass(DistributedCacheDriver.class);// 3 关联mapjob.setMapperClass(DistributedCacheMapper.class);// 4 设置最终输出数据类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);// 5 设置输入输出路径FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));// 6 加载缓存数据job.addCacheFile(new URI("file:///e:/input/inputcache/pd.txt"));// 7 Map端Join的逻辑不需要Reduce阶段,设置reduceTask数量为0job.setNumReduceTasks(0);// 8 提交boolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}
}
(2)读取缓存的文件数据
package com.cuiyf41.mapjoin;import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;public class DistributedCacheMapper extends Mapper<LongWritable, Text, Text, NullWritable> {Map<String, String> pdMap = new HashMap<>();Text k = new Text();@Overrideprotected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {// 1 获取缓存的文件URI[] cacheFiles = context.getCacheFiles();String path = cacheFiles[0].getPath().toString();BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(path), "UTF-8"));String line;while(StringUtils.isNotEmpty(line = reader.readLine())){// 2 切割String[] fields = line.split("\t");// 3 缓存数据到集合pdMap.put(fields[0], fields[1]);}// 4 关流reader.close();}@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {// 1 获取一行String line = value.toString();// 2 截取String[] fields = line.split("\t");// 3 获取产品idString pId = fields[1];// 4 获取商品名称String pdName = pdMap.get(pId);// 5 拼接k.set(line + "\t"+ pdName);// 6 写出context.write(k, NullWritable.get());}
}