diff --git a/core/src/main/java/com/mongodb/hadoop/input/BSONFileRecordReader.java b/core/src/main/java/com/mongodb/hadoop/input/BSONFileRecordReader.java
index b70edfcd..79b70597 100644
--- a/core/src/main/java/com/mongodb/hadoop/input/BSONFileRecordReader.java
+++ b/core/src/main/java/com/mongodb/hadoop/input/BSONFileRecordReader.java
@@ -30,6 +30,12 @@
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.CompressionCodec;
+
 import org.bson.*;
 
 import java.io.DataInputStream;
@@ -65,22 +71,41 @@ public class BSONFileRecordReader extends RecordReader<Object, BSONObject> {
     private Object key;
     private BSONObject value;
     byte[] headerBuf = new byte[4];
-    private FSDataInputStream in;
+    private InputStream in;
     private int numDocsRead = 0;
     private boolean finished = false;
 
     private BSONCallback callback;
     private BSONDecoder decoder;
+    private CompressionCodecFactory compressionCodecs = null;
+    private CompressionCodec codec;
+    private Decompressor decompressor;
+    private Seekable filePosition;
 
     @Override
     public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
         this.fileSplit = (FileSplit) inputSplit;
         this.conf = context.getConfiguration();
-        log.info("reading split " + this.fileSplit.toString());
         Path file = fileSplit.getPath();
+        compressionCodecs = new CompressionCodecFactory(this.conf);
+        codec = compressionCodecs.getCodec(file);
         FileSystem fs = file.getFileSystem(conf);
-        in = fs.open(file, 16*1024*1024);
-        in.seek(fileSplit.getStart());
+        FSDataInputStream fileIn = fs.open(file, 16*1024*1024);
+        if (codec == null) {
+            log.info("reading split " + this.fileSplit.toString());
+            fileIn.seek(fileSplit.getStart());
+            in = fileIn;
+        } else {
+            if (fileSplit.getStart() > 0) {
+                throw new IOException("File is not seekable but start of split is non-zero");
+            }
+            decompressor = CodecPool.getDecompressor(codec);
+            in = codec.createInputStream(fileIn, decompressor);
+            log.info("reading compressed split " + this.fileSplit.toString());
+            // The split's start offset is ignored, since the decompressed
+            // stream is not seekable; compressed splits must start at 0.
+        }
+        filePosition = fileIn;
 
         if (MongoConfigUtil.getLazyBSON(this.conf)) {
             callback = new LazyBSONCallback();
@@ -94,7 +119,8 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws
     @Override
     public boolean nextKeyValue() throws IOException, InterruptedException {
         try{
-            if(in.getPos() >= this.fileSplit.getStart() + this.fileSplit.getLength()){
+            if (filePosition.getPos() >= this.fileSplit.getStart() + this.fileSplit.getLength()
+                    && (codec == null || in.available() == 0)) {
                 try{
                     this.close();
                 }catch(Exception e){
@@ -108,7 +134,7 @@
             value = (BSONObject) callback.get();
             numDocsRead++;
             if(numDocsRead % 10000 == 0){
-                log.debug("read " + numDocsRead + " docs from " + this.fileSplit.toString() + " at " + in.getPos());
+                log.debug("read " + numDocsRead + " docs from " + this.fileSplit.toString() + " at " + filePosition.getPos());
             }
             return true;
         }catch(Exception e){
@@ -138,7 +164,7 @@
     public float getProgress() throws IOException, InterruptedException {
         if(this.finished)
             return 1f;
         if(in != null)
-            return new Float(in.getPos() - this.fileSplit.getStart()) / this.fileSplit.getLength();
+            return new Float(filePosition.getPos() - this.fileSplit.getStart()) / this.fileSplit.getLength();
         return 0f;
     }
@@ -148,6 +174,13 @@
     public void close() throws IOException {
         if(this.in != null){
             in.close();
         }
+        if (codec != null){
+            ((FSDataInputStream)this.filePosition).close();
+        }
+        if (decompressor != null) {
+            // Return the pooled decompressor so it is not leaked.
+            CodecPool.returnDecompressor(decompressor);
+        }
     }
 }
diff --git a/core/src/main/java/com/mongodb/hadoop/mapred/input/BSONFileRecordReader.java b/core/src/main/java/com/mongodb/hadoop/mapred/input/BSONFileRecordReader.java
index ba35a9c3..7dc1d90b 100644
--- a/core/src/main/java/com/mongodb/hadoop/mapred/input/BSONFileRecordReader.java
+++ b/core/src/main/java/com/mongodb/hadoop/mapred/input/BSONFileRecordReader.java
@@ -30,6 +30,11 @@
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.TaskAttemptContext;
 import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.CompressionCodec;
 import org.bson.*;
 
 import java.io.DataInputStream;
@@ -49,7 +54,7 @@ public class BSONFileRecordReader implements RecordReader<NullWritable, BSONWritable> {
     private FileSplit fileSplit;
     private Configuration conf;
     byte[] headerBuf = new byte[4];
-    private FSDataInputStream in;
+    private InputStream in;
     private int numDocsRead = 0;
     private boolean finished = false;
 
@@ -59,14 +64,34 @@ public class BSONFileRecordReader implements RecordReader<NullWritable, BSONWritable> {
     private BSONCallback callback;
     private BSONDecoder decoder;
+    private CompressionCodecFactory compressionCodecs = null;
+    private CompressionCodec codec;
+    private Decompressor decompressor;
+    private Seekable filePosition;
 
     public void initialize(InputSplit inputSplit, Configuration conf) throws IOException {
         this.fileSplit = (FileSplit) inputSplit;
         this.conf = conf;
-        log.info("reading split " + this.fileSplit.toString());
         Path file = fileSplit.getPath();
+        compressionCodecs = new CompressionCodecFactory(conf);
+        codec = compressionCodecs.getCodec(file);
         FileSystem fs = file.getFileSystem(conf);
-        in = fs.open(file, 16*1024*1024);
-        in.seek(fileSplit.getStart());
+        FSDataInputStream fileIn = fs.open(file, 16*1024*1024);
+        if (codec == null) {
+            log.info("reading split " + this.fileSplit.toString());
+            fileIn.seek(fileSplit.getStart());
+            in = fileIn;
+        } else {
+            if (fileSplit.getStart() > 0) {
+                throw new IOException("File is not seekable but start of split is non-zero");
+            }
+            decompressor = CodecPool.getDecompressor(codec);
+            in = codec.createInputStream(fileIn, decompressor);
+            log.info("reading compressed split " + this.fileSplit.toString());
+            // The split's start offset is ignored, since the decompressed
+            // stream is not seekable; compressed splits must start at 0.
+        }
+        filePosition = fileIn;
+
         if (MongoConfigUtil.getLazyBSON(conf)) {
             callback = new LazyBSONCallback();
             decoder = new LazyBSONDecoder();
@@ -77,7 +104,8 @@ public void initialize(InputSplit inputSplit, Configuration conf) throws IOExcep
     @Override
     public boolean next(NullWritable key, BSONWritable value) throws IOException {
         try{
-            if(in.getPos() >= this.fileSplit.getStart() + this.fileSplit.getLength()){
+            if (filePosition.getPos() >= this.fileSplit.getStart() + this.fileSplit.getLength()
+                    && (codec == null || in.available() == 0)) {
                 try{
                     this.close();
                 }catch(Exception e){
@@ -93,7 +121,7 @@ public boolean next(NullWritable key, BSONWritable value) throws IOException {
             numDocsRead++;
             if(numDocsRead % 5000 == 0){
-                log.debug("read " + numDocsRead + " docs from " + this.fileSplit.toString() + " at " + in.getPos());
+                log.debug("read " + numDocsRead + " docs from " + this.fileSplit.toString() + " at " + filePosition.getPos());
             }
             return true;
         }catch(Exception e){
@@ -111,15 +139,15 @@
     public float getProgress() throws IOException {
         if(this.finished)
             return 1f;
         if(in != null)
-            return new Float(in.getPos() - this.fileSplit.getStart()) / this.fileSplit.getLength();
+            return new Float(filePosition.getPos() - this.fileSplit.getStart()) / this.fileSplit.getLength();
         return 0f;
     }
 
     public long getPos() throws IOException {
         if(this.finished)
             return this.fileSplit.getStart() + this.fileSplit.getLength();
-        if(in != null )
-            return in.getPos();
+        if (in != null)
+            return filePosition.getPos();
         return this.fileSplit.getStart();
     }
@@ -140,6 +168,13 @@
     public void close() throws IOException {
         if(this.in != null){
             in.close();
         }
+        if (codec != null) {
+            ((FSDataInputStream)this.filePosition).close();
+        }
+        if (decompressor != null) {
+            // Return the pooled decompressor so it is not leaked.
+            CodecPool.returnDecompressor(decompressor);
+        }
     }
 }
diff --git a/core/src/main/java/com/mongodb/hadoop/splitter/BSONSplitter.java b/core/src/main/java/com/mongodb/hadoop/splitter/BSONSplitter.java
index 5a0ec7fd..c1613368 100644
--- a/core/src/main/java/com/mongodb/hadoop/splitter/BSONSplitter.java
+++ b/core/src/main/java/com/mongodb/hadoop/splitter/BSONSplitter.java
@@ -26,7 +26,6 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.commons.logging.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.fs.BlockLocation;
@@ -45,6 +44,8 @@
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+
 import org.bson.BSONObject;
 import org.bson.BasicBSONCallback;
 
@@ -153,8 +154,18 @@ public void readSplitsForFile(FileStatus file) throws IOException{
         ArrayList<FileSplit> splits = new ArrayList<FileSplit>();
         FileSystem fs = path.getFileSystem(getConf());
         long length = file.getLen();
-        if(!getConf().getBoolean("bson.split.read_splits", true)){
+        boolean dosplits = true;
+        if (!getConf().getBoolean("bson.split.read_splits", true)) {
             log.info("Reading splits is disabled - constructing single split for " + file);
+            dosplits = false;
+        } else {
+            CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(getConf());
+            if (compressionCodecs.getCodec(file.getPath()) != null) {
+                log.info("File is compressed - constructing single split for " + file);
+                dosplits = false;
+            }
+        }
+        if (!dosplits) {
             FileSplit onesplit = createFileSplit(file, fs, 0, length);
             splits.add(onesplit);
             this.splitsList = splits;
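The reader-side pattern in the patch — ask CompressionCodecFactory for a codec based on the file suffix, wrap the raw FSDataInputStream in a decompressing stream when one matches, and keep reporting position from the underlying Seekable — can be exercised on its own. Below is a minimal standalone sketch of that flow, not part of the patch; the class name CodecSketch is made up for illustration, and only Hadoop APIs the patch itself uses appear here.

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.Decompressor;

public class CodecSketch {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Path file = new Path(args[0]); // e.g. a plain .bson file or a .bson.gz file
        FileSystem fs = file.getFileSystem(conf);

        // Suffix-based codec lookup: returns null for a plain file.
        CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(file);

        FSDataInputStream fileIn = fs.open(file);
        Seekable filePosition = fileIn; // position always comes from the raw stream
        Decompressor decompressor = null;
        InputStream in;
        if (codec == null) {
            in = fileIn; // plain BSON: seekable, so a split can start anywhere
        } else {
            decompressor = CodecPool.getDecompressor(codec);
            in = codec.createInputStream(fileIn, decompressor);
        }
        try {
            byte[] buf = new byte[8192];
            long total = 0;
            int n;
            while ((n = in.read(buf)) > 0) {
                total += n;
            }
            // For compressed input, getPos() reports compressed bytes consumed,
            // which is why the reader's getProgress() is only approximate there.
            System.out.println(total + " bytes decoded, raw position " + filePosition.getPos());
        } finally {
            in.close();
            if (decompressor != null) {
                CodecPool.returnDecompressor(decompressor);
            }
        }
    }
}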
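The BSONSplitter change reduces to the same codec probe: if the factory maps the file name to a codec, the file gets exactly one split, because the decompressed stream cannot be seeked to an arbitrary mid-file offset. A hedged distillation of that check follows; the class name and paths are hypothetical, and note that bzip2 does allow splitting via SplittableCompressionCodec, a refinement this patch does not attempt.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class SplittabilityCheck {
    /** True when the path has no compression suffix, i.e. splits may start anywhere. */
    public static boolean isSplittable(Configuration conf, Path file) {
        return new CompressionCodecFactory(conf).getCodec(file) == null;
    }

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        System.out.println(isSplittable(conf, new Path("/data/dump.bson")));    // true
        System.out.println(isSplittable(conf, new Path("/data/dump.bson.gz"))); // false
    }
}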