Skip to content

Commit 59d8832

Browse files
committed
code consistency / minor cleanups, README updates
1 parent 5992713 commit 59d8832

File tree

7 files changed

+100
-55
lines changed

7 files changed

+100
-55
lines changed

README.rst

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,12 +90,26 @@ Configuration options
9090
---------------------
9191

9292
``region``
93-
AWS region for the source DynamoDB.
93+
AWS region for DynamoDB.
9494

9595
* Type: string
9696
* Default: ""
9797
* Importance: high
9898

99+
``access.key.id``
100+
Explicit AWS access key ID. Leave empty to utilize the default credential provider chain.
101+
102+
* Type: password
103+
* Default: [hidden]
104+
* Importance: low
105+
106+
``secret.key``
107+
Explicit AWS secret access key. Leave empty to utilize the default credential provider chain.
108+
109+
* Type: password
110+
* Default: [hidden]
111+
* Importance: low
112+
99113
``batch.size``
100114
Batch size between 1 (dedicated ``PutItemRequest`` for each record) and 25 (which is the maximum number of items in a ``BatchWriteItemRequest``)
101115

@@ -186,12 +200,26 @@ Configuration options
186200
---------------------
187201

188202
``region``
189-
AWS region for the source DynamoDB.
203+
AWS region for DynamoDB.
190204

191205
* Type: string
192206
* Default: ""
193207
* Importance: high
194208

209+
``access.key.id``
210+
Explicit AWS access key ID. Leave empty to utilize the default credential provider chain.
211+
212+
* Type: password
213+
* Default: [hidden]
214+
* Importance: low
215+
216+
``secret.key``
217+
Explicit AWS secret access key. Leave empty to utilize the default credential provider chain.
218+
219+
* Type: password
220+
* Default: [hidden]
221+
* Importance: low
222+
195223
``topic.format``
196224
Format string for destination Kafka topic, use ``${table}`` as placeholder for source table name.
197225

src/main/java/dynamok/sink/ConnectorConfig.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.kafka.common.config.ConfigDef;
2222
import org.apache.kafka.common.config.ConfigException;
2323
import org.apache.kafka.common.config.types.Password;
24-
import org.apache.kafka.common.metrics.stats.Rate;
2524

