重庆分公司,新征程启航

为企业提供网站建设、域名注册、服务器等服务

OutputFormat在java中怎么实现自定义

本篇文章给大家分享的是有关OutputFormat在java 中怎么实现自定义,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。

昌图网站制作公司哪家好,找成都创新互联!从网页设计、网站建设、微信开发、APP开发、成都响应式网站建设公司等网站项目制作,到程序开发,运营维护。成都创新互联自2013年创立以来到现在10年的时间,我们拥有了丰富的建站经验和运维经验,来保证我们的工作的顺利进行。专注于网站建设就选成都创新互联

java 中 自定义OutputFormat

实例代码:

package com.ccse.hadoop.outputformat; 
 
import java.io.IOException; 
import java.net.URI; 
import java.net.URISyntaxException; 
import java.util.StringTokenizer; 
 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FSDataOutputStream; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.JobContext; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.OutputCommitter; 
import org.apache.hadoop.mapreduce.OutputFormat; 
import org.apache.hadoop.mapreduce.RecordWriter; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; 
 
 
public class MySelfOutputFormatApp { 
   
  public final static String INPUT_PATH = "hdfs://chaoren1:9000/mapinput"; 
  public final static String OUTPUT_PATH = "hdfs://chaoren1:9000/mapoutput"; 
  public final static String OUTPUT_FILENAME = "/abc"; 
   
  public static void main(String[] args) throws IOException, URISyntaxException,  
    ClassNotFoundException, InterruptedException { 
    Configuration conf = new Configuration(); 
    FileSystem fileSystem = FileSystem.get(new URI(OUTPUT_PATH), conf); 
    fileSystem.delete(new Path(OUTPUT_PATH), true); 
     
    Job job = new Job(conf, MySelfOutputFormatApp.class.getSimpleName()); 
    job.setJarByClass(MySelfOutputFormatApp.class); 
     
    FileInputFormat.setInputPaths(job, new Path(INPUT_PATH)); 
    job.setMapperClass(MyMapper.class); 
    job.setMapOutputKeyClass(Text.class); 
    job.setMapOutputValueClass(LongWritable.class); 
     
    job.setReducerClass(MyReducer.class); 
    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(LongWritable.class); 
    job.setOutputFormatClass(MyselfOutputFormat.class); 
     
    job.waitForCompletion(true); 
  } 
   
  public static class MyMapper extends Mapper { 
 
    private Text word = new Text(); 
    private LongWritable writable = new LongWritable(1); 
     
    @Override 
    protected void map(LongWritable key, Text value, 
        Mapper.Context context) 
        throws IOException, InterruptedException { 
      if (value != null) { 
        String line = value.toString(); 
        StringTokenizer tokenizer = new StringTokenizer(line); 
        while (tokenizer.hasMoreTokens()) { 
          word.set(tokenizer.nextToken()); 
          context.write(word, writable); 
        } 
      } 
    } 
     
  } 
   
  public static class MyReducer extends Reducer { 
 
    @Override 
    protected void reduce(Text key, Iterable values, 
        Reducer.Context context) 
        throws IOException, InterruptedException { 
      long sum = 0;  
      for (LongWritable value : values) { 
        sum += value.get(); 
      } 
      context.write(key, new LongWritable(sum)); 
    } 
  } 
 
  public static class MyselfOutputFormat extends OutputFormat { 
 
    private FSDataOutputStream outputStream = null; 
     
    @Override 
    public RecordWriter getRecordWriter( 
        TaskAttemptContext context) throws IOException, 
        InterruptedException { 
      try { 
        FileSystem fileSystem = FileSystem.get(new URI(MySelfOutputFormatApp.OUTPUT_PATH), context.getConfiguration()); 
        //指定文件的输出路径 
        final Path path = new Path(MySelfOutputFormatApp.OUTPUT_PATH  
                     + MySelfOutputFormatApp.OUTPUT_FILENAME); 
        this.outputStream = fileSystem.create(path, false); 
      } catch (URISyntaxException e) { 
        e.printStackTrace(); 
      } 
      return new MySelfRecordWriter(outputStream); 
    } 
 
    @Override 
    public void checkOutputSpecs(JobContext context) throws IOException, 
        InterruptedException { 
    } 
 
    @Override 
    public OutputCommitter getOutputCommitter(TaskAttemptContext context) 
        throws IOException, InterruptedException { 
      return new FileOutputCommitter(new Path(MySelfOutputFormatApp.OUTPUT_PATH), context); 
    } 
     
  } 
   
  public static class MySelfRecordWriter extends RecordWriter { 
 
    private FSDataOutputStream outputStream = null; 
     
    public MySelfRecordWriter(FSDataOutputStream outputStream) { 
      this.outputStream = outputStream; 
    } 
     
    @Override 
    public void write(Text key, LongWritable value) throws IOException, 
        InterruptedException { 
      this.outputStream.writeBytes(key.toString()); 
      this.outputStream.writeBytes("\t"); 
      this.outputStream.writeLong(value.get()); 
    } 
 
    @Override 
    public void close(TaskAttemptContext context) throws IOException, 
        InterruptedException { 
      this.outputStream.close(); 
    } 
     
  } 
   
} 

 2.OutputFormat是用于处理各种输出目的地的。

2.1 OutputFormat需要写出去的键值对,是来自于Reducer类,是通过RecordWriter获得的。

2.2 RecordWriter中的write(...)方法只有k和v,写到哪里去哪?这要通过单独传入OutputStream来处理。write就是把k和v写入到OutputStream中的。

2.3 RecordWriter类位于OutputFormat中的。因此,我们自定义的OutputFromat必须继承OutputFormat类型。那么,流对象必须在getRecordWriter(...)方法中获得。

以上就是OutputFormat在java 中怎么实现自定义,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注创新互联行业资讯频道。


分享名称:OutputFormat在java中怎么实现自定义
网站URL:http://cqcxhl.cn/article/jhdhhc.html

其他资讯

在线咨询
服务热线
服务热线:028-86922220
TOP