本文共 6169 字,大约阅读时间需要 20 分钟。
Hadoop 0.20.2 的新 API(org.apache.hadoop.mapreduce)中没有提供 MultipleOutputFormat(多文件输出)。尽管 0.19.2 中的老方法 org.apache.hadoop.mapred.lib.MultipleOutputFormat 还可以继续在 0.20.2 中使用,但 org.apache.hadoop.mapred 下的方法都被标记为“已过时”,在 Hadoop 下个版本中可能就不能使用了。Hadoop 0.20.2 推荐使用 Configuration 替换 JobConf,而这个老方法 org.apache.hadoop.mapred.lib.MultipleOutputFormat 仍然使用 JobConf,也就是说还没有可替换的新 API。
此外hadoop 0.20.2还只是一个中间版本,并不是所有API都升级到最新了,没有提供的API只能自己写。
重写MultipleOutputFormat需要2个类:
LineRecordWriter
MultipleOutputFormat
PartitionByFilenameOutputFormat 是实验中自定义的子类:它根据每条记录生成输出文件名,使结果分别写入各自的文件。
LineRecordWriter:
package cn.xmu.dm;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Writes each key/value pair as one text line to a {@link DataOutputStream}.
 *
 * <p>A line has the form {@code key + separator + value + "\r\n"}. A key or value
 * that is {@code null} or a {@link NullWritable} is omitted; the separator is only
 * written when both sides are present, and nothing at all is written when both
 * are absent.
 *
 * @param <K> key type
 * @param <V> value type
 */
public class LineRecordWriter<K, V> extends RecordWriter<K, V> {

    private static final String utf8 = "UTF-8";

    /** Line terminator (CR LF) as raw bytes, so no charset lookup is needed per record. */
    private static final byte[] LINE_TERMINATOR = {'\r', '\n'};

    protected DataOutputStream out;

    private final byte[] keyValueSeparator;

