diff --git a/core/src/main/java/com/mongodb/hadoop/splitter/StandaloneMongoSplitter.java b/core/src/main/java/com/mongodb/hadoop/splitter/StandaloneMongoSplitter.java index 88b805df..c55907eb 100644 --- a/core/src/main/java/com/mongodb/hadoop/splitter/StandaloneMongoSplitter.java +++ b/core/src/main/java/com/mongodb/hadoop/splitter/StandaloneMongoSplitter.java @@ -66,9 +66,12 @@ public StandaloneMongoSplitter(final Configuration conf) { public List calculateSplits() throws SplitFailedException { final DBObject splitKey = MongoConfigUtil.getInputSplitKey(getConfiguration()); final int splitSize = MongoConfigUtil.getSplitSize(getConfiguration()); + final BasicDBObject splitMin = (BasicDBObject)MongoConfigUtil.getDBObject(getConfiguration(), MongoConfigUtil.SPLITS_MIN_KEY); + final BasicDBObject splitMax = splitMin == null ? null : (BasicDBObject)MongoConfigUtil.getDBObject(getConfiguration(), MongoConfigUtil.SPLITS_MAX_KEY); final MongoClientURI inputURI; + DBCollection inputCollection = null; - final ArrayList returnVal; + final List returnVal; try { inputURI = MongoConfigUtil.getInputURI(getConfiguration()); MongoClientURI authURI = MongoConfigUtil.getAuthURI(getConfiguration()); @@ -79,16 +82,20 @@ public List calculateSplits() throws SplitFailedException { inputCollection = MongoConfigUtil.getCollection(inputURI); } - returnVal = new ArrayList(); + //returnVal = new ArrayList(); final String ns = inputCollection.getFullName(); LOG.info("Running splitvector to check splits against " + inputURI); - final DBObject cmd = BasicDBObjectBuilder.start("splitVector", ns) + final BasicDBObjectBuilder builder = BasicDBObjectBuilder.start("splitVector", ns) .add("keyPattern", splitKey) // force:True is misbehaving it seems .add("force", false) - .add("maxChunkSize", splitSize) - .get(); + .add("maxChunkSize", splitSize); + if(splitMin != null) { + builder.add( "min", splitMin ); + builder.add( "max", splitMax ); + } + final DBObject cmd = builder.get(); CommandResult data; boolean ok = true; @@ -151,6 +158,8 @@ public List calculateSplits() throws SplitFailedException { } } if (data != null && !data.get("ok").equals(1.0)) { + System.out.println("Unable to calculate input splits for cmd:" ); + System.out.println(""+cmd ); throw new SplitFailedException("Unable to calculate input splits: " + data.get("errmsg")); } @@ -164,18 +173,7 @@ public List calculateSplits() throws SplitFailedException { LOG.warn("WARNING: No Input Splits were calculated by the split code. Proceeding with a *single* split. Data may be too" + " small, try lowering 'mongo.input.split_size' if this is undesirable."); } - - BasicDBObject lastKey = null; // Lower boundary of the first min split - - for (final Object aSplitData : splitData) { - final BasicDBObject currentKey = (BasicDBObject) aSplitData; - returnVal.add(createSplitFromBounds(lastKey, currentKey)); - lastKey = currentKey; - } - - // Last max split, with empty upper boundary - final MongoInputSplit lastSplit = createSplitFromBounds(lastKey, null); - returnVal.add(lastSplit); + returnVal = createSplits(splitData, splitMin, splitMax); } finally { if (inputCollection != null) { MongoConfigUtil.close(inputCollection.getDB().getMongo()); @@ -185,4 +183,18 @@ public List calculateSplits() throws SplitFailedException { return returnVal; } + protected List createSplits(BasicDBList splitData, + BasicDBObject splitMin, BasicDBObject splitMax) throws SplitFailedException { + final ArrayList returnVal = new ArrayList(); + + BasicDBObject lastKey = splitMin; + for( Object aSplitData : splitData ) { + BasicDBObject currentKey = (BasicDBObject)aSplitData; + returnVal.add(createSplitFromBounds(lastKey,currentKey)); + lastKey = currentKey; + } + returnVal.add(createSplitFromBounds(lastKey,splitMax)); + + return returnVal; + } } diff --git a/core/src/main/java/com/mongodb/hadoop/util/MongoConfigUtil.java b/core/src/main/java/com/mongodb/hadoop/util/MongoConfigUtil.java index 5c7d6381..9179997c 100644 --- a/core/src/main/java/com/mongodb/hadoop/util/MongoConfigUtil.java +++ b/core/src/main/java/com/mongodb/hadoop/util/MongoConfigUtil.java @@ -202,6 +202,25 @@ public final class MongoConfigUtil { public static final String SPLITS_USE_RANGEQUERY = "mongo.input.split.use_range_queries"; /** + * The lower bound shard key to use when creating the input splits.

+ * Defaults to {@code null}. Values must be provided for both this key + * and {@link #SPLITS_MAX_KEY} in order for the range to be used. Remember + * that if your index is ordered descending then the value for this key + * will actually be greater than the value specified at {@link #SPLITS_MAX_KEY}. + **/ + public static final String SPLITS_MIN_KEY = "mongo.input.split.split_key_min"; + + /** + * The upper bound shard key to use when creating the input splits.

+ * Defaults to {@code null}. Values must be provided for both this key + * and {@link #SPLITS_MIN_KEY} in order for the range to be used. Remember + * that if your index is ordered descending then the value for this key + * will actually be less than the value specified at {@link #SPLITS_MIN_KEY}. + **/ + public static final String SPLITS_MAX_KEY = "mongo.input.split.split_key_max"; + + /** + * Shared MongoClient instance cache. * One client per thread */ private static final ThreadLocal> CLIENTS = @@ -880,4 +899,4 @@ private static MongoClient getMongoClient(final MongoClientURI uri) throws Unkno } return mongoClient; } -} + } diff --git a/core/src/test/java/com/mongodb/hadoop/splitter/StandaloneMongoSplitterTest.java b/core/src/test/java/com/mongodb/hadoop/splitter/StandaloneMongoSplitterTest.java index 26e1eab2..e71056e4 100644 --- a/core/src/test/java/com/mongodb/hadoop/splitter/StandaloneMongoSplitterTest.java +++ b/core/src/test/java/com/mongodb/hadoop/splitter/StandaloneMongoSplitterTest.java @@ -1,6 +1,7 @@ package com.mongodb.hadoop.splitter; import com.mongodb.BasicDBObject; +import com.mongodb.BasicDBList; import com.mongodb.DBCollection; import com.mongodb.MongoClient; import com.mongodb.MongoClientURI; @@ -16,6 +17,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; public class StandaloneMongoSplitterTest { @@ -79,4 +81,103 @@ public void testLowerUpperBounds() throws Exception { assertEquals(0, split.getMin().get("a")); assertEquals(10, split.getMax().get("a")); } + + @Test + public void testCreateSplitsNoRange() throws Exception { + Configuration config = new Configuration(); + StandaloneMongoSplitter splitter = new StandaloneMongoSplitter(config); + + BasicDBList list = new BasicDBList(); + list.add(new BasicDBObject( "frame", "1500")); + list.add(new BasicDBObject( "frame", "1000")); + list.add(new BasicDBObject( "frame", "500")); + + List splits = splitter.createSplits(list, null, null); + assertEquals(4, splits.size()); + + MongoInputSplit s1 = (MongoInputSplit)splits.get(0); + assertNull(s1.getMin().get("frame")); + assertEquals("1500", s1.getMax().get("frame")); + + MongoInputSplit s2 = (MongoInputSplit)splits.get(1); + assertEquals("1500", s2.getMin().get("frame")); + assertEquals("1000", s2.getMax().get("frame")); + + MongoInputSplit s4 = (MongoInputSplit)splits.get(3); + assertEquals("500", s4.getMin().get("frame")); + assertNull(s4.getMax().get("frame")); + } + + @Test + public void testCreateSplitsRange() throws Exception { + Configuration config = new Configuration(); + StandaloneMongoSplitter splitter = new StandaloneMongoSplitter(config); + + BasicDBList list = new BasicDBList(); + list.add(new BasicDBObject( "frame", "1000")); + list.add(new BasicDBObject( "frame", "1500")); + list.add(new BasicDBObject( "frame", "2000")); + + BasicDBObject splitMin = new BasicDBObject("frame", "500"); + BasicDBObject splitMax = new BasicDBObject("frame", "2500"); + List splits = splitter.createSplits(list, splitMin, splitMax); + assertEquals(4, splits.size()); + + MongoInputSplit s1 = (MongoInputSplit)splits.get(0); + assertEquals("500", s1.getMin().get("frame")); + assertEquals("1000", s1.getMax().get("frame")); + + MongoInputSplit s2 = (MongoInputSplit)splits.get(1); + assertEquals("1000", s2.getMin().get("frame")); + assertEquals("1500", s2.getMax().get("frame")); + + MongoInputSplit s3 = (MongoInputSplit)splits.get(3); + assertEquals("2000", s3.getMin().get("frame")); + assertEquals("2500", s3.getMax().get("frame")); + } + + @Test + public void testCreateSplitsRangeDescending() throws Exception { + Configuration config = new Configuration(); + StandaloneMongoSplitter splitter = new StandaloneMongoSplitter(config); + + BasicDBList list = new BasicDBList(); + list.add(new BasicDBObject( "frame", "2000")); + list.add(new BasicDBObject( "frame", "1500")); + list.add(new BasicDBObject( "frame", "1000")); + + BasicDBObject splitMin = new BasicDBObject("frame", "2500"); + BasicDBObject splitMax = new BasicDBObject("frame", "500"); + List splits = splitter.createSplits(list, splitMin, splitMax); + assertEquals(4, splits.size()); + + MongoInputSplit s1 = (MongoInputSplit)splits.get(0); + assertEquals("2500", s1.getMin().get("frame")); + assertEquals("2000", s1.getMax().get("frame")); + + MongoInputSplit s2 = (MongoInputSplit)splits.get(1); + assertEquals("2000", s2.getMin().get("frame")); + assertEquals("1500", s2.getMax().get("frame")); + + MongoInputSplit s3 = (MongoInputSplit)splits.get(3); + assertEquals("1000", s3.getMin().get("frame")); + assertEquals("500", s3.getMax().get("frame")); + } + + @Test + public void testCreateSingleSplitAscending() throws Exception { + Configuration config = new Configuration(); + StandaloneMongoSplitter splitter = new StandaloneMongoSplitter(config); + + BasicDBList list = new BasicDBList(); + BasicDBObject splitMin = new BasicDBObject("frame", "500"); + BasicDBObject splitMax = new BasicDBObject("frame", "2500"); + + List splits = splitter.createSplits(list, splitMin, splitMax); + assertEquals(1, splits.size()); + + MongoInputSplit s1 = (MongoInputSplit)splits.get(0); + assertEquals("500", s1.getMin().get("frame")); + assertEquals("2500", s1.getMax().get("frame")); + } }