Skip to content

Commit f2c7920

Browse files
Fix for #157
- mongodb.filter support with or without "o." prefix
1 parent acef547 commit f2c7920

File tree

6 files changed

+250
-16
lines changed

6 files changed

+250
-16
lines changed

src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ public void start() {
154154
logger.info(
155155
"starting mongodb stream. options: secondaryreadpreference [{}], drop_collection [{}], include_collection [{}], throttlesize [{}], gridfs [{}], filter [{}], db [{}], collection [{}], script [{}], indexing to [{}]/[{}]",
156156
definition.isMongoSecondaryReadPreference(), definition.isDropCollection(), definition.getIncludeCollection(),
157-
definition.getThrottleSize(), definition.isMongoGridFS(), definition.getMongoFilter(), definition.getMongoDb(),
157+
definition.getThrottleSize(), definition.isMongoGridFS(), definition.getMongoOplogFilter(), definition.getMongoDb(),
158158
definition.getMongoCollection(), definition.getScript(), definition.getIndexName(), definition.getTypeName());
159159

160160
// Create the index if it does not exist

src/main/java/org/elasticsearch/river/mongodb/MongoDBRiverDefinition.java

Lines changed: 74 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import javax.net.ssl.TrustManager;
1717
import javax.net.ssl.X509TrustManager;
1818

19+
import org.bson.BasicBSONObject;
1920
import org.bson.types.BSONTimestamp;
2021
import org.elasticsearch.common.Preconditions;
2122
import org.elasticsearch.common.collect.Maps;
@@ -27,9 +28,12 @@
2728
import org.elasticsearch.script.ExecutableScript;
2829
import org.elasticsearch.script.ScriptService;
2930

31+
import com.mongodb.BasicDBObject;
32+
import com.mongodb.DBObject;
3033
import com.mongodb.MongoClientOptions;
3134
import com.mongodb.ReadPreference;
3235
import com.mongodb.ServerAddress;
36+
import com.mongodb.util.JSON;
3337

3438
public class MongoDBRiverDefinition {
3539

@@ -86,7 +90,8 @@ public class MongoDBRiverDefinition {
8690
private final String mongoDb;
8791
private final String mongoCollection;
8892
private final boolean mongoGridFS;
89-
private final String mongoFilter;
93+
private final BasicDBObject mongoOplogFilter;
94+
private final BasicDBObject mongoCollectionFilter;
9095
// mongodb.credentials
9196
private final String mongoAdminUser;
9297
private final String mongoAdminPassword;
@@ -127,7 +132,8 @@ public static class Builder {
127132
private String mongoDb;
128133
private String mongoCollection;
129134
private boolean mongoGridFS;
130-
private String mongoFilter = "";
135+
private BasicDBObject mongoOplogFilter = new BasicDBObject();
136+
private BasicDBObject mongoCollectionFilter = new BasicDBObject();
131137
// mongodb.credentials
132138
private String mongoAdminUser = "";
133139
private String mongoAdminPassword = "";
@@ -187,8 +193,13 @@ public Builder mongoGridFS(boolean mongoGridFS) {
187193
return this;
188194
}
189195

190-
public Builder mongoFilter(String mongoFilter) {
191-
this.mongoFilter = mongoFilter;
196+
public Builder mongoOplogFilter(BasicDBObject mongoOplogFilter) {
197+
this.mongoOplogFilter = mongoOplogFilter;
198+
return this;
199+
}
200+
201+
public Builder mongoCollectionFilter(BasicDBObject mongoCollectionFilter) {
202+
this.mongoCollectionFilter = mongoCollectionFilter;
192203
return this;
193204
}
194205

@@ -516,9 +527,13 @@ public synchronized static MongoDBRiverDefinition parseSettings(String riverName
516527
builder.mongoCollection(XContentMapValues.nodeStringValue(mongoSettings.get(COLLECTION_FIELD), riverName));
517528
builder.mongoGridFS(XContentMapValues.nodeBooleanValue(mongoSettings.get(GRIDFS_FIELD), false));
518529
if (mongoSettings.containsKey(FILTER_FIELD)) {
519-
builder.mongoFilter(XContentMapValues.nodeStringValue(mongoSettings.get(FILTER_FIELD), ""));
520-
} else {
521-
builder.mongoFilter("");
530+
String filter = XContentMapValues.nodeStringValue(mongoSettings.get(FILTER_FIELD), "");
531+
filter = removePrefix("o.", filter);
532+
builder.mongoCollectionFilter(convertToBasicDBObject(filter));
533+
// DBObject bsonObject = (DBObject) JSON.parse(filter);
534+
builder.mongoOplogFilter(convertToBasicDBObject(addPrefix("o.", filter)));
535+
// } else {
536+
// builder.mongoOplogFilter("");
522537
}
523538

524539
if (mongoSettings.containsKey(SCRIPT_FIELD)) {
@@ -596,6 +611,50 @@ public void checkClientTrusted(X509Certificate[] chain, String authType) throws
596611
return SSLSocketFactory.getDefault();
597612
}
598613

614+
static BasicDBObject convertToBasicDBObject
615+
(String object) {
616+
if (object == null || object.length() == 0) {
617+
return new BasicDBObject();
618+
} else {
619+
return (BasicDBObject)JSON.parse(object);
620+
}
621+
}
622+
623+
static String removePrefix(String prefix, String object) {
624+
return addRemovePrefix(prefix, object, false);
625+
}
626+
627+
static String addPrefix(String prefix, String object) {
628+
return addRemovePrefix(prefix, object, true);
629+
}
630+
631+
static String addRemovePrefix(String prefix, String object, boolean add) {
632+
if (prefix == null) {
633+
throw new IllegalArgumentException("prefix");
634+
}
635+
if (object == null) {
636+
throw new NullPointerException("object");
637+
}
638+
if (object.length() == 0) {
639+
return "";
640+
}
641+
DBObject bsonObject = (DBObject) JSON.parse(object);
642+
643+
BasicBSONObject newObject = new BasicBSONObject();
644+
for (String key : bsonObject.keySet()) {
645+
if (add) {
646+
newObject.put(prefix + key, bsonObject.get(key));
647+
} else {
648+
if (key.startsWith(prefix)) {
649+
newObject.put(key.substring(prefix.length()), bsonObject.get(key));
650+
} else {
651+
newObject.put(key, bsonObject.get(key));
652+
}
653+
}
654+
}
655+
return newObject.toString();
656+
}
657+
599658
private MongoDBRiverDefinition(final Builder builder) {
600659
// river
601660
this.riverName = builder.riverName;
@@ -607,7 +666,8 @@ private MongoDBRiverDefinition(final Builder builder) {
607666
this.mongoDb = builder.mongoDb;
608667
this.mongoCollection = builder.mongoCollection;
609668
this.mongoGridFS = builder.mongoGridFS;
610-
this.mongoFilter = builder.mongoFilter;
669+
this.mongoOplogFilter = builder.mongoOplogFilter;
670+
this.mongoCollectionFilter = builder.mongoCollectionFilter;
611671
// mongodb.credentials
612672
this.mongoAdminUser = builder.mongoAdminUser;
613673
this.mongoAdminPassword = builder.mongoAdminPassword;
@@ -664,8 +724,12 @@ public boolean isMongoGridFS() {
664724
return mongoGridFS;
665725
}
666726

667-
public String getMongoFilter() {
668-
return mongoFilter;
727+
public BasicDBObject getMongoOplogFilter() {
728+
return mongoOplogFilter;
729+
}
730+
731+
public BasicDBObject getMongoCollectionFilter() {
732+
return mongoCollectionFilter;
669733
}
670734

671735
public String getMongoAdminUser() {

src/main/java/org/elasticsearch/river/mongodb/Slurper.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import com.mongodb.gridfs.GridFS;
3131
import com.mongodb.gridfs.GridFSDBFile;
3232
import com.mongodb.gridfs.GridFSFile;
33-
import com.mongodb.util.JSON;
3433

3534
class Slurper implements Runnable {
3635

@@ -130,7 +129,7 @@ protected boolean isIndexEmpty() {
130129
* Does an initial sync the same way MongoDB does.
131130
* https://groups.google.com/
132131
* forum/?fromgroups=#!topic/mongodb-user/sOKlhD_E2ns
133-
*
132+
*
134133
* @return the last oplog timestamp before the import began
135134
* @throws InterruptedException
136135
* if the blocking queue stream is interrupted while waiting
@@ -142,7 +141,7 @@ protected BSONTimestamp doInitialImport() throws InterruptedException {
142141
DBCursor cursor = null;
143142
try {
144143
if (!definition.isMongoGridFS()) {
145-
cursor = slurpedCollection.find();
144+
cursor = slurpedCollection.find(definition.getMongoCollectionFilter());
146145
while (cursor.hasNext()) {
147146
DBObject object = cursor.next();
148147
addToStream(MongoDBRiver.OPLOG_INSERT_OPERATION, null, applyFieldFilter(object));
@@ -352,7 +351,7 @@ private DBObject getOplogFilter(final BSONTimestamp time) {
352351
+ MongoDBRiver.OPLOG_NAMESPACE_COMMAND));
353352
filter.put(MongoDBRiver.MONGODB_OR_OPERATOR, values2);
354353
}
355-
if (!definition.getMongoFilter().isEmpty()) {
354+
if (definition.getMongoOplogFilter().size() > 0) {
356355
filter.putAll(getMongoFilter());
357356
}
358357
if (logger.isDebugEnabled()) {
@@ -376,7 +375,7 @@ private DBObject getMongoFilter() {
376375
filters2.add(new BasicDBObject(MongoDBRiver.MONGODB_OR_OPERATOR, filters3));
377376

378377
// include custom filter in filters2
379-
filters2.add((DBObject) JSON.parse(definition.getMongoFilter()));
378+
filters2.add(definition.getMongoOplogFilter());
380379

381380
filters.add(new BasicDBObject(MongoDBRiver.MONGODB_AND_OPERATOR, filters2));
382381

src/test/java/org/elasticsearch/river/mongodb/MongoDBRiverDefinitionTest.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@
1111
import org.testng.Assert;
1212
import org.testng.annotations.Test;
1313

14+
import com.mongodb.BasicDBObject;
1415
import com.mongodb.ServerAddress;
16+
import com.mongodb.util.JSON;
1517

1618
public class MongoDBRiverDefinitionTest {
1719

@@ -67,4 +69,28 @@ public void testLoadMongoDBRiverDefinitionIssue159() {
6769
Assert.fail("testLoadMongoDBRiverDefinitionIssue159 failed", t);
6870
}
6971
}
72+
73+
@Test
74+
public void parseFilter() {
75+
String filter = "{\"o.lang\":\"de\"}";
76+
BasicDBObject bsonFilter = (BasicDBObject) JSON.parse(filter);
77+
String filterNoPrefix = MongoDBRiverDefinition.removePrefix("o.", filter);
78+
Assert.assertNotNull(filterNoPrefix);
79+
80+
BasicDBObject bsonFilterNoPrefix = (BasicDBObject) JSON.parse(filterNoPrefix);
81+
Assert.assertNotNull(bsonFilterNoPrefix);
82+
83+
// call a second time trimPrefix has no effect
84+
String filterNoPrefix2 = MongoDBRiverDefinition.removePrefix("o.", bsonFilterNoPrefix.toString());
85+
Assert.assertNotNull(filterNoPrefix2);
86+
BasicDBObject bsonFilterNoPrefix2 = (BasicDBObject) JSON.parse(filterNoPrefix2);
87+
Assert.assertEquals(bsonFilterNoPrefix, bsonFilterNoPrefix2);
88+
89+
String filterWithPrefix = MongoDBRiverDefinition.addPrefix("o.", filterNoPrefix);
90+
BasicDBObject bsonFilterWithPrefix = (BasicDBObject) JSON.parse(filterWithPrefix);
91+
Assert.assertNotNull(bsonFilterWithPrefix);
92+
// trimPrefix + addPrefix returns the original bson
93+
Assert.assertEquals(bsonFilter, bsonFilterWithPrefix);
94+
}
95+
7096
}
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Licensed to Elastic Search and Shay Banon under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. Elastic Search licenses this
6+
* file to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.river.mongodb.simple;
20+
21+
import static org.elasticsearch.client.Requests.countRequest;
22+
import static org.hamcrest.MatcherAssert.assertThat;
23+
import static org.hamcrest.Matchers.equalTo;
24+
25+
import org.elasticsearch.action.ActionFuture;
26+
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
27+
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
28+
import org.elasticsearch.river.mongodb.RiverMongoDBTestAbstract;
29+
import org.testng.Assert;
30+
import org.testng.annotations.Test;
31+
32+
import com.google.common.collect.ImmutableMap;
33+
import com.mongodb.BasicDBObject;
34+
import com.mongodb.DB;
35+
import com.mongodb.DBCollection;
36+
import com.mongodb.DBObject;
37+
import com.mongodb.WriteConcern;
38+
import com.mongodb.WriteResult;
39+
40+
@Test
41+
public class RiverMongoCollectionFilterTest extends RiverMongoDBTestAbstract {
42+
43+
private static final String TEST_SIMPLE_MONGODB_RIVER_COLLECTION_FILTER_JSON = "/org/elasticsearch/river/mongodb/simple/test-simple-mongodb-river-collection-filter.json";
44+
private DB mongoDB;
45+
private DBCollection mongoCollection;
46+
private Object collectionFilterWithPrefix = "{'o.lang':'de'}";
47+
private Object collectionFilterNoPrefix = "{'lang':'de'}";
48+
49+
protected RiverMongoCollectionFilterTest() {
50+
super("testmongodb-" + System.currentTimeMillis(), "testriver-" + System.currentTimeMillis(), "person-"
51+
+ System.currentTimeMillis(), "personindex-" + System.currentTimeMillis());
52+
}
53+
54+
@Test
55+
public void collectionFilterWithPrefixTest() throws Throwable {
56+
collectionFilterTest(collectionFilterWithPrefix);
57+
}
58+
59+
@Test
60+
public void collectionFilterNoPrefixTest() throws Throwable {
61+
collectionFilterTest(collectionFilterNoPrefix);
62+
}
63+
64+
private void collectionFilterTest(Object filter) throws Throwable {
65+
logger.debug("Start CollectionFilter");
66+
try {
67+
createDatabase();
68+
69+
DBObject dbObject1 = new BasicDBObject(ImmutableMap.of("name", "Bernd", "lang", "de"));
70+
WriteResult result1 = mongoCollection.insert(dbObject1);
71+
logger.info("WriteResult: {}", result1.toString());
72+
dbObject1 = new BasicDBObject(ImmutableMap.of("name", "Richard", "lang", "fr"));
73+
result1 = mongoCollection.insert(dbObject1);
74+
logger.info("WriteResult: {}", result1.toString());
75+
Thread.sleep(wait);
76+
77+
createRiver(filter);
78+
Thread.sleep(wait);
79+
80+
ActionFuture<IndicesExistsResponse> response = getNode().client().admin().indices()
81+
.exists(new IndicesExistsRequest(getIndex()));
82+
assertThat(response.actionGet().isExists(), equalTo(true));
83+
refreshIndex();
84+
assertThat(getNode().client().count(countRequest(getIndex())).actionGet().getCount(), equalTo(1l));
85+
86+
deleteRiver();
87+
} catch (Throwable t) {
88+
logger.error("CollectionFilter failed.", t);
89+
t.printStackTrace();
90+
throw t;
91+
} finally {
92+
cleanUp();
93+
}
94+
}
95+
96+
private void createDatabase() {
97+
logger.debug("createDatabase {}", getDatabase());
98+
try {
99+
mongoDB = getMongo().getDB(getDatabase());
100+
mongoDB.setWriteConcern(WriteConcern.REPLICAS_SAFE);
101+
logger.info("Start createCollection");
102+
mongoCollection = mongoDB.createCollection(getCollection(), null);
103+
Assert.assertNotNull(mongoCollection);
104+
} catch (Throwable t) {
105+
logger.error("createDatabase failed.", t);
106+
}
107+
}
108+
109+
private void createRiver(Object filter) throws Exception {
110+
super.createRiver(TEST_SIMPLE_MONGODB_RIVER_COLLECTION_FILTER_JSON, getRiver(), (Object) String.valueOf(getMongoPort1()),
111+
(Object) String.valueOf(getMongoPort2()), (Object) String.valueOf(getMongoPort3()), (Object) getDatabase(),
112+
(Object) getCollection(), filter, (Object) getIndex(), (Object) getDatabase());
113+
}
114+
115+
private void cleanUp() {
116+
super.deleteRiver();
117+
logger.info("Drop database " + mongoDB.getName());
118+
mongoDB.dropDatabase();
119+
}
120+
121+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
{
2+
"type": "mongodb",
3+
"mongodb": {
4+
"servers": [{
5+
"host": "localhost",
6+
"port": %s
7+
},
8+
{
9+
"host": "localhost",
10+
"port": %s
11+
},
12+
{
13+
"host": "localhost",
14+
"port": %s
15+
}],
16+
"db": "%s",
17+
"collection": "%s",
18+
"filter": "%s"
19+
},
20+
"index": {
21+
"name": "%s",
22+
"throttle_size": 2000
23+
}
24+
}

0 commit comments

Comments
 (0)