package com.lyz.hdfs.mr.ii;
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Step two of the inverted-index MapReduce job. The Mapper, Reducer, and
 * driver (runner) are all kept in this single class.
 * @author liuyazhuang
 *
 */
public class InverseIndexStepTwo {
/**
 * Mapper for step two of the inverted index.
 *
 * Each input line produced by the step-one MR job has the form
 * "word-->fileName<TAB>count", for example:
 * hello-->a.txt 3
 * hello-->b.txt 2
 * hello-->c.txt 2
 * jerry-->a.txt 1
 * jerry-->b.txt 3
 * jerry-->c.txt 1
 * tom-->a.txt 2
 * tom-->b.txt 1
 * tom-->c.txt 1
 * @author liuyazhuang
 *
 */
public static class StepTwoMapper extends Mapper<LongWritable, Text, Text, Text>{
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
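// An input line looks like "hello-->a.txt\t3": split the tab-separated
// count off first, then split the word from the file name on "-->".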
String line = value.toString();
String[] fields = StringUtils.split(line, "\t");
String[] wordAndFileName = StringUtils.split(fields[0], "-->");
String word = wordAndFileName[0];
String fileName = wordAndFileName[1];
long counter = Long.parseLong(fields[1]);
context.write(new Text(word), new Text(fileName + "-->" + counter));
}
}
/**
 * Reducer for step two of the inverted index.
 * The grouped input has the form:
 * <"hello", {"a.txt-->3", "b.txt-->2", "c.txt-->2"}>
 * and the final output looks like:
 * hello c.txt-->2 b.txt-->2 a.txt-->3
 * jerry c.txt-->1 b.txt-->3 a.txt-->1
 * tom c.txt-->1 b.txt-->1 a.txt-->2
 * @author liuyazhuang
 *
 */
public static class StepTwoReducer extends Reducer<Text, Text, Text, Text>{
@Override
protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
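// Concatenate every "fileName-->count" entry for this word into a single
// space-separated line.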
StringBuilder result = new StringBuilder();
for(Text value : values){
result.append(value.toString()).append(" ");
}
context.write(key, new Text(result.toString()));
}
}
//Runs the step-two MR job
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(InverseIndexStepTwo.class);
job.setMapperClass(StepTwoMapper.class);
job.setReducerClass(StepTwoReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
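//The input is the output file of the step-one job; these local Windows
//paths assume a local run and should be adjusted for your environment.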
FileInputFormat.addInputPath(job, new Path("D:/hadoop_data/ii/result/part-r-00000"));
FileOutputFormat.setOutputPath(job, new Path("D:/hadoop_data/ii/result/final"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
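/*
 * A minimal sketch of how this job might be launched (assuming the class is
 * packaged into a jar named ii.jar; the jar name is illustrative):
 *
 *   hadoop jar ii.jar com.lyz.hdfs.mr.ii.InverseIndexStepTwo
 *
 * On a real cluster the hard-coded paths above would point at HDFS rather
 * than the local Windows file system.
 */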