2625
import java.util.Arrays;
2726
import java.util.Iterator;
@@ -34,7 +33,7 @@ private enum Keys {
3433
;
3534
static final String REGION = "region";
3635
static final String ACCESS_KEY_ID = "access.key.id";
37-
static final String SECRET_KEY_ID = "secret.key.id";
36+
static final String SECRET_KEY = "secret.key";
3837
static final String TABLE_FORMAT = "table.format";
3938
static final String BATCH_SIZE = "batch.size";
4039
static final String KAFKA_ATTRIBUTES = "kafka.attributes";
@@ -48,16 +47,16 @@ private enum Keys {
4847

4948
static final ConfigDef CONFIG_DEF = new ConfigDef()
5049
.define(Keys.REGION, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, (key, regionName) -> {
51-
if (!Arrays.stream(Regions.values()).anyMatch(x -> x.getName().equals(regionName))) {
50+
if (Arrays.stream(Regions.values()).noneMatch(x -> x.getName().equals(regionName))) {
5251
throw new ConfigException("Invalid AWS region: " + regionName);
5352
}
54-
}, ConfigDef.Importance.HIGH, "AWS region for the source DynamoDB.")
53+
}, ConfigDef.Importance.HIGH, "AWS region for DynamoDB.")
5554
.define(Keys.ACCESS_KEY_ID, ConfigDef.Type.PASSWORD, "",
56-
ConfigDef.Importance.LOW, "Explicit AWS Access Credentials. " +
57-
"Leave empty to utilize the default credential provider chain")
58-
.define(Keys.SECRET_KEY_ID, ConfigDef.Type.PASSWORD, "",
59-
ConfigDef.Importance.LOW, "Explicit AWS Secret Access Credentials. " +
60-
"Leave empty to utilize the default credential provider chain")
55+
ConfigDef.Importance.LOW, "Explicit AWS access key ID. " +
56+
"Leave empty to utilize the default credential provider chain.")
57+
.define(Keys.SECRET_KEY, ConfigDef.Type.PASSWORD, "",
58+
ConfigDef.Importance.LOW, "Explicit AWS secret access key. " +
59+
"Leave empty to utilize the default credential provider chain.")
6160
.define(Keys.TABLE_FORMAT, ConfigDef.Type.STRING, "${topic}",
6261
ConfigDef.Importance.HIGH, "Format string for destination DynamoDB table name, use ``${topic}`` as placeholder for source topic.")
6362
.define(Keys.BATCH_SIZE, ConfigDef.Type.INT, 1, ConfigDef.Range.between(1, 25),
@@ -85,7 +84,7 @@ private enum Keys {
8584

8685
final Regions region;
8786
final Password accessKeyId;
88-
final Password secretKeyId;
87+
final Password secretKey;
8988
final String tableFormat;
9089
final int batchSize;
9190
final KafkaCoordinateNames kafkaCoordinateNames;
@@ -100,7 +99,7 @@ private enum Keys {
10099
super(config, parsedConfig);
101100
region = Regions.fromName(getString(Keys.REGION));
102101
accessKeyId = getPassword(Keys.ACCESS_KEY_ID);
103-
secretKeyId = getPassword(Keys.SECRET_KEY_ID);
102+
secretKey = getPassword(Keys.SECRET_KEY);
104103
tableFormat = getString(Keys.TABLE_FORMAT);
105104
batchSize = getInt(Keys.BATCH_SIZE);
106105
kafkaCoordinateNames = kafkaCoordinateNamesFromConfig(getList(Keys.KAFKA_ATTRIBUTES));

src/main/java/dynamok/sink/DynamoDbSinkTask.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,20 @@
1616

1717
package dynamok.sink;
1818

19-
import com.amazonaws.auth.AWSCredentials;
2019
import com.amazonaws.auth.BasicAWSCredentials;
20+
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
2121
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
22-
import com.amazonaws.services.dynamodbv2.model.*;
22+
import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException;
23+
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
24+
import com.amazonaws.services.dynamodbv2.model.BatchWriteItemRequest;
25+
import com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult;
26+
import com.amazonaws.services.dynamodbv2.model.LimitExceededException;
27+
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException;
28+
import com.amazonaws.services.dynamodbv2.model.PutRequest;
29+
import com.amazonaws.services.dynamodbv2.model.WriteRequest;
2330
import dynamok.Version;
2431
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
2532
import org.apache.kafka.common.TopicPartition;
26-
import org.apache.kafka.common.serialization.IntegerDeserializer;
27-
import org.apache.kafka.common.utils.SystemTime;
2833
import org.apache.kafka.connect.data.Schema;
2934
import org.apache.kafka.connect.errors.ConnectException;
3035
import org.apache.kafka.connect.errors.DataException;
@@ -71,11 +76,11 @@ String topAttributeName(ConnectorConfig config) {
7176
public void start(Map<String, String> props) {
7277
config = new ConnectorConfig(props);
7378

74-
if (config.accessKeyId.value().isEmpty() || config.secretKeyId.value().isEmpty()) {
75-
client = new AmazonDynamoDBClient();
76-
log.debug("AmazonDynamoDBClient created with default credentials");
79+
if (config.accessKeyId.value().isEmpty() || config.secretKey.value().isEmpty()) {
80+
client = new AmazonDynamoDBClient(DefaultAWSCredentialsProviderChain.getInstance());
81+
log.debug("AmazonDynamoDBStreamsClient created with DefaultAWSCredentialsProviderChain");
7782
} else {
78-
BasicAWSCredentials awsCreds = new BasicAWSCredentials(config.accessKeyId.value(), config.secretKeyId.value());
83+
final BasicAWSCredentials awsCreds = new BasicAWSCredentials(config.accessKeyId.value(), config.secretKey.value());
7984
client = new AmazonDynamoDBClient(awsCreds);
8085
log.debug("AmazonDynamoDBClient created with AWS credentials from connector configuration");
8186
}

src/main/java/dynamok/source/ConnectorConfig.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ private enum Keys {
3333
;
3434
static final String REGION = "region";
3535
static final String ACCESS_KEY_ID = "access.key.id";
36-
static final String SECRET_KEY_ID = "secret.key.id";
36+
static final String SECRET_KEY = "secret.key";
3737
static final String TABLES_PREFIX = "tables.prefix";
3838
static final String TABLES_WHITELIST = "tables.whitelist";
3939
static final String TABLES_BLACKLIST = "tables.blacklist";
@@ -42,16 +42,16 @@ private enum Keys {
4242

4343
static final ConfigDef CONFIG_DEF = new ConfigDef()
4444
.define(Keys.REGION, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, (key, regionName) -> {
45-
if (!Arrays.stream(Regions.values()).anyMatch(x -> x.getName().equals(regionName))) {
45+
if (Arrays.stream(Regions.values()).noneMatch(x -> x.getName().equals(regionName))) {
4646
throw new ConfigException("Invalid AWS region: " + regionName);
4747
}
48-
}, ConfigDef.Importance.HIGH, "AWS region for the source DynamoDB.")
48+
}, ConfigDef.Importance.HIGH, "AWS region for DynamoDB.")
4949
.define(Keys.ACCESS_KEY_ID, ConfigDef.Type.PASSWORD, "",
50-
ConfigDef.Importance.LOW, "Explicit AWS Access Credentials. " +
51-
"Leave empty to utilize the default credential provider chain")
52-
.define(Keys.SECRET_KEY_ID, ConfigDef.Type.PASSWORD, "",
53-
ConfigDef.Importance.LOW, "Explicit AWS Secret Access Credentials. " +
54-
"Leave empty to utilize the default credential provider chain")
50+
ConfigDef.Importance.LOW, "Explicit AWS access key ID. " +
51+
"Leave empty to utilize the default credential provider chain.")
52+
.define(Keys.SECRET_KEY, ConfigDef.Type.PASSWORD, "",
53+
ConfigDef.Importance.LOW, "Explicit AWS secret access key. " +
54+
"Leave empty to utilize the default credential provider chain.")
5555
.define(Keys.TABLES_PREFIX, ConfigDef.Type.STRING, "",
5656
ConfigDef.Importance.MEDIUM, "Prefix for DynamoDB tables to source from.")
5757
.define(Keys.TABLES_WHITELIST, ConfigDef.Type.LIST, Collections.emptyList(),
@@ -63,7 +63,7 @@ private enum Keys {
6363

6464
final Regions region;
6565
final Password accessKeyId;
66-
final Password secretKeyId;
66+
final Password secretKey;
6767
final String topicFormat;
6868
final String tablesPrefix;
6969
final List<String> tablesWhitelist;
@@ -73,7 +73,7 @@ private enum Keys {
7373
super(CONFIG_DEF, props);
7474
region = Regions.fromName(getString(Keys.REGION));
7575
accessKeyId = getPassword(Keys.ACCESS_KEY_ID);
76-
secretKeyId = getPassword(Keys.SECRET_KEY_ID);
76+
secretKey = getPassword(Keys.SECRET_KEY);
7777
tablesPrefix = getString(Keys.TABLES_PREFIX);
7878
tablesWhitelist = getList(Keys.TABLES_WHITELIST);
7979
tablesBlacklist = getList(Keys.TABLES_BLACKLIST);

src/main/java/dynamok/source/DynamoDbSourceConnector.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616

1717
package dynamok.source;
1818

19-
import com.amazonaws.auth.AWSCredentials;
2019
import com.amazonaws.auth.BasicAWSCredentials;
20+
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
2121
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
2222
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient;
2323
import com.amazonaws.services.dynamodbv2.model.DescribeStreamRequest;
@@ -80,12 +80,12 @@ public void start(Map<String, String> props) {
8080
final AmazonDynamoDBClient client;
8181
final AmazonDynamoDBStreamsClient streamsClient;
8282

83-
if (config.accessKeyId.value().isEmpty() || config.secretKeyId.value().isEmpty()) {
84-
client = new AmazonDynamoDBClient();
85-
streamsClient = new AmazonDynamoDBStreamsClient();
86-
log.debug("AmazonDynamoDB clients created with default credentials");
83+
if (config.accessKeyId.value().isEmpty() || config.secretKey.value().isEmpty()) {
84+
client = new AmazonDynamoDBClient(DefaultAWSCredentialsProviderChain.getInstance());
85+
streamsClient = new AmazonDynamoDBStreamsClient(DefaultAWSCredentialsProviderChain.getInstance());
86+
log.debug("AmazonDynamoDBStreamsClient created with DefaultAWSCredentialsProviderChain");
8787
} else {
88-
BasicAWSCredentials awsCreds = new BasicAWSCredentials(config.accessKeyId.value(), config.secretKeyId.value());
88+
final BasicAWSCredentials awsCreds = new BasicAWSCredentials(config.accessKeyId.value(), config.secretKey.value());
8989
client = new AmazonDynamoDBClient(awsCreds);
9090
streamsClient = new AmazonDynamoDBStreamsClient(awsCreds);
9191
log.debug("AmazonDynamoDB clients created with AWS credentials from connector configuration");

src/main/java/dynamok/source/DynamoDbSourceTask.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,8 @@
1616

1717
package dynamok.source;
1818

19-
import com.amazonaws.auth.AWSCredentials;
2019
import com.amazonaws.auth.BasicAWSCredentials;
21-
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient;
20+
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
2221
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient;
2322
import com.amazonaws.services.dynamodbv2.model.GetRecordsRequest;
2423
import com.amazonaws.services.dynamodbv2.model.GetRecordsResult;
@@ -61,11 +60,11 @@ private enum Keys {
6160
public void start(Map<String, String> props) {
6261
config = new TaskConfig(props);
6362

64-
if (config.accessKeyId.toString().isEmpty() || config.secretKeyId.toString().isEmpty()) {
65-
streamsClient = new AmazonDynamoDBStreamsClient();
66-
log.debug("AmazonDynamoDBStreamsClient created with default credentials");
63+
if (config.accessKeyId.isEmpty() || config.secretKey.isEmpty()) {
64+
streamsClient = new AmazonDynamoDBStreamsClient(DefaultAWSCredentialsProviderChain.getInstance());
65+
log.debug("AmazonDynamoDBStreamsClient created with DefaultAWSCredentialsProviderChain");
6766
} else {
68-
BasicAWSCredentials awsCreds = new BasicAWSCredentials(config.accessKeyId.toString(), config.secretKeyId.toString());
67+
final BasicAWSCredentials awsCreds = new BasicAWSCredentials(config.accessKeyId, config.secretKey);
6968
streamsClient = new AmazonDynamoDBStreamsClient(awsCreds);
7069
log.debug("AmazonDynamoDBStreamsClient created with AWS credentials from connector configuration");
7170
}

src/main/java/dynamok/source/TaskConfig.java

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,49 +17,63 @@
1717
package dynamok.source;
1818

1919
import com.amazonaws.regions.Regions;
20-
import org.apache.kafka.common.config.AbstractConfig;
2120
import org.apache.kafka.common.config.ConfigException;
2221

2322
import java.util.Arrays;
2423
import java.util.List;
2524
import java.util.Map;
2625
import java.util.stream.Collectors;
2726

28-
class TaskConfig extends AbstractConfig {
27+
class TaskConfig {
2928

3029
enum Keys {
3130
;
3231

3332
static String REGION = "region";
3433
static String ACCESS_KEY_ID = "access.key.id";
35-
static String SECRET_KEY_ID = "secret.key.id";
34+
static String SECRET_KEY = "secret.key";
3635
static String TOPIC_FORMAT = "topic.format";
3736
static String SHARDS = "shards";
3837
static String TABLE = "table";
3938
static String STREAM_ARN = "stream.arn";
4039
}
4140

41+
private final Map<String, String> props;
42+
4243
final Regions region;
4344
final String accessKeyId;
44-
final String secretKeyId;
45+
final String secretKey;
4546
final String topicFormat;
4647
final List<String> shards;
4748

4849
TaskConfig(Map<String, String> props) {
49-
super((Map) props);
50-
region = Regions.fromName(getString(Keys.REGION));
51-
accessKeyId = props.getOrDefault(Keys.ACCESS_KEY_ID, "");
52-
secretKeyId = props.getOrDefault(Keys.SECRET_KEY_ID, "");
53-
topicFormat = getString(Keys.TOPIC_FORMAT);
54-
shards = Arrays.stream(getString(Keys.SHARDS).split(",")).filter(shardId -> !shardId.isEmpty()).collect(Collectors.toList());
50+
this.props = props;
51+
52+
region = Regions.fromName(getConfig(Keys.REGION));
53+
accessKeyId = getConfig(Keys.ACCESS_KEY_ID, "");
54+
secretKey = getConfig(Keys.SECRET_KEY, "");
55+
topicFormat = getConfig(Keys.TOPIC_FORMAT);
56+
shards = Arrays.stream(getConfig(Keys.SHARDS).split(",")).filter(shardId -> !shardId.isEmpty()).collect(Collectors.toList());
5557
}
5658

5759
String tableForShard(String shardId) {
58-
return getString(shardId + "." + Keys.TABLE);
60+
return getConfig(shardId + "." + Keys.TABLE);
5961
}
6062

6163
String streamArnForShard(String shardId) {
62-
return getString(shardId + "." + Keys.STREAM_ARN);
64+
return getConfig(shardId + "." + Keys.STREAM_ARN);
65+
}
66+
67+
private String getConfig(String key) {
68+
final String value = props.get(key);
69+
if (value == null) {
70+
throw new ConfigException(key, "Missing task configuration");
71+
}
72+
return value;
73+
}
74+
75+
private String getConfig(String key, String defaultValue) {
76+
return props.getOrDefault(key, defaultValue);
6377
}
6478

6579
}

0 commit comments

Comments
 (0)