這篇文章主要介紹了hadoop中mapreduce如何自定義InputFormat,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
網站建設哪家好,找成都創新互聯公司!專注于網頁設計、網站建設、微信開發、小程序制作、集團企業網站建設等服務項目。為回饋新老客戶,創新互聯還提供了石鼓免費建站,歡迎大家使用!
首先定義一個繼承FileInputFormat的類,並重寫其createRecordReader方法,讓它返回一個RecordReader;然後再定義一個繼承RecordReader的類,createRecordReader方法返回的正是這個自定義RecordReader子類的對象。在本例中,自定義的RecordReader以「]@\n」(而不是換行符)作為記錄之間的分隔符。
代碼如下
public class TrackInputFormat extends FileInputFormat<LongWritable, Text> { @Override public RecordReader<LongWritable, Text> createRecordReader(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException { // TODO Auto-generated method stub return new TrackRecordReader(); } }
package input; import java.io.IOException; import java.io.InputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.log4j.Logger; /** * Treats keys as offset in file and value as line. * * @deprecated Use * {@link org.apache.hadoop.mapreduce.lib.input.LineRecordReader} * instead. */ public class TrackRecordReader extends RecordReader<LongWritable, Text> { Logger logger = Logger.getLogger(TrackRecordReader.class.getName()); private CompressionCodecFactory compressionCodecs = null; private long start; private long pos; private long end; private NewLineReader in; private int maxLineLength; private LongWritable key = null; private Text value = null; // ---------------------- // 行分隔符,即一條記錄的分隔符 private byte[] separator = "]@\n".getBytes(); // -------------------- public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { FileSplit split = (FileSplit) genericSplit; Configuration job = context.getConfiguration(); //mapreduce.input.linerecordreader.line.maxlength this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",Integer.MAX_VALUE); start = split.getStart(); end = start + split.getLength(); final Path file = split.getPath(); //logger.info("path========================="+file.toString()); compressionCodecs = new CompressionCodecFactory(job); final CompressionCodec codec = compressionCodecs.getCodec(file); FileSystem fs = file.getFileSystem(job); FSDataInputStream fileIn = 
fs.open(split.getPath()); boolean skipFirstLine = false; //logger.info("codec========================="+codec); if (codec != null) { in = new NewLineReader(codec.createInputStream(fileIn), job); end = Long.MAX_VALUE; } else { if (start != 0) { skipFirstLine = true; this.start -= separator.length;// // --start; fileIn.seek(start); } in = new NewLineReader(fileIn, job); } if (skipFirstLine) { // skip first line and re-establish "start". start += in.readLine(new Text(), 0, (int) Math.min((long) Integer.MAX_VALUE, end - start)); } this.pos = start; /*if (skipFirstLine) { int newSize = in.readLine(new Text(), 0, (int) Math.min( (long) Integer.MAX_VALUE, end - start)); if(newSize > 0){ start += newSize; } }*/ } public boolean nextKeyValue() throws IOException { if (key == null) { key = new LongWritable(); } key.set(pos); if (value == null) { value = new Text(); } int newSize = 0; while (pos < end) { newSize = in.readLine(value, maxLineLength, Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength)); if (newSize == 0) { break; } pos += newSize; if (newSize < maxLineLength) { break; } } if (newSize == 0) { //讀取下一個buffer key = null; value = null; return false; } else { //讀同一個buffer的下一個記錄 return true; } } @Override public LongWritable getCurrentKey() { return key; } @Override public Text getCurrentValue() { return value; } /** * Get the progress within the split */ public float getProgress() { if (start == end) { return 0.0f; } else { return Math.min(1.0f, (pos - start) / (float) (end - start)); } } public synchronized void close() throws IOException { if (in != null) { in.close(); } } public class NewLineReader { private static final int DEFAULT_BUFFER_SIZE = 256 * 1024* 1024; private int bufferSize = DEFAULT_BUFFER_SIZE; private InputStream in; private byte[] buffer; private int bufferLength = 0; private int bufferPosn = 0; public NewLineReader(InputStream in) { this(in, DEFAULT_BUFFER_SIZE); } public NewLineReader(InputStream in, int bufferSize) { this.in = 
in; this.bufferSize = bufferSize; this.buffer = new byte[this.bufferSize]; } public NewLineReader(InputStream in, Configuration conf) throws IOException { this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE)); } public void close() throws IOException { in.close(); } public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException { str.clear(); Text record = new Text(); int txtLength = 0; long bytesConsumed = 0L; boolean newline = false; int sepPosn = 0; do { // 已經(jīng)讀到buffer的末尾了,讀下一個buffer if (this.bufferPosn >= this.bufferLength) { bufferPosn = 0; bufferLength = in.read(buffer); // 讀到文件末尾了,則跳出,進行下一個文件的讀取 if (bufferLength <= 0) { break; } } int startPosn = this.bufferPosn; for (; bufferPosn < bufferLength; bufferPosn++) { // 處理上一個buffer的尾巴被切成了兩半的分隔符(如果分隔符中重復(fù)字符過多在這里會有問題) if (sepPosn > 0 && buffer[bufferPosn] != separator[sepPosn]) { sepPosn = 0; } // 遇到行分隔符的第一個字符 if (buffer[bufferPosn] == separator[sepPosn]) { bufferPosn++; int i = 0; // 判斷接下來的字符是否也是行分隔符中的字符 for (++sepPosn; sepPosn < separator.length; i++, sepPosn++) { // buffer的最后剛好是分隔符,且分隔符被不幸地切成了兩半 if (bufferPosn + i >= bufferLength) { bufferPosn += i - 1; break; } // 一旦其中有一個字符不相同,就判定為不是分隔符 if (this.buffer[this.bufferPosn + i] != separator[sepPosn]) { sepPosn = 0; break; } } // 的確遇到了行分隔符 if (sepPosn == separator.length) { bufferPosn += i; newline = true; sepPosn = 0; break; } } } int readLength = this.bufferPosn - startPosn; bytesConsumed += readLength; // 行分隔符不放入塊中 if (readLength > maxLineLength - txtLength) { readLength = maxLineLength - txtLength; } if (readLength > 0) { record.append(this.buffer, startPosn, readLength); txtLength += readLength; // 去掉記錄的分隔符 if (newline) { str.set(record.getBytes(), 0, record.getLength() - separator.length); } } } while (!newline && (bytesConsumed < maxBytesToConsume)); if (bytesConsumed > (long) Integer.MAX_VALUE) { throw new IOException("Too many bytes before newline: " + bytesConsumed); } return (int) bytesConsumed; } public int 
readLine(Text str, int maxLineLength) throws IOException { return readLine(str, maxLineLength, Integer.MAX_VALUE); } public int readLine(Text str) throws IOException { return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE); } } }
// Record separator used by TrackRecordReader: each record ends with the bytes ']', '@', '\n'.
// getBytes() uses the platform default charset, which yields these same three bytes on any ASCII-compatible platform.
private byte[] separator = "]@\n".getBytes();
感謝你能夠認真閱讀完這篇文章,希望小編分享的“hadoop中mapreduce如何自定義InputFormat”這篇文章對大家有幫助,同時也希望大家多多支持創新互聯,關注創新互聯行業資訊頻道,更多相關知識等著你來學習!
網頁名稱:hadoop中mapreduce如何自定義InputFormat
文章分享:http://www.rwnh.cn/article4/jeesoe.html
成都網站建設公司_創新互聯,為您提供品牌網站設計、定制開發、響應式網站、商城網站、靜態網站、小程序開發
聲明:本網站發布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經允許不得轉載,或轉載時需注明來源: 創新互聯