    /**
     * @param out               stream that receives the encoded lines
     * @param keyValueSeparator text placed between key and value; encoded as UTF-8
     * @throws IllegalArgumentException if the UTF-8 encoding is not available
     */
    public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
        this.out = out;
        try {
            this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
        } catch (UnsupportedEncodingException uee) {
            // Keep the original exception as the cause instead of dropping it.
            throw new IllegalArgumentException("can't find " + utf8 + " encoding", uee);
        }
    }

    /**
     * Creates a writer that separates key and value with a TAB character.
     *
     * @param out stream that receives the encoded lines
     */
    public LineRecordWriter(DataOutputStream out) {
        // The original passed "/t" (slash + 't'), a mangled form of the TAB escape.
        this(out, "\t");
    }

    /**
     * Writes one object: {@link Text} is copied byte-for-byte from its backing
     * buffer, anything else is written as the UTF-8 bytes of {@code toString()}.
     */
    private void writeObject(Object o) throws IOException {
        if (o instanceof Text) {
            Text to = (Text) o;
            // Only the first getLength() bytes of the backing array are valid.
            out.write(to.getBytes(), 0, to.getLength());
        } else {
            out.write(o.toString().getBytes(utf8));
        }
    }

    /**
     * Writes one record as a line.
     *
     * @param key   record key; skipped when {@code null} or a {@link NullWritable}
     * @param value record value; skipped when {@code null} or a {@link NullWritable}
     * @throws IOException if writing to the underlying stream fails
     */
    @Override
    public synchronized void write(K key, V value) throws IOException {
        boolean nullKey = key == null || key instanceof NullWritable;
        boolean nullValue = value == null || value instanceof NullWritable;
        if (nullKey && nullValue) {
            return;
        }
        if (!nullKey) {
            writeObject(key);
        }
        if (!(nullKey || nullValue)) {
            out.write(keyValueSeparator);
        }
        if (!nullValue) {
            writeObject(value);
        }
        out.write(LINE_TERMINATOR);
    }

    /**
     * Closes the underlying stream.
     *
     * @param context task context (unused)
     * @throws IOException if closing the stream fails
     */
    @Override
    public synchronized void close(TaskAttemptContext context) throws IOException {
        out.close();
    }
}
MultipleOutputFormat:
package cn.xmu.dm;

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * A {@link FileOutputFormat} that routes each record to a file whose name is
 * chosen per record by {@link #generateFileNameForKeyValue}.
 *
 * <p>One {@link LineRecordWriter} is opened lazily per distinct file name and kept
 * until the enclosing writer is closed.
 *
 * @param <K> key type
 * @param <V> value type
 */
public abstract class MultipleOutputFormat<K extends WritableComparable<?>, V extends Writable>
        extends FileOutputFormat<K, V> {

    /** Writer handed out by {@link #getRecordWriter}; created on first request. */
    private MultiRecordWriter writer = null;

    /**
     * Returns the multiplexing writer, creating it on the first call. Later calls
     * on the same instance return the same writer.
     *
     * @param job task attempt the writer is created for
     * @throws IOException if the task output directory cannot be determined
     */
    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        if (writer == null) {
            writer = new MultiRecordWriter(job, getTaskOutputPath(job));
        }
        return writer;
    }

    /**
     * Determines the directory the output files are created in: the committer's
     * work path when the committer is a {@link FileOutputCommitter}, otherwise the
     * configured job output path.
     *
     * @throws IOException if no job output path is configured in the fallback case
     */
    private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {
        OutputCommitter committer = super.getOutputCommitter(conf);
        if (committer instanceof FileOutputCommitter) {
            return ((FileOutputCommitter) committer).getWorkPath();
        }
        Path outputPath = getOutputPath(conf);
        if (outputPath == null) {
            throw new IOException("Undefined job output-path");
        }
        return outputPath;
    }

    /**
     * Chooses the output file name (relative to the task output directory) for a record.
     *
     * @param key   record key
     * @param value record value
     * @param conf  job configuration
     * @return file name the record is written to
     */
    protected abstract String generateFileNameForKeyValue(K key, V value, Configuration conf);

    /**
     * Record writer that keeps one delegate writer per output file name and
     * forwards each record to the delegate selected by
     * {@link MultipleOutputFormat#generateFileNameForKeyValue}.
     */
    public class MultiRecordWriter extends RecordWriter<K, V> {

        /** Open delegate writers, keyed by the generated file name. */
        private HashMap<String, RecordWriter<K, V>> recordWriters = null;
        private TaskAttemptContext job = null;
        private Path workPath = null;

        /**
         * @param job      task attempt the files belong to
         * @param workPath directory the output files are created in
         */
        public MultiRecordWriter(TaskAttemptContext job, Path workPath) {
            super();
            this.job = job;
            this.workPath = workPath;
            recordWriters = new HashMap<String, RecordWriter<K, V>>();
        }

        /**
         * Closes every delegate writer. An {@link IOException} from one delegate
         * does not prevent the remaining ones from being closed; the first such
         * failure is rethrown after all delegates have been attempted.
         *
         * @throws IOException if at least one delegate failed to close
         */
        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            IOException firstFailure = null;
            for (RecordWriter<K, V> delegate : this.recordWriters.values()) {
                try {
                    delegate.close(context);
                } catch (IOException e) {
                    // Remember the failure but keep closing, so no stream is left open.
                    if (firstFailure == null) {
                        firstFailure = e;
                    }
                }
            }
            this.recordWriters.clear();
            if (firstFailure != null) {
                throw firstFailure;
            }
        }

        /**
         * Writes the record to the file selected for it, opening that file on first use.
         */
        @Override
        public void write(K key, V value) throws IOException, InterruptedException {
            String baseName = generateFileNameForKeyValue(key, value, job.getConfiguration());
            RecordWriter<K, V> rw = this.recordWriters.get(baseName);
            if (rw == null) {
                rw = getBaseRecordWriter(job, baseName);
                this.recordWriters.put(baseName, rw);
            }
            rw.write(key, value);
        }

        /**
         * Opens a new {@link LineRecordWriter} for {@code baseName} under the work path.
         *
         * <p>Key and value are separated by a comma. When output compression is
         * enabled the stream is wrapped by the configured codec (Gzip if none is
         * configured) and the codec's default extension is appended to the name.
         * The file is created with overwrite disabled.
         */
        private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName)
                throws IOException, InterruptedException {
            Configuration conf = job.getConfiguration();
            boolean isCompressed = getCompressOutput(job);
            String keyValueSeparator = ",";
            RecordWriter<K, V> recordWriter = null;
            if (isCompressed) {
                Class<? extends CompressionCodec> codecClass =
                        getOutputCompressorClass(job, GzipCodec.class);
                CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
                Path file = new Path(workPath, baseName + codec.getDefaultExtension());
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                recordWriter = new LineRecordWriter<K, V>(
                        new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator);
            } else {
                Path file = new Path(workPath, baseName);
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                recordWriter = new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
            }
            return recordWriter;
        }
    }
}
PartitionByFilenameOutputFormat:
package cn.xmu.dm;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;

/**
 * Output format that names each record's output file after the text preceding
 * the first TAB character in the record's value.
 */
public class PartitionByFilenameOutputFormat extends MultipleOutputFormat<Text, Text> {

    /**
     * Returns the part of {@code value} before its first TAB as the file name.
     *
     * @param key   record key (unused)
     * @param value record value; must contain a TAB character
     * @param conf  job configuration (unused)
     * @return the prefix of {@code value} up to, but excluding, the first TAB
     * @throws IllegalArgumentException if {@code value} contains no TAB
     */
    @Override
    protected String generateFileNameForKeyValue(Text key, Text value, Configuration conf) {
        String line = value.toString();
        int tabIndex = line.indexOf('\t');
        if (tabIndex < 0) {
            // Without this check substring(0, -1) fails with an opaque index error.
            throw new IllegalArgumentException(
                    "value has no TAB-separated file name prefix: " + line);
        }
        return line.substring(0, tabIndex);
    }
}
转载地址:http://ktwob.baihongyu.com/