Hadoop0.20+ custom MultipleOutputFormat
Published: 2019-05-10


Hadoop 0.20.2's new API does not provide MultipleOutputFormat, the class used for writing to multiple output files. The old 0.19.2 class, org.apache.hadoop.mapred.lib.MultipleOutputFormat, still works under 0.20.2, but everything under org.apache.hadoop.mapred is marked as deprecated and may be removed in a future Hadoop release. Hadoop 0.20.2 recommends Configuration in place of JobConf, yet the old org.apache.hadoop.mapred.lib.MultipleOutputFormat is still built around JobConf; in other words, the new API offers no replacement yet.

Moreover, Hadoop 0.20.2 is only an intermediate release: not every API has been migrated to the new style, so anything that is missing has to be written by hand.

 

Rewriting MultipleOutputFormat for the new API requires two classes:

LineRecordWriter

MultipleOutputFormat

 

PartitionByFilenameOutputFormat is the custom subclass needed in my experiment; it writes the results belonging to each file into that file's own output.

 

LineRecordWriter:

 

package cn.xmu.dm;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class LineRecordWriter<K, V> extends RecordWriter<K, V> {
    private static final String utf8 = "UTF-8";
    protected DataOutputStream out;
    private final byte[] keyValueSeparator;

    public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
        this.out = out;
        try {
            this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
        } catch (UnsupportedEncodingException uee) {
            throw new IllegalArgumentException("can't find " + utf8 + " encoding");
        }
    }

    public LineRecordWriter(DataOutputStream out) {
        this(out, "\t");
    }

    // Text is written raw; everything else falls back to toString() in UTF-8.
    private void writeObject(Object o) throws IOException {
        if (o instanceof Text) {
            Text to = (Text) o;
            out.write(to.getBytes(), 0, to.getLength());
        } else {
            out.write(o.toString().getBytes(utf8));
        }
    }

    public synchronized void write(K key, V value) throws IOException {
        boolean nullKey = key == null || key instanceof NullWritable;
        boolean nullValue = value == null || value instanceof NullWritable;
        if (nullKey && nullValue) {
            return;
        }
        if (!nullKey) {
            writeObject(key);
        }
        if (!(nullKey || nullValue)) {
            out.write(keyValueSeparator);
        }
        if (!nullValue) {
            writeObject(value);
        }
        out.write("\r\n".getBytes());
    }

    public synchronized void close(TaskAttemptContext context) throws IOException {
        out.close();
    }
}
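To see the line layout this writer produces, here is a small standalone check (hypothetical, not part of the original post): it writes two Text pairs to an in-memory stream with a comma separator. The class name LineRecordWriterDemo is made up for illustration.

package cn.xmu.dm;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;

import org.apache.hadoop.io.Text;

public class LineRecordWriterDemo {
    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        // Comma as the key/value separator, the same separator MultipleOutputFormat uses below.
        LineRecordWriter<Text, Text> writer =
                new LineRecordWriter<Text, Text>(new DataOutputStream(buffer), ",");
        writer.write(new Text("key1"), new Text("value1"));
        writer.write(new Text("key2"), new Text("value2"));
        writer.close(null); // close() only closes the stream, so no TaskAttemptContext is needed
        System.out.print(buffer.toString("UTF-8"));
        // Prints (each record terminated by "\r\n"):
        // key1,value1
        // key2,value2
    }
}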

 

 

MultipleOutputFormat:

 

 

package cn.xmu.dm;

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public abstract class MultipleOutputFormat<K extends WritableComparable<?>, V extends Writable>
        extends FileOutputFormat<K, V> {
    private MultiRecordWriter writer = null;

    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        if (writer == null) {
            writer = new MultiRecordWriter(job, getTaskOutputPath(job));
        }
        return writer;
    }

    private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {
        Path workPath = null;
        OutputCommitter committer = super.getOutputCommitter(conf);
        if (committer instanceof FileOutputCommitter) {
            workPath = ((FileOutputCommitter) committer).getWorkPath();
        } else {
            Path outputPath = super.getOutputPath(conf);
            if (outputPath == null) {
                throw new IOException("Undefined job output-path");
            }
            workPath = outputPath;
        }
        return workPath;
    }

    // Subclasses decide which output file (base name) a given key/value pair goes to.
    protected abstract String generateFileNameForKeyValue(K key, V value, Configuration conf);

    public class MultiRecordWriter extends RecordWriter<K, V> {
        // One RecordWriter per output file name, created lazily and reused.
        private HashMap<String, RecordWriter<K, V>> recordWriters = null;
        private TaskAttemptContext job = null;
        private Path workPath = null;

        public MultiRecordWriter(TaskAttemptContext job, Path workPath) {
            super();
            this.job = job;
            this.workPath = workPath;
            recordWriters = new HashMap<String, RecordWriter<K, V>>();
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator();
            while (values.hasNext()) {
                values.next().close(context);
            }
            this.recordWriters.clear();
        }

        @Override
        public void write(K key, V value) throws IOException, InterruptedException {
            String baseName = generateFileNameForKeyValue(key, value, job.getConfiguration());
            RecordWriter<K, V> rw = this.recordWriters.get(baseName);
            if (rw == null) {
                rw = getBaseRecordWriter(job, baseName);
                this.recordWriters.put(baseName, rw);
            }
            rw.write(key, value);
        }

        private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName)
                throws IOException, InterruptedException {
            Configuration conf = job.getConfiguration();
            boolean isCompressed = getCompressOutput(job);
            String keyValueSeparator = ",";
            RecordWriter<K, V> recordWriter = null;
            if (isCompressed) {
                Class<? extends CompressionCodec> codecClass =
                        getOutputCompressorClass(job, GzipCodec.class);
                CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
                Path file = new Path(workPath, baseName + codec.getDefaultExtension());
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                recordWriter = new LineRecordWriter<K, V>(
                        new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator);
            } else {
                Path file = new Path(workPath, baseName);
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                recordWriter = new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
            }
            return recordWriter;
        }
    }
}
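The only method a concrete subclass must supply is generateFileNameForKeyValue, which maps each record to the base name of the file it should land in. As a purely hypothetical illustration (not from the original post, and separate from the PartitionByFilenameOutputFormat below), a subclass could instead name the output files after the record key:

package cn.xmu.dm;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;

// Hypothetical variant for illustration only: one output file per distinct key.
public class PartitionByKeyOutputFormat extends MultipleOutputFormat<Text, Text> {
    @Override
    protected String generateFileNameForKeyValue(Text key, Text value, Configuration conf) {
        return key.toString();
    }
}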

 

 

PartitionByFilenameOutputFormat:

 

package cn.xmu.dm;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;

public class PartitionByFilenameOutputFormat extends MultipleOutputFormat<Text, Text> {
    @Override
    protected String generateFileNameForKeyValue(Text key, Text value, Configuration conf) {
        // Use the first tab-separated field of the value as the output file name.
        return value.toString().substring(0, value.toString().indexOf("\t"));
    }
}
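To wire the custom format into a job, the driver only needs to declare it as the output format class. Below is a minimal driver sketch against the 0.20 org.apache.hadoop.mapreduce API; PartitionDriver is a made-up name, the mapper and reducer are left as placeholders, and Text/Text output types are assumed so that they match PartitionByFilenameOutputFormat.

package cn.xmu.dm;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PartitionDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "partition by filename");
        job.setJarByClass(PartitionDriver.class);
        // job.setMapperClass(YourMapper.class);   // plug in your own mapper here
        // job.setReducerClass(YourReducer.class); // plug in your own reducer here
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Route output through the custom multi-file format instead of TextOutputFormat.
        job.setOutputFormatClass(PartitionByFilenameOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Each reduce task then creates one file per distinct base name under its work directory, and the output committer promotes those files to the job output directory when the task succeeds.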

 

