中文字幕日韩精品一区二区免费_精品一区二区三区国产精品无卡在_国精品无码专区一区二区三区_国产αv三级中文在线

hadoop中mapreduce如何自定義InputFormat

這篇文章主要介紹了hadoop中mapreduce如何自定義InputFormat,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。

網(wǎng)站建設(shè)哪家好,找成都創(chuàng)新互聯(lián)公司!專注于網(wǎng)頁設(shè)計、網(wǎng)站建設(shè)、微信開發(fā)、小程序制作、集團企業(yè)網(wǎng)站建設(shè)等服務(wù)項目。為回饋新老客戶創(chuàng)新互聯(lián)還提供了石鼓免費建站歡迎大家使用!

    首先我們要先定義一個類繼承FileInputFormat,并重寫createRecordReader方法返回RecordReader,然后定義一個類繼承RecordReader,createRecordReader方法返回也就是我們定義的RecordReader的子類的對象。

代碼如下

public class TrackInputFormat extends FileInputFormat<LongWritable, Text> {

	@Override
	public RecordReader<LongWritable, Text> createRecordReader(InputSplit arg0,
			TaskAttemptContext arg1) throws IOException, InterruptedException {
		// TODO Auto-generated method stub
		return new TrackRecordReader();
		
	}
}

package input;


import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.log4j.Logger;


/**
 * Treats keys as offset in file and value as line.
 * 
 * @deprecated Use
 *             {@link org.apache.hadoop.mapreduce.lib.input.LineRecordReader}
 *             instead.
 */
public class TrackRecordReader extends RecordReader<LongWritable, Text> {
	Logger logger = Logger.getLogger(TrackRecordReader.class.getName());
	private CompressionCodecFactory compressionCodecs = null;
	private long start;
	private long pos;
	private long end;
	private NewLineReader in;
	private int maxLineLength;
	private LongWritable key = null;
	private Text value = null;
	// ----------------------
	// 行分隔符,即一條記錄的分隔符
	private byte[] separator = "]@\n".getBytes();

	// --------------------

	public void initialize(InputSplit genericSplit, TaskAttemptContext context)
			throws IOException {
		FileSplit split = (FileSplit) genericSplit;
		Configuration job = context.getConfiguration();
		//mapreduce.input.linerecordreader.line.maxlength
		this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",Integer.MAX_VALUE);
		start = split.getStart();
		end = start + split.getLength();
		final Path file = split.getPath();
		//logger.info("path========================="+file.toString());	
		compressionCodecs = new CompressionCodecFactory(job);
		final CompressionCodec codec = compressionCodecs.getCodec(file);

		FileSystem fs = file.getFileSystem(job);
		FSDataInputStream fileIn = fs.open(split.getPath());
		boolean skipFirstLine = false;
		//logger.info("codec========================="+codec);	
		if (codec != null) {
			in = new NewLineReader(codec.createInputStream(fileIn), job);
			end = Long.MAX_VALUE;
		} else {
			if (start != 0) {
				skipFirstLine = true;
				this.start -= separator.length;//
				// --start;
				fileIn.seek(start);
			}
			in = new NewLineReader(fileIn, job);
		}
		if (skipFirstLine) { // skip first line and re-establish "start".
				start += in.readLine(new Text(), 0,
					(int) Math.min((long) Integer.MAX_VALUE, end - start));
		}
		this.pos = start;
		/*if (skipFirstLine) { 
			int newSize = in.readLine(new Text(), 0, (int) Math.min( (long) Integer.MAX_VALUE, end - start));
		    if(newSize > 0){
		    	start += newSize;
		    }
		}*/

	}

