Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class MongoDBRiverDefinition {
public final static String ADVANCED_TRANSFORMATION_FIELD = "advanced_transformation";
public final static String PARENT_TYPES_FIELD = "parent_types";
public final static String FILTER_FIELD = "filter";
public final static String IMPORT_FILTER_FIELD = "import_filter";
public final static String CREDENTIALS_FIELD = "credentials";
public final static String USER_FIELD = "user";
public final static String PASSWORD_FIELD = "password";
Expand Down Expand Up @@ -87,6 +88,7 @@ public class MongoDBRiverDefinition {
private final String mongoCollection;
private final boolean mongoGridFS;
private final String mongoFilter;
private final String mongoImportFilter;
// mongodb.credentials
private final String mongoAdminUser;
private final String mongoAdminPassword;
Expand Down Expand Up @@ -128,6 +130,7 @@ public static class Builder {
private String mongoCollection;
private boolean mongoGridFS;
private String mongoFilter = "";
private String mongoImportFilter = "";
// mongodb.credentials
private String mongoAdminUser = "";
private String mongoAdminPassword = "";
Expand Down Expand Up @@ -192,6 +195,11 @@ public Builder mongoFilter(String mongoFilter) {
return this;
}

public Builder mongoImportFilter(String mongoImportFilter) {
this.mongoImportFilter = mongoImportFilter;
return this;
}

public Builder mongoAdminUser(String mongoAdminUser) {
this.mongoAdminUser = mongoAdminUser;
return this;
Expand Down Expand Up @@ -521,6 +529,12 @@ public synchronized static MongoDBRiverDefinition parseSettings(String riverName
builder.mongoFilter("");
}

if (mongoSettings.containsKey(IMPORT_FILTER_FIELD)) {
builder.mongoImportFilter(XContentMapValues.nodeStringValue(mongoSettings.get(IMPORT_FILTER_FIELD), ""));
} else {
builder.mongoImportFilter("");
}

if (mongoSettings.containsKey(SCRIPT_FIELD)) {
String scriptType = "js";
builder.script(mongoSettings.get(SCRIPT_FIELD).toString());
Expand Down Expand Up @@ -608,6 +622,7 @@ private MongoDBRiverDefinition(final Builder builder) {
this.mongoCollection = builder.mongoCollection;
this.mongoGridFS = builder.mongoGridFS;
this.mongoFilter = builder.mongoFilter;
this.mongoImportFilter = builder.mongoImportFilter;
// mongodb.credentials
this.mongoAdminUser = builder.mongoAdminUser;
this.mongoAdminPassword = builder.mongoAdminPassword;
Expand Down Expand Up @@ -668,6 +683,10 @@ public String getMongoFilter() {
return mongoFilter;
}

public String getMongoImportFilter() {
return mongoImportFilter;
}

public String getMongoAdminUser() {
return mongoAdminUser;
}
Expand Down
8 changes: 7 additions & 1 deletion src/main/java/org/elasticsearch/river/mongodb/Slurper.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,15 @@ protected BSONTimestamp doInitialImport() throws InterruptedException {
logger.info("MongoDBRiver is beginning initial import of " + slurpedCollection.getFullName());
BSONTimestamp startTimestamp = getCurrentOplogTimestamp();
DBCursor cursor = null;
DBObject filter = new BasicDBObject();

if (!definition.getMongoImportFilter().isEmpty()) {
filter = (DBObject) JSON.parse(definition.getMongoImportFilter());
}

try {
if (!definition.isMongoGridFS()) {
cursor = slurpedCollection.find();
cursor = slurpedCollection.find(filter);
while (cursor.hasNext()) {
DBObject object = cursor.next();
addToStream(MongoDBRiver.OPLOG_INSERT_OPERATION, null, applyFieldFilter(object));
Expand Down