重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
本篇文章给大家分享的是有关OutputFormat在java 中怎么实现自定义,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。
昌图网站制作公司哪家好,找成都创新互联!从网页设计、网站建设、微信开发、APP开发、成都响应式网站建设公司等网站项目制作,到程序开发,运营维护。成都创新互联自2013年创立以来到现在10年的时间,我们拥有了丰富的建站经验和运维经验,来保证我们的工作的顺利进行。专注于网站建设就选成都创新互联。
java 中 自定义OutputFormat
实例代码:
package com.ccse.hadoop.outputformat; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; public class MySelfOutputFormatApp { public final static String INPUT_PATH = "hdfs://chaoren1:9000/mapinput"; public final static String OUTPUT_PATH = "hdfs://chaoren1:9000/mapoutput"; public final static String OUTPUT_FILENAME = "/abc"; public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); FileSystem fileSystem = FileSystem.get(new URI(OUTPUT_PATH), conf); fileSystem.delete(new Path(OUTPUT_PATH), true); Job job = new Job(conf, MySelfOutputFormatApp.class.getSimpleName()); job.setJarByClass(MySelfOutputFormatApp.class); FileInputFormat.setInputPaths(job, new Path(INPUT_PATH)); job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); job.setOutputFormatClass(MyselfOutputFormat.class); job.waitForCompletion(true); } public static class MyMapper extends Mapper{ private Text word = new Text(); private LongWritable writable = new LongWritable(1); @Override protected void map(LongWritable key, Text value, Mapper .Context context) throws IOException, InterruptedException { if (value != null) { String line = value.toString(); StringTokenizer tokenizer = new StringTokenizer(line); while (tokenizer.hasMoreTokens()) { word.set(tokenizer.nextToken()); context.write(word, writable); } } } } public static class MyReducer extends Reducer { @Override protected void reduce(Text key, Iterable values, Reducer .Context context) throws IOException, InterruptedException { long sum = 0; for (LongWritable value : values) { sum += value.get(); } context.write(key, new LongWritable(sum)); } } public static class MyselfOutputFormat extends OutputFormat { private FSDataOutputStream outputStream = null; @Override public RecordWriter getRecordWriter( TaskAttemptContext context) throws IOException, InterruptedException { try { FileSystem fileSystem = FileSystem.get(new URI(MySelfOutputFormatApp.OUTPUT_PATH), context.getConfiguration()); //指定文件的输出路径 final Path path = new Path(MySelfOutputFormatApp.OUTPUT_PATH + MySelfOutputFormatApp.OUTPUT_FILENAME); this.outputStream = fileSystem.create(path, false); } catch (URISyntaxException e) { e.printStackTrace(); } return new MySelfRecordWriter(outputStream); } @Override public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException { } @Override public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException { return new FileOutputCommitter(new Path(MySelfOutputFormatApp.OUTPUT_PATH), context); } } public static class MySelfRecordWriter extends RecordWriter { private FSDataOutputStream outputStream = null; public MySelfRecordWriter(FSDataOutputStream outputStream) { this.outputStream = outputStream; } @Override public void write(Text key, LongWritable value) throws IOException, InterruptedException { this.outputStream.writeBytes(key.toString()); this.outputStream.writeBytes("\t"); this.outputStream.writeLong(value.get()); } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { this.outputStream.close(); } } }
2.OutputFormat是用于处理各种输出目的地的。
2.1 OutputFormat需要写出去的键值对,是来自于Reducer类,是通过RecordWriter获得的。
2.2 RecordWriter中的write(...)方法只有k和v,写到哪里去哪?这要通过单独传入OutputStream来处理。write就是把k和v写入到OutputStream中的。
2.3 RecordWriter类位于OutputFormat中的。因此,我们自定义的OutputFromat必须继承OutputFormat类型。那么,流对象必须在getRecordWriter(...)方法中获得。
以上就是OutputFormat在java 中怎么实现自定义,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注创新互联行业资讯频道。