0 目标:
自定义OutputFormat, 指定输出文件名,并对输出的key-value在不同key下分别输出到不同目标文件
1 代码:
package outputformat; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; /** * 自定义OutputFormat, 指定输出文件名,并对输出的key-value在不同key下分别输出到不同目标文件 * 执行结果: * [root@master hadoop]# hadoop fs -lsr / Warning: $HADOOP_HOME is deprecated. -rw-r--r-- 3 zm supergroup 19 2014-12-02 04:17 /hello -rw-r--r-- 3 zm supergroup 19 2014-12-02 04:16 /hello2 drwxr-xr-x - zm supergroup 0 2014-12-04 00:17 /out -rw-r--r-- 3 zm supergroup 8 2014-12-04 00:17 /out/abc -rw-r--r-- 3 zm supergroup 11 2014-12-04 00:17 /out/other [root@master hadoop]# hadoop fs -text /out/other Warning: $HADOOP_HOME is deprecated. me 1 you 1 [root@master hadoop]# hadoop fs -text /out/abc Warning: $HADOOP_HOME is deprecated. hello 2 * @author zm * */ public class MySlefOutputFormatApp { private static final String INPUT_PATH = "hdfs://master:9000/hello"; private static final String OUT_PATH = "hdfs://master:9000/out"; private static final String OUT_FIE_NAME = "/abc"; private static final String OUT_FIE_NAME1 = "/other"; public static void main(String[] args) throws Exception{ // 定义conf Configuration conf = new Configuration(); final FileSystem filesystem = FileSystem.get(new URI(OUT_PATH), conf); if(filesystem.exists(new Path(OUT_PATH))){ filesystem.delete(new Path(OUT_PATH), true); } // 定义job final Job job = new Job(conf , MySlefOutputFormatApp.class.getSimpleName()); job.setJarByClass(MySlefOutputFormatApp.class); // 定义InputFormat FileInputFormat.setInputPaths(job, INPUT_PATH); job.setInputFormatClass(TextInputFormat.class); // 定义map job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); // 定义reduce job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); // 定义OutputFormat 自定义的MySelfTextOutputFormat中指定了输出文件名称 job.setOutputFormatClass(MySelfTextOutputFormat.class); job.waitForCompletion(true); } public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{ // protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,Text,LongWritable>.Context context) throws java.io.IOException ,InterruptedException { // final String line = value.toString(); final String[] splited = line.split("\t"); // for (String word : splited) { context.write(new Text(word), new LongWritable(1)); } }; } //map产生的<k,v>分发到reduce的过程称作shuffle public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{ protected void reduce(Text key, java.lang.Iterable<LongWritable> values, org.apache.hadoop.mapreduce.Reducer<Text,LongWritable,Text,LongWritable>.Context context) throws java.io.IOException ,InterruptedException { // long count = 0L; for (LongWritable times : values) { count += times.get(); } context.write(key, new LongWritable(count)); }; } // 自定义OutputFormat,定义好输出数据的FSDataOutputStream位置 public static class MySelfTextOutputFormat extends OutputFormat<Text, LongWritable>{ FSDataOutputStream outputStream = null;// 自定义OutputFormat中,通过FSDataOutputStream将数据写出到hdfs中 FSDataOutputStream outputStream1 = null; @Override public RecordWriter<Text, LongWritable> getRecordWriter( TaskAttemptContext context) throws IOException, InterruptedException { try { final FileSystem fileSystem = FileSystem.get(new URI(MySlefOutputFormatApp.OUT_PATH), context.getConfiguration()); //指定的是输出文件路径 final Path opath = new Path(MySlefOutputFormatApp.OUT_PATH+OUT_FIE_NAME); final Path opath1 = new Path(MySlefOutputFormatApp.OUT_PATH+OUT_FIE_NAME1); if(fileSystem.exists(opath)){ fileSystem.delete(opath,true); } if(fileSystem.exists(opath1)){ fileSystem.delete(opath1,true); } this.outputStream = fileSystem.create(opath, false); this.outputStream1 = fileSystem.create(opath1, false); } catch (URISyntaxException e) { e.printStackTrace(); } // 自定义 RecordWriter return new MySlefRecordWriter(outputStream,outputStream1); } @Override public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException { } @Override public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException { return new FileOutputCommitter(new Path(MySlefOutputFormatApp.OUT_PATH), context); } } // 自定义 RecordWriter public static class MySlefRecordWriter extends RecordWriter<Text, LongWritable>{ FSDataOutputStream outputStream = null; FSDataOutputStream outputStream1 = null; public MySlefRecordWriter(FSDataOutputStream outputStream,FSDataOutputStream outputStream1) { this.outputStream = outputStream; this.outputStream1 = outputStream1; } @Override public void write(Text key, LongWritable value) throws IOException, InterruptedException { if(key.toString().equals("hello")){ outToHdfs(outputStream,key, value); }else{ outToHdfs(outputStream1,key, value); } } private void outToHdfs(FSDataOutputStream outputStream,Text key, LongWritable value) throws IOException { outputStream.writeBytes(key.toString()); outputStream.writeBytes("\t"); //this.outputStream.writeLong(value.get()); 这种方式输出的话,在hdfs中是不会正常显示的 需要用如下方式才可 outputStream.writeBytes(value.toString()); outputStream.writeBytes("\n"); } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { this.outputStream.close(); } } }
2 说明:
MySelfTextOutputFormat 指定输出文件名路径并创建,后和outputStream关联上,并将outputStream传递给MySlefRecordWriter
MySlefRecordWriter.write(key,value)中内做业务判断,将每次传来的key做判断,将key,value通过outputStream输出到不同文件中
相关推荐
MapReduce之自定义 OutPutFormat,通过一个案例,实现自定义的一个OutPutFormat,来更加的深刻的理解MR的过程
主要介绍了java 中 自定义OutputFormat的实例详解的相关资料,这里提供实例帮助大家学习理解这部分内容,希望通过本文能帮助到大家,需要的朋友可以参考下
1.使用场景 2.自定义 OutputFormat步骤 3.实际需求
自定义inputFormat&&outputFormat1
结合案例讲解mr重要知识点1.1 多表连接1.2 mr各组件之间数据传递1.3 mr中压缩设置1.4 多个job之间有序执行1.5 自定义outputFormat
MapReduce自定义OutputFormat和RecordWriter实现 Pig自定义LoadFunc加载和解析Apache HTTP日志事件 Pig的自定义EvalFunc使用MaxMind GEO API将IP地址转换为位置 另外,请查看,了解如何实现MapReduce联接。 包装库 ...
Hadoop常常被用作大型数据处理生态系统中的一部分。它的优势在于能够批量地处理大量数据,并将结果以最好的方式与其他系统相集成。...OutputFormat将Map/Reduce作业的输出结果转换为其他应用程序可读的方式,从而
形式呈现器目录灵感贡献贡献者执照 介绍 ... 特征用JSON渲染表单支持与自定义组件集成使用updateForm方法支持批量更新表单数据支持setOptions方法,动态更改选择选项内容支持inputFormat , outputFormat , trim以处理
它功能完整且有详细的功能自定义。 基于LGPL协议开源,.NET 2.0 C#源代码)它的思路清淅,所以非常容易就上手. 几个注意点: 图片的保存路径设置:RenderedImagePath属性中设置,程序对该文件夹应该是有写和修改权限...
它功能完整且有详细的功能自定义。 基于LGPL协议开源,.NET 2.0 C#源代码)它的思路清淅,所以非常容易就上手. 几个注意点: 图片的保存路径设置:RenderedImagePath属性中设置,程序对该文件夹应该是有写和修改权限...
TensorFlow生态系统 该存储库包含将TensorFlow与其他开源框架集成的示例。... -Hadoop MapReduce和Spark的TFRecord文件InputFormat / OutputFormat。 -Spark TensorFlow连接器 -Python软件包,可帮助用户使用TensorF
或者,使用outputFormat config选项创建时间表的可打印PDF版本。 所有人的无障碍环境 gtfs-to-html正确设置时间表的格式,以确保它们可供屏幕阅读器访问并符合WCAG 2.0。 内置的移动响应能力 内置样式使gtfs-to-...
它功能完整且有详细的功能自定义。 基于LGPL协议开源,.NET 2.0 C#源代码)它的思路清淅,所以非常容易就上手.几个注意点: 图片的保存路径设置:RenderedImagePath属性中设置,程序对该文件夹应该是有写和修改...