From fc79540ecc90a10fdbd3c1a31ffb1aa937c32709 Mon Sep 17 00:00:00 2001 From: Adam Stull Date: Thu, 22 Jan 2015 15:25:14 -0500 Subject: [PATCH 1/3] Add support for splits on descending shard keys and add support for specifying min and max split keys --- .../splitter/StandaloneMongoSplitter.java | 15 +++++--- .../mongodb/hadoop/util/MongoConfigUtil.java | 35 ++++++++++++++++++- 2 files changed, 45 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/com/mongodb/hadoop/splitter/StandaloneMongoSplitter.java b/core/src/main/java/com/mongodb/hadoop/splitter/StandaloneMongoSplitter.java index 947656a3..ce1f8bb3 100644 --- a/core/src/main/java/com/mongodb/hadoop/splitter/StandaloneMongoSplitter.java +++ b/core/src/main/java/com/mongodb/hadoop/splitter/StandaloneMongoSplitter.java @@ -66,6 +66,9 @@ public StandaloneMongoSplitter(final Configuration conf) { public List calculateSplits() throws SplitFailedException { final DBObject splitKey = MongoConfigUtil.getInputSplitKey(getConfiguration()); final int splitSize = MongoConfigUtil.getSplitSize(getConfiguration()); + final BasicDBObject splitMin = (BasicDBObject)MongoConfigUtil.getDBObject(getConfiguration(), MongoConfigUtil.SPLITS_MIN_KEY); + final BasicDBObject splitMax = splitMin == null ? null : (BasicDBObject)MongoConfigUtil.getDBObject(getConfiguration(), MongoConfigUtil.SPLITS_MAX_KEY); + final boolean splitKeyDescending = MongoConfigUtil.isSplitKeyDescending(getConfiguration()); MongoClientURI inputURI; DBCollection inputCollection = null; final ArrayList returnVal; @@ -77,12 +80,16 @@ public List calculateSplits() throws SplitFailedException { final String ns = inputCollection.getFullName(); LOG.info("Running splitvector to check splits against " + inputURI); - final DBObject cmd = BasicDBObjectBuilder.start("splitVector", ns) + final BasicDBObjectBuilder builder = BasicDBObjectBuilder.start("splitVector", ns) .add("keyPattern", splitKey) // force:True is misbehaving it seems .add("force", false) - .add("maxChunkSize", splitSize) - .get(); + .add("maxChunkSize", splitSize); + if(splitMin != null) { + builder.add( "min", splitMin ); + builder.add( "max", splitMax ); + } + final DBObject cmd = builder.get(); CommandResult data; boolean ok = true; @@ -154,7 +161,7 @@ public List calculateSplits() throws SplitFailedException { + " small, try lowering 'mongo.input.split_size' if this is undesirable."); } - BasicDBObject lastKey = null; // Lower boundary of the first min split + BasicDBObject lastKey = splitKeyDescending ? splitMin : splitMax; // Lower boundary of the first min split for (Object aSplitData : splitData) { BasicDBObject currentKey = (BasicDBObject) aSplitData; diff --git a/core/src/main/java/com/mongodb/hadoop/util/MongoConfigUtil.java b/core/src/main/java/com/mongodb/hadoop/util/MongoConfigUtil.java index 22b65313..b3cc3271 100644 --- a/core/src/main/java/com/mongodb/hadoop/util/MongoConfigUtil.java +++ b/core/src/main/java/com/mongodb/hadoop/util/MongoConfigUtil.java @@ -140,6 +140,13 @@ public final class MongoConfigUtil { */ public static final String INPUT_SPLIT_KEY_PATTERN = "mongo.input.split.split_key_pattern"; + /** + * If the key specified for {@link #INPUT_SPLIT_KEY_PATTERN} is a descending index or + * is a compound key that contains a descending value then this should be set to {@code true}.

+ * Defaults to {@code false}. + **/ + public static final String SPLIT_KEY_DESCENDING = "mongo.input.split.split_key_descending"; + /** * If {@code true}, the driver will attempt to split the MongoDB Input data (if reading from Mongo) into multiple InputSplits to allow * parallelism/concurrency in processing within Hadoop. That is to say, Hadoop will assign one InputSplit per mapper. @@ -175,6 +182,24 @@ public final class MongoConfigUtil { */ public static final String SPLITS_USE_RANGEQUERY = "mongo.input.split.use_range_queries"; + /** + * The lower bound shard key to use when creating the input splits.

+ * Defaults to {@code null}. Values must be provided for both this key + * and {@link #SPLITS_MAX_KEY} in order for the range to be used. Remember + * that if your index is ordered descending then the value for this key + * will actually be greater than the value specified at {@link #SPLITS_MAX_KEY}. + **/ + public static final String SPLITS_MIN_KEY = "mongo.input.split.split_key_min"; + + /** + * The upper bound shard key to use when creating the input splits.

+ * Defaults to {@code null}. Values must be provided for both this key + * and {@link #SPLITS_MIN_KEY} in order for the range to be used. Remember + * that if your index is ordered descending then the value for this key + * will actually be less than the value specified at {@link #SPLITS_MIN_KEY}. + **/ + public static final String SPLITS_MAX_KEY = "mongo.input.split.split_key_max"; + /** * Shared MongoClient instance cache. */ @@ -599,6 +624,14 @@ public static void setRangeQueryEnabled(final Configuration conf, final boolean conf.setBoolean(SPLITS_USE_RANGEQUERY, value); } + public static boolean isSplitKeyDescending(final Configuration conf) { + return conf.getBoolean(SPLIT_KEY_DESCENDING, false); + } + + public static void setSplitKeyDescending(final Configuration conf, final boolean value) { + conf.setBoolean(SPLIT_KEY_DESCENDING, value); + } + /** * if TRUE, Splits will be read by connecting to the individual shard servers, Only use this ( issue has to do with chunks moving / * relocating during balancing phases) @@ -816,4 +849,4 @@ private static MongoClient getMongoClient(final MongoClientURI uri) throws Unkno return mongoClient; } } -} \ No newline at end of file +} From 1d6a0da85f8fc91411dc44612f8e9db9d2b453fa Mon Sep 17 00:00:00 2001 From: Adam Stull Date: Thu, 22 Jan 2015 16:54:14 -0500 Subject: [PATCH 2/3] Refactored code to make it more testable and added test case --- .../splitter/StandaloneMongoSplitter.java | 32 +++---- .../mongodb/hadoop/util/MongoConfigUtil.java | 5 +- .../splitter/StandaloneMongoSplitterTest.java | 84 +++++++++++++++++++ 3 files changed, 106 insertions(+), 15 deletions(-) diff --git a/core/src/main/java/com/mongodb/hadoop/splitter/StandaloneMongoSplitter.java b/core/src/main/java/com/mongodb/hadoop/splitter/StandaloneMongoSplitter.java index ce1f8bb3..8e322adf 100644 --- a/core/src/main/java/com/mongodb/hadoop/splitter/StandaloneMongoSplitter.java +++ b/core/src/main/java/com/mongodb/hadoop/splitter/StandaloneMongoSplitter.java @@ -71,12 +71,12 @@ public List calculateSplits() throws SplitFailedException { final boolean splitKeyDescending = MongoConfigUtil.isSplitKeyDescending(getConfiguration()); MongoClientURI inputURI; DBCollection inputCollection = null; - final ArrayList returnVal; + final List returnVal; try { inputURI = MongoConfigUtil.getInputURI(getConfiguration()); inputCollection = MongoConfigUtil.getCollection(inputURI); - returnVal = new ArrayList(); + //returnVal = new ArrayList(); final String ns = inputCollection.getFullName(); LOG.info("Running splitvector to check splits against " + inputURI); @@ -160,18 +160,7 @@ public List calculateSplits() throws SplitFailedException { LOG.warn("WARNING: No Input Splits were calculated by the split code. Proceeding with a *single* split. Data may be too" + " small, try lowering 'mongo.input.split_size' if this is undesirable."); } - - BasicDBObject lastKey = splitKeyDescending ? splitMin : splitMax; // Lower boundary of the first min split - - for (Object aSplitData : splitData) { - BasicDBObject currentKey = (BasicDBObject) aSplitData; - returnVal.add(createSplitFromBounds(lastKey, currentKey)); - lastKey = currentKey; - } - - // Last max split, with empty upper boundary - MongoInputSplit lastSplit = createSplitFromBounds(lastKey, null); - returnVal.add(lastSplit); + returnVal = createSplits(splitData, splitMin, splitMax, splitKeyDescending); } finally { if (inputCollection != null) { MongoConfigUtil.close(inputCollection.getDB().getMongo()); @@ -181,4 +170,19 @@ public List calculateSplits() throws SplitFailedException { return returnVal; } + protected List createSplits( BasicDBList splitData, + BasicDBObject splitMin, BasicDBObject splitMax, boolean descending ) throws SplitFailedException { + final ArrayList returnVal = new ArrayList(); + + BasicDBObject minKey = descending ? splitMax : splitMin; + BasicDBObject maxKey = descending ? splitMin : splitMax; + for( Object aSplitData : splitData ) { + BasicDBObject currentKey = (BasicDBObject)aSplitData; + returnVal.add(createSplitFromBounds(maxKey, currentKey)); + maxKey = currentKey; + } + returnVal.add(createSplitFromBounds(maxKey,minKey)); + + return returnVal; + } } diff --git a/core/src/main/java/com/mongodb/hadoop/util/MongoConfigUtil.java b/core/src/main/java/com/mongodb/hadoop/util/MongoConfigUtil.java index b3cc3271..672011fd 100644 --- a/core/src/main/java/com/mongodb/hadoop/util/MongoConfigUtil.java +++ b/core/src/main/java/com/mongodb/hadoop/util/MongoConfigUtil.java @@ -142,7 +142,10 @@ public final class MongoConfigUtil { /** * If the key specified for {@link #INPUT_SPLIT_KEY_PATTERN} is a descending index or - * is a compound key that contains a descending value then this should be set to {@code true}.

+ * is a compound key that contains a descending value then this should be set to {@code true}. + * If {@link #SPLITS_MIN_KEY} and {@link #SPLITS_MAX_KEY} are specified and the max key value + * is less than the min key value then this should be set to {@code true}. In all other + * cases this should either not be set or be set to {@code false}.

* Defaults to {@code false}. **/ public static final String SPLIT_KEY_DESCENDING = "mongo.input.split.split_key_descending"; diff --git a/core/src/test/java/com/mongodb/hadoop/splitter/StandaloneMongoSplitterTest.java b/core/src/test/java/com/mongodb/hadoop/splitter/StandaloneMongoSplitterTest.java index 26e1eab2..99514bb1 100644 --- a/core/src/test/java/com/mongodb/hadoop/splitter/StandaloneMongoSplitterTest.java +++ b/core/src/test/java/com/mongodb/hadoop/splitter/StandaloneMongoSplitterTest.java @@ -1,6 +1,7 @@ package com.mongodb.hadoop.splitter; import com.mongodb.BasicDBObject; +import com.mongodb.BasicDBList; import com.mongodb.DBCollection; import com.mongodb.MongoClient; import com.mongodb.MongoClientURI; @@ -16,6 +17,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; public class StandaloneMongoSplitterTest { @@ -79,4 +81,86 @@ public void testLowerUpperBounds() throws Exception { assertEquals(0, split.getMin().get("a")); assertEquals(10, split.getMax().get("a")); } + + @Test + public void testCreateSplitsNoRange() throws Exception { + Configuration config = new Configuration(); + StandaloneMongoSplitter splitter = new StandaloneMongoSplitter(config); + + BasicDBList list = new BasicDBList(); + list.add(new BasicDBObject( "frame", "1500")); + list.add(new BasicDBObject( "frame", "1000")); + list.add(new BasicDBObject( "frame", "500")); + + List splits = splitter.createSplits(list, null, null, false); + assertEquals(4, splits.size()); + + MongoInputSplit s1 = (MongoInputSplit)splits.get(0); + assertNull(s1.getMin().get("frame")); + assertEquals("1500", s1.getMax().get("frame")); + + MongoInputSplit s2 = (MongoInputSplit)splits.get(1); + assertEquals("1500", s2.getMin().get("frame")); + assertEquals("1000", s2.getMax().get("frame")); + + MongoInputSplit s4 = (MongoInputSplit)splits.get(3); + assertEquals("500", s4.getMin().get("frame")); + assertNull(s4.getMax().get("frame")); + } + + @Test + public void testCreateSplitsRange() throws Exception { + Configuration config = new Configuration(); + StandaloneMongoSplitter splitter = new StandaloneMongoSplitter(config); + + BasicDBList list = new BasicDBList(); + list.add(new BasicDBObject( "frame", "2000")); + list.add(new BasicDBObject( "frame", "1500")); + list.add(new BasicDBObject( "frame", "1000")); + + BasicDBObject splitMin = new BasicDBObject("frame", "500"); + BasicDBObject splitMax = new BasicDBObject("frame", "2500"); + List splits = splitter.createSplits(list, splitMin, splitMax, false); + assertEquals(4, splits.size()); + + MongoInputSplit s1 = (MongoInputSplit)splits.get(0); + assertEquals("2500", s1.getMin().get("frame")); + assertEquals("2000", s1.getMax().get("frame")); + + MongoInputSplit s2 = (MongoInputSplit)splits.get(1); + assertEquals("2000", s2.getMin().get("frame")); + assertEquals("1500", s2.getMax().get("frame")); + + MongoInputSplit s3 = (MongoInputSplit)splits.get(3); + assertEquals("1000", s3.getMin().get("frame")); + assertEquals("500", s3.getMax().get("frame")); + } + + @Test + public void testCreateSplitsRangeDescending() throws Exception { + Configuration config = new Configuration(); + StandaloneMongoSplitter splitter = new StandaloneMongoSplitter(config); + + BasicDBList list = new BasicDBList(); + list.add(new BasicDBObject( "frame", "2000")); + list.add(new BasicDBObject( "frame", "1500")); + list.add(new BasicDBObject( "frame", "1000")); + + BasicDBObject splitMin = new BasicDBObject("frame", "2500"); + BasicDBObject splitMax = new BasicDBObject("frame", "500"); + List splits = splitter.createSplits(list, splitMin, splitMax, true); + assertEquals(4, splits.size()); + + MongoInputSplit s1 = (MongoInputSplit)splits.get(0); + assertEquals("2500", s1.getMin().get("frame")); + assertEquals("2000", s1.getMax().get("frame")); + + MongoInputSplit s2 = (MongoInputSplit)splits.get(1); + assertEquals("2000", s2.getMin().get("frame")); + assertEquals("1500", s2.getMax().get("frame")); + + MongoInputSplit s3 = (MongoInputSplit)splits.get(3); + assertEquals("1000", s3.getMin().get("frame")); + assertEquals("500", s3.getMax().get("frame")); + } } From 78731acb27abe20e5f61fed0c157fc11c4b64b0d Mon Sep 17 00:00:00 2001 From: Adam Stull Date: Mon, 30 Mar 2015 13:55:02 -0400 Subject: [PATCH 3/3] Remove "split_key_descending" property since its unecessary --- .../splitter/StandaloneMongoSplitter.java | 16 ++++---- .../mongodb/hadoop/util/MongoConfigUtil.java | 18 --------- .../splitter/StandaloneMongoSplitterTest.java | 37 ++++++++++++++----- 3 files changed, 34 insertions(+), 37 deletions(-) diff --git a/core/src/main/java/com/mongodb/hadoop/splitter/StandaloneMongoSplitter.java b/core/src/main/java/com/mongodb/hadoop/splitter/StandaloneMongoSplitter.java index 8e322adf..c0a27f50 100644 --- a/core/src/main/java/com/mongodb/hadoop/splitter/StandaloneMongoSplitter.java +++ b/core/src/main/java/com/mongodb/hadoop/splitter/StandaloneMongoSplitter.java @@ -68,7 +68,6 @@ public List calculateSplits() throws SplitFailedException { final int splitSize = MongoConfigUtil.getSplitSize(getConfiguration()); final BasicDBObject splitMin = (BasicDBObject)MongoConfigUtil.getDBObject(getConfiguration(), MongoConfigUtil.SPLITS_MIN_KEY); final BasicDBObject splitMax = splitMin == null ? null : (BasicDBObject)MongoConfigUtil.getDBObject(getConfiguration(), MongoConfigUtil.SPLITS_MAX_KEY); - final boolean splitKeyDescending = MongoConfigUtil.isSplitKeyDescending(getConfiguration()); MongoClientURI inputURI; DBCollection inputCollection = null; final List returnVal; @@ -160,7 +159,7 @@ public List calculateSplits() throws SplitFailedException { LOG.warn("WARNING: No Input Splits were calculated by the split code. Proceeding with a *single* split. Data may be too" + " small, try lowering 'mongo.input.split_size' if this is undesirable."); } - returnVal = createSplits(splitData, splitMin, splitMax, splitKeyDescending); + returnVal = createSplits(splitData, splitMin, splitMax); } finally { if (inputCollection != null) { MongoConfigUtil.close(inputCollection.getDB().getMongo()); @@ -170,18 +169,17 @@ public List calculateSplits() throws SplitFailedException { return returnVal; } - protected List createSplits( BasicDBList splitData, - BasicDBObject splitMin, BasicDBObject splitMax, boolean descending ) throws SplitFailedException { + protected List createSplits(BasicDBList splitData, + BasicDBObject splitMin, BasicDBObject splitMax) throws SplitFailedException { final ArrayList returnVal = new ArrayList(); - BasicDBObject minKey = descending ? splitMax : splitMin; - BasicDBObject maxKey = descending ? splitMin : splitMax; + BasicDBObject lastKey = splitMin; for( Object aSplitData : splitData ) { BasicDBObject currentKey = (BasicDBObject)aSplitData; - returnVal.add(createSplitFromBounds(maxKey, currentKey)); - maxKey = currentKey; + returnVal.add(createSplitFromBounds(lastKey,currentKey)); + lastKey = currentKey; } - returnVal.add(createSplitFromBounds(maxKey,minKey)); + returnVal.add(createSplitFromBounds(lastKey,splitMax)); return returnVal; } diff --git a/core/src/main/java/com/mongodb/hadoop/util/MongoConfigUtil.java b/core/src/main/java/com/mongodb/hadoop/util/MongoConfigUtil.java index 672011fd..6b400202 100644 --- a/core/src/main/java/com/mongodb/hadoop/util/MongoConfigUtil.java +++ b/core/src/main/java/com/mongodb/hadoop/util/MongoConfigUtil.java @@ -140,16 +140,6 @@ public final class MongoConfigUtil { */ public static final String INPUT_SPLIT_KEY_PATTERN = "mongo.input.split.split_key_pattern"; - /** - * If the key specified for {@link #INPUT_SPLIT_KEY_PATTERN} is a descending index or - * is a compound key that contains a descending value then this should be set to {@code true}. - * If {@link #SPLITS_MIN_KEY} and {@link #SPLITS_MAX_KEY} are specified and the max key value - * is less than the min key value then this should be set to {@code true}. In all other - * cases this should either not be set or be set to {@code false}.

- * Defaults to {@code false}. - **/ - public static final String SPLIT_KEY_DESCENDING = "mongo.input.split.split_key_descending"; - /** * If {@code true}, the driver will attempt to split the MongoDB Input data (if reading from Mongo) into multiple InputSplits to allow * parallelism/concurrency in processing within Hadoop. That is to say, Hadoop will assign one InputSplit per mapper. @@ -627,14 +617,6 @@ public static void setRangeQueryEnabled(final Configuration conf, final boolean conf.setBoolean(SPLITS_USE_RANGEQUERY, value); } - public static boolean isSplitKeyDescending(final Configuration conf) { - return conf.getBoolean(SPLIT_KEY_DESCENDING, false); - } - - public static void setSplitKeyDescending(final Configuration conf, final boolean value) { - conf.setBoolean(SPLIT_KEY_DESCENDING, value); - } - /** * if TRUE, Splits will be read by connecting to the individual shard servers, Only use this ( issue has to do with chunks moving / * relocating during balancing phases) diff --git a/core/src/test/java/com/mongodb/hadoop/splitter/StandaloneMongoSplitterTest.java b/core/src/test/java/com/mongodb/hadoop/splitter/StandaloneMongoSplitterTest.java index 99514bb1..e71056e4 100644 --- a/core/src/test/java/com/mongodb/hadoop/splitter/StandaloneMongoSplitterTest.java +++ b/core/src/test/java/com/mongodb/hadoop/splitter/StandaloneMongoSplitterTest.java @@ -92,7 +92,7 @@ public void testCreateSplitsNoRange() throws Exception { list.add(new BasicDBObject( "frame", "1000")); list.add(new BasicDBObject( "frame", "500")); - List splits = splitter.createSplits(list, null, null, false); + List splits = splitter.createSplits(list, null, null); assertEquals(4, splits.size()); MongoInputSplit s1 = (MongoInputSplit)splits.get(0); @@ -114,26 +114,26 @@ public void testCreateSplitsRange() throws Exception { StandaloneMongoSplitter splitter = new StandaloneMongoSplitter(config); BasicDBList list = new BasicDBList(); - list.add(new BasicDBObject( "frame", "2000")); - list.add(new BasicDBObject( "frame", "1500")); list.add(new BasicDBObject( "frame", "1000")); + list.add(new BasicDBObject( "frame", "1500")); + list.add(new BasicDBObject( "frame", "2000")); BasicDBObject splitMin = new BasicDBObject("frame", "500"); BasicDBObject splitMax = new BasicDBObject("frame", "2500"); - List splits = splitter.createSplits(list, splitMin, splitMax, false); + List splits = splitter.createSplits(list, splitMin, splitMax); assertEquals(4, splits.size()); MongoInputSplit s1 = (MongoInputSplit)splits.get(0); - assertEquals("2500", s1.getMin().get("frame")); - assertEquals("2000", s1.getMax().get("frame")); + assertEquals("500", s1.getMin().get("frame")); + assertEquals("1000", s1.getMax().get("frame")); MongoInputSplit s2 = (MongoInputSplit)splits.get(1); - assertEquals("2000", s2.getMin().get("frame")); + assertEquals("1000", s2.getMin().get("frame")); assertEquals("1500", s2.getMax().get("frame")); MongoInputSplit s3 = (MongoInputSplit)splits.get(3); - assertEquals("1000", s3.getMin().get("frame")); - assertEquals("500", s3.getMax().get("frame")); + assertEquals("2000", s3.getMin().get("frame")); + assertEquals("2500", s3.getMax().get("frame")); } @Test @@ -148,7 +148,7 @@ public void testCreateSplitsRangeDescending() throws Exception { BasicDBObject splitMin = new BasicDBObject("frame", "2500"); BasicDBObject splitMax = new BasicDBObject("frame", "500"); - List splits = splitter.createSplits(list, splitMin, splitMax, true); + List splits = splitter.createSplits(list, splitMin, splitMax); assertEquals(4, splits.size()); MongoInputSplit s1 = (MongoInputSplit)splits.get(0); @@ -163,4 +163,21 @@ public void testCreateSplitsRangeDescending() throws Exception { assertEquals("1000", s3.getMin().get("frame")); assertEquals("500", s3.getMax().get("frame")); } + + @Test + public void testCreateSingleSplitAscending() throws Exception { + Configuration config = new Configuration(); + StandaloneMongoSplitter splitter = new StandaloneMongoSplitter(config); + + BasicDBList list = new BasicDBList(); + BasicDBObject splitMin = new BasicDBObject("frame", "500"); + BasicDBObject splitMax = new BasicDBObject("frame", "2500"); + + List splits = splitter.createSplits(list, splitMin, splitMax); + assertEquals(1, splits.size()); + + MongoInputSplit s1 = (MongoInputSplit)splits.get(0); + assertEquals("500", s1.getMin().get("frame")); + assertEquals("2500", s1.getMax().get("frame")); + } }