	public boolean nextKeyValue() throws IOException {
		if (key == null) {
			key = new LongWritable();
		}
		key.set(pos);
		if (value == null) {
			value = new Text();
		}
		int newSize = 0;
		while (pos < end) {
			newSize = in.readLine(value, maxLineLength,
					Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
							maxLineLength));
			if (newSize == 0) {
				break;
			}
			pos += newSize;
			if (newSize < maxLineLength) {
				break;
			}
		}
		if (newSize == 0) {
			//讀取下一個buffer
			key = null;
			value = null;
			return false;
		} else {
			//讀同一個buffer的下一個記錄
			return true;
		}
	}

	@Override
	public LongWritable getCurrentKey() {
		return key;
	}

	@Override
	public Text getCurrentValue() {
		return value;
	}

	/**
	 * Get the progress within the split
	 */
	public float getProgress() {
		if (start == end) {
			return 0.0f;
		} else {
			return Math.min(1.0f, (pos - start) / (float) (end - start));
		}
	}

	public synchronized void close() throws IOException {
		if (in != null) {
			in.close();
		}
	}

	public class NewLineReader {
		private static final int DEFAULT_BUFFER_SIZE = 256 * 1024* 1024;
		private int bufferSize = DEFAULT_BUFFER_SIZE;
		private InputStream in;
		private byte[] buffer;
		private int bufferLength = 0;
		private int bufferPosn = 0;

		public NewLineReader(InputStream in) {
			this(in, DEFAULT_BUFFER_SIZE);
		}

		public NewLineReader(InputStream in, int bufferSize) {
			this.in = in;
			this.bufferSize = bufferSize;
			this.buffer = new byte[this.bufferSize];
		}

		public NewLineReader(InputStream in, Configuration conf)
				throws IOException {
			this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
		}

		public void close() throws IOException {
			in.close();
		}

		public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
				throws IOException {
			str.clear();
			Text record = new Text();
			int txtLength = 0;
			long bytesConsumed = 0L;
			boolean newline = false;
			int sepPosn = 0;
			do {
				// 已經(jīng)讀到buffer的末尾了,讀下一個buffer
				if (this.bufferPosn >= this.bufferLength) {
					bufferPosn = 0;
					bufferLength = in.read(buffer);
					// 讀到文件末尾了,則跳出,進行下一個文件的讀取
					if (bufferLength <= 0) {
						break;
					}
				}
				int startPosn = this.bufferPosn;
				for (; bufferPosn < bufferLength; bufferPosn++) {
					// 處理上一個buffer的尾巴被切成了兩半的分隔符(如果分隔符中重復(fù)字符過多在這里會有問題)
					if (sepPosn > 0 && buffer[bufferPosn] != separator[sepPosn]) {
						sepPosn = 0;
					}
					// 遇到行分隔符的第一個字符
					if (buffer[bufferPosn] == separator[sepPosn]) {
						bufferPosn++;
						int i = 0;
						// 判斷接下來的字符是否也是行分隔符中的字符
						for (++sepPosn; sepPosn < separator.length; i++, sepPosn++) {
							// buffer的最后剛好是分隔符,且分隔符被不幸地切成了兩半
							if (bufferPosn + i >= bufferLength) {
								bufferPosn += i - 1;
								break;
							}
							// 一旦其中有一個字符不相同,就判定為不是分隔符
							if (this.buffer[this.bufferPosn + i] != separator[sepPosn]) {
								sepPosn = 0;
								break;
							}
						}
						// 的確遇到了行分隔符
						if (sepPosn == separator.length) {
							bufferPosn += i;
							newline = true;
							sepPosn = 0;
							break;
						}
					}
				}
				int readLength = this.bufferPosn - startPosn;
				bytesConsumed += readLength;
				// 行分隔符不放入塊中
				if (readLength > maxLineLength - txtLength) {
					readLength = maxLineLength - txtLength;
				}
				if (readLength > 0) {
					record.append(this.buffer, startPosn, readLength);
					txtLength += readLength;
					// 去掉記錄的分隔符
					if (newline) {
						str.set(record.getBytes(), 0, record.getLength() - separator.length);
					}
				}
			} 
			while (!newline && (bytesConsumed < maxBytesToConsume));
			if (bytesConsumed > (long) Integer.MAX_VALUE) {
				throw new IOException("Too many bytes before newline: "
						+ bytesConsumed);
			}

			return (int) bytesConsumed;
		}

		public int readLine(Text str, int maxLineLength) throws IOException {
			return readLine(str, maxLineLength, Integer.MAX_VALUE);
		}

		public int readLine(Text str) throws IOException {
			return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
		}
	}
}

private byte[] separator = "]@\n".getBytes();

感謝你能夠認真閱讀完這篇文章,希望小編分享的“hadoop中mapreduce如何自定義InputFormat”這篇文章對大家有幫助,同時也希望大家多多支持創(chuàng)新互聯(lián),關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,更多相關(guān)知識等著你來學(xué)習(xí)!

網(wǎng)頁名稱:hadoop中mapreduce如何自定義InputFormat
文章分享:http://www.rwnh.cn/article4/jeesoe.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供品牌網(wǎng)站設(shè)計、定制開發(fā)、響應(yīng)式網(wǎng)站、商城網(wǎng)站靜態(tài)網(wǎng)站小程序開發(fā)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

微信小程序開發(fā)
星座| 深水埗区| 海阳市| 运城市| 岚皋县| 曲阜市| 临夏市| 类乌齐县| 土默特左旗| 灵台县| 定日县| 泾源县| 读书| 陇南市| 新营市| 康马县| 沂水县| 资兴市| 高尔夫| 琼中| 宜兰县| 枣阳市| 和田市| 东阿县| 顺平县| 江津市| 黄冈市| 翁牛特旗| 漾濞| 定日县| 定日县| 广安市| 安西县| 平山县| 凤台县| 韶山市| 鄄城县| 全椒县| 渭南市| 黄骅市| 聂拉木县|