OutputFormat can be thought of as the last step of the MapReduce pipeline: it is responsible for deciding where the output goes, whether that is a particular file, a database, or somewhere else. OutputFormat is the base class for MapReduce output, and every MapReduce output implementation implements the OutputFormat interface. The figure below shows several common implementation classes of OutputFormat (please ignore the underlined one, which is my own custom class):
The default output format is TextOutputFormat, which writes each record as one line of text: the key, a separator (a tab by default), then the value.
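For reference, here is a minimal sketch of relying on that default (my own illustration, not code from this post; the class name and the comma separator are made up for the example, while the property name is the standard Hadoop one):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import java.io.IOException;

public class DefaultOutputFormatSketch {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // The default key/value separator is a tab; this property changes it (here to a comma)
        conf.set("mapreduce.output.textoutputformat.separator", ",");
        Job job = Job.getInstance(conf);
        // No job.setOutputFormatClass(...) call is needed: TextOutputFormat is the default,
        // and every record written to the context becomes one "key<separator>value" line
    }
}
```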
Customizing the output therefore takes two steps (a bare-bones skeleton of both pieces follows this list):

- Define a class that extends FileOutputFormat and override its getRecordWriter method; in practice it only has to construct a custom RecordWriter and return it.
- Write that custom RecordWriter by extending RecordWriter and implementing its write and close methods.
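Here is the generic skeleton of that pattern (my own sketch, not from the original post; MyOutputFormat, MyRecordWriter, the file name custom.out and the placeholder type parameters K and V are all made up, and the concrete classes for the log case are shown further down):

```java
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

public class MyOutputFormat<K, V> extends FileOutputFormat<K, V> {

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        // The only job of the OutputFormat: build the custom RecordWriter and return it
        return new MyRecordWriter<>(job);
    }

    // The RecordWriter does the actual writing
    static class MyRecordWriter<K, V> extends RecordWriter<K, V> {
        private FSDataOutputStream out;

        MyRecordWriter(TaskAttemptContext job) throws IOException {
            // Open the output stream(s) here, using the job's Configuration
            FileSystem fs = FileSystem.get(job.getConfiguration());
            out = fs.create(new Path(FileOutputFormat.getOutputPath(job), "custom.out"));
        }

        @Override
        public void write(K key, V value) throws IOException, InterruptedException {
            // Decide where each record goes and write it
            out.writeBytes(key.toString() + "\t" + value.toString() + "\n");
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            // Release everything that was opened in the constructor
            IOUtils.closeStream(out);
        }
    }
}
```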
Requirement: filter the input log file. ==Websites containing atguigu are written to atguigu.log; everything else goes to other.log.==
Because this case only outputs strings, the Mapper's type parameters should be declared as follows; the map method simply writes each Text line to the context:
public class LogMapper extends Mapper<LongWritable, Text,Text, NullWritable>
Likewise, the Reducer should be declared as:
public class LogReducer extends Reducer<Text, NullWritable,Text,NullWritable>
FileOutputFormat also takes two type parameters, which correspond to the final output key/value types, so:
public class LogOutputFormat extends FileOutputFormat<Text, NullWritable>
Likewise for the RecordWriter's type parameters:
public class LogRecordWriter extends RecordWriter<Text, NullWritable>
Mapper:
package com.wzq.mapreduce.outputformat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class LogMapper extends Mapper<LongWritable, Text,Text, NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// Nothing else to do here: just write the line to the context
context.write(value,NullWritable.get());
}
}
Reducer:
package com.wzq.mapreduce.outputformat;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class LogReducer extends Reducer<Text, NullWritable,Text,NullWritable> {
@Override
protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
// Identical lines arrive as one key with several values; iterate over the values so duplicate lines are not lost
for (NullWritable value : values) {
context.write(key,NullWritable.get());
}
}
}
OutputFormat:
package com.wzq.mapreduce.outputformat;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class LogOutputFormat extends FileOutputFormat<Text, NullWritable> {
@Override
public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
// Create a LogRecordWriter, hand it the job context, and return it
LogRecordWriter lrw = new LogRecordWriter(job);
return lrw;
}
}
RecordWriter:
package com.wzq.mapreduce.outputformat;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;
public class LogRecordWriter extends RecordWriter<Text, NullWritable> {
// One output stream per target file
private FSDataOutputStream atguiguOut;
private FSDataOutputStream otherOut;
public LogRecordWriter(TaskAttemptContext job) {
try {
// Create the FileSystem here, using the job's Configuration
FileSystem fs = FileSystem.get(job.getConfiguration());
// Output path for lines containing atguigu
atguiguOut = fs.create(new Path("D:\\BigData_workspace\\output\\logOutput\\atguigu.log"));
// Output path for all other lines
otherOut = fs.create(new Path("D:\\BigData_workspace\\output\\logOutput\\other.log"));
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void write(Text key, NullWritable value) throws IOException, InterruptedException {
String log = key.toString();
// Check whether the line contains atguigu
if (log.contains("atguigu")) {
// If it does, write it through the atguigu stream
atguiguOut.writeBytes(log + "\n");
} else {
otherOut.writeBytes(log + "\n");
}
}
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
// Close both streams
IOUtils.closeStream(atguiguOut);
IOUtils.closeStream(otherOut);
}
}
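One thing worth noting about the constructor above: the two output paths are hardcoded, which is fine for a local demo but ties the code to one machine. A possible variation (my own sketch, not part of the original code, and it assumes an extra import of org.apache.hadoop.mapreduce.lib.output.FileOutputFormat) is to build the file names under the output directory that the Driver configures:

```java
// Alternative constructor sketch: derive the two files from the job's configured output directory
public LogRecordWriter(TaskAttemptContext job) {
    try {
        FileSystem fs = FileSystem.get(job.getConfiguration());
        // Directory passed to FileOutputFormat.setOutputPath() in the Driver
        Path outDir = FileOutputFormat.getOutputPath(job);
        atguiguOut = fs.create(new Path(outDir, "atguigu.log"));
        otherOut = fs.create(new Path(outDir, "other.log"));
    } catch (IOException e) {
        e.printStackTrace();
    }
}
```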
Driver:
package com.wzq.mapreduce.outputformat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class LogDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
// 1. Get the Job instance
Job job = Job.getInstance(new Configuration());
// 2. Set the jar by class
job.setJarByClass(LogDriver.class);
// 3. Wire up the Mapper and Reducer
job.setMapperClass(LogMapper.class);
job.setReducerClass(LogReducer.class);
// 4. Set the Map output key/value types
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
// 5. Set the final output key/value types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// Wire up the custom OutputFormat
job.setOutputFormatClass(LogOutputFormat.class);
// 6. Set the input and output paths
FileInputFormat.setInputPaths(job, new Path("D:\\BigData_workspace\\input\\inputoutputformat"));
// MapReduce still writes its job success marker, so an output path must be set even though the custom OutputFormat writes the data files itself
FileOutputFormat.setOutputPath(job, new Path("D:\\BigData_workspace\\output\\logOutput\\info"));
// 7. Submit the Job
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Test: