Skip to content

Commit 81156dc

Browse files
authored
DP-3392: Option to lowercase field names (#64)
* DP-3392: Accept a fieldname transformer in RecursiveConversion * DP-3392: Implement conversion parsing * DP-3392: Add tests in record transformer for converting field to lowercase * DP-3392: Clean up RecordTransformerTest
1 parent f020cc8 commit 81156dc

File tree

10 files changed

+218
-115
lines changed

10 files changed

+218
-115
lines changed

connector/src/main/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfigConstants.scala

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -158,42 +158,47 @@ object EmsSinkConfigConstants {
158158
val CLOSE_EVERY_CONNECTION_DOC = "Connection pool - Explicitly close connections"
159159
val CLOSE_EVERY_CONNECTION_DEFAULT_VALUE = false
160160

161-
val FLATTENER_ENABLE_KEY = s"${CONNECTOR_PREFIX}.flattener.enable"
161+
val FLATTENER_ENABLE_KEY = s"$CONNECTOR_PREFIX.flattener.enable"
162162
val FLATTENER_ENABLE_DOC =
163163
s"Enable flattening of nested records. This is likely to be needed if the source data contains nested objects or collections."
164164
val FLATTENER_ENABLE_DEFAULT = true
165165

166-
val FLATTENER_DISCARD_COLLECTIONS_KEY = s"${CONNECTOR_PREFIX}.flattener.collections.discard"
166+
val FLATTENER_DISCARD_COLLECTIONS_KEY = s"$CONNECTOR_PREFIX.flattener.collections.discard"
167167
val FLATTENER_DISCARD_COLLECTIONS_DOC =
168168
"Discard array and map fields at any level of depth. Note that the default handling of collections by the flattener function is to JSON-encode them as nullable STRING fields."
169169
val FLATTENER_DISCARD_COLLECTIONS_DEFAULT = false
170170

171-
val FLATTENER_JSONBLOB_CHUNKS_KEY = s"${CONNECTOR_PREFIX}.flattener.jsonblob.chunks"
171+
val FLATTENER_JSONBLOB_CHUNKS_KEY = s"$CONNECTOR_PREFIX.flattener.jsonblob.chunks"
172172
val FLATTENER_JSONBLOB_CHUNKS_DOC =
173173
"Encodes the record into a JSON blob broken down into N VARCHAR fields (e.g. `payload_chunk1`, `payload_chunk2`, `...`, `payload_chunkN`)."
174174
val FLATTENER_JSONBLOB_CHUNKS_DEFAULT = null
175175

176-
val DECIMAL_CONVERSION_KEY = s"${CONNECTOR_PREFIX}.convert.decimals.to.double"
177-
val DECIMAL_CONVERSION_KEY_DOC =
176+
val DECIMAL_CONVERSION_KEY = s"$CONNECTOR_PREFIX.convert.decimals.to.double"
177+
val DECIMAL_CONVERSION_DOC =
178178
s"Convert decimal values into doubles. Valid only for formats with schema (AVRO, Protobuf, JsonSchema)"
179-
val DECIMAL_CONVERSION_KEY_DEFAULT = false
179+
val DECIMAL_CONVERSION_DEFAULT = false
180180

181-
val NULL_PK_KEY = s"${CONNECTOR_PREFIX}.allow.null.pk"
181+
val TRANSFORM_FIELDS_LOWERCASE_KEY = s"$CONNECTOR_PREFIX.convert.lowercase.fields"
182+
val TRANSFORM_FIELDS_LOWERCASE_DOC =
183+
s"Convert all fields to lowercase"
184+
val TRANSFORM_FIELDS_LOWERCASE_DEFAULT = false
185+
186+
val NULL_PK_KEY = s"$CONNECTOR_PREFIX.allow.null.pk"
182187
val NULL_PK_KEY_DOC =
183188
s"Allow parsing messages with null values in the columns listed as primary keys. If disabled connector will fail after receiving such a message. NOTE: enabling that will cause data inconsistency issue on the EMS side."
184189
val NULL_PK_KEY_DEFAULT = false
185190

186-
val EMBED_KAFKA_EMBEDDED_METADATA_KEY = s"${CONNECTOR_PREFIX}.embed.kafka.metadata"
191+
val EMBED_KAFKA_EMBEDDED_METADATA_KEY = s"$CONNECTOR_PREFIX.embed.kafka.metadata"
187192
val EMBED_KAFKA_EMBEDDED_METADATA_DOC =
188193
"Embed Kafka metadata such as partition, offset and timestamp as additional record fields."
189194
val EMBED_KAFKA_EMBEDDED_METADATA_DEFAULT = true
190195

191-
val USE_IN_MEMORY_FS_KEY = s"${CONNECTOR_PREFIX}.inmemfs.enable"
196+
val USE_IN_MEMORY_FS_KEY = s"$CONNECTOR_PREFIX.inmemfs.enable"
192197
val USE_IN_MEMORY_FS_DOC =
193198
"Rather than writing to the host file system, buffer parquet data files in memory"
194199
val USE_IN_MEMORY_FS_DEFAULT = false
195200

196-
val SINK_PUT_TIMEOUT_KEY = s"${CONNECTOR_PREFIX}.sink.put.timeout.ms"
201+
val SINK_PUT_TIMEOUT_KEY = s"$CONNECTOR_PREFIX.sink.put.timeout.ms"
197202
val SINK_PUT_TIMEOUT_DOC =
198203
"The maximum time (in milliseconds) for the connector task to complete the upload of a single Parquet file before being flagged as failed. Note: this value should always be lower than max.poll.interval.ms"
199204
val SINK_PUT_TIMEOUT_DEFAULT = 288000L // 4.8 minutes

connector/src/main/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfigDef.scala

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -363,9 +363,16 @@ object EmsSinkConfigDef {
363363
.define(
364364
DECIMAL_CONVERSION_KEY,
365365
Type.BOOLEAN,
366-
DECIMAL_CONVERSION_KEY_DEFAULT,
366+
DECIMAL_CONVERSION_DEFAULT,
367367
Importance.MEDIUM,
368-
DECIMAL_CONVERSION_KEY_DOC,
368+
DECIMAL_CONVERSION_DOC,
369+
)
370+
.define(
371+
TRANSFORM_FIELDS_LOWERCASE_KEY,
372+
Type.BOOLEAN,
373+
TRANSFORM_FIELDS_LOWERCASE_DEFAULT,
374+
Importance.MEDIUM,
375+
TRANSFORM_FIELDS_LOWERCASE_DOC,
369376
)
370377
.define(
371378
NULL_PK_KEY,

connector/src/main/scala/com/celonis/kafka/connect/transform/PreConversionConfig.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,17 @@
1717
package com.celonis.kafka.connect.transform
1818

1919
import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.DECIMAL_CONVERSION_KEY
20-
import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.DECIMAL_CONVERSION_KEY_DEFAULT
20+
import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.DECIMAL_CONVERSION_DEFAULT
21+
import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.TRANSFORM_FIELDS_LOWERCASE_KEY
22+
import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.TRANSFORM_FIELDS_LOWERCASE_DEFAULT
2123
import com.celonis.kafka.connect.ems.config.PropertiesHelper.getBoolean
2224

23-
final case class PreConversionConfig(convertDecimalsToFloat: Boolean)
25+
final case class PreConversionConfig(convertDecimalsToFloat: Boolean, convertFieldsToLowercase: Boolean)
2426

2527
object PreConversionConfig {
2628
def extract(props: Map[String, _]): PreConversionConfig =
2729
PreConversionConfig(
28-
getBoolean(props, DECIMAL_CONVERSION_KEY).getOrElse(DECIMAL_CONVERSION_KEY_DEFAULT),
30+
getBoolean(props, DECIMAL_CONVERSION_KEY).getOrElse(DECIMAL_CONVERSION_DEFAULT),
31+
getBoolean(props, TRANSFORM_FIELDS_LOWERCASE_KEY).getOrElse(TRANSFORM_FIELDS_LOWERCASE_DEFAULT),
2932
)
3033
}

connector/src/main/scala/com/celonis/kafka/connect/transform/conversion/ConnectConversion.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,11 @@ trait ConnectConversion {
3333

3434
object ConnectConversion {
3535
def fromConfig(config: PreConversionConfig): ConnectConversion =
36-
if (config.convertDecimalsToFloat) new RecursiveConversion(DecimalToFloatConversion)
37-
else noOpConversion
36+
if (config.convertFieldsToLowercase || config.convertDecimalsToFloat) {
37+
val inner = if (config.convertDecimalsToFloat) DecimalToFloatConversion else noOpConversion
38+
val fieldNameConversion: String => String = if (config.convertFieldsToLowercase) _.toLowerCase else identity
39+
new RecursiveConversion(inner, fieldNameConversion)
40+
} else noOpConversion
3841

3942
val noOpConversion: ConnectConversion = new ConnectConversion {
4043
override def convertSchema(originalSchema: Schema): Schema = originalSchema

connector/src/main/scala/com/celonis/kafka/connect/transform/conversion/RecursiveConversion.scala

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,14 @@ import scala.jdk.CollectionConverters._
2424

2525
/** Traverse containers data objects (structs, maps and lists) and apply the inner conversion to the leaves
2626
*/
27-
final class RecursiveConversion(innerConversion: ConnectConversion) extends ConnectConversion {
27+
final class RecursiveConversion(innerConversion: ConnectConversion, fieldNameConversion: String => String)
28+
extends ConnectConversion {
2829

2930
override def convertSchema(originalSchema: Schema): Schema =
3031
originalSchema.`type`() match {
3132
case Schema.Type.STRUCT =>
3233
originalSchema.fields().asScala.foldLeft(SchemaBuilder.struct()) { case (builder, field) =>
33-
builder.field(field.name(), convertSchema(field.schema()))
34+
builder.field(fieldNameConversion(field.name()), convertSchema(field.schema()))
3435
}.optionalIf(originalSchema.isOptional).build()
3536
case Schema.Type.ARRAY =>
3637
SchemaBuilder.array(convertSchema(originalSchema.valueSchema())).optionalIf(originalSchema.isOptional).build()
@@ -46,10 +47,12 @@ final class RecursiveConversion(innerConversion: ConnectConversion) extends Conn
4647
connectValue match {
4748
case connectValue: Struct =>
4849
val newStruct = new Struct(targetSchema)
49-
targetSchema.fields().asScala.foreach { field =>
50+
originalSchema.fields().asScala.foreach { field =>
51+
val newFieldName = fieldNameConversion(field.name())
52+
val newFieldSchema = targetSchema.field(newFieldName).schema()
5053
newStruct.put(
51-
field.name(),
52-
convertValue(connectValue.get(field), originalSchema.field(field.name()).schema(), field.schema()),
54+
newFieldName,
55+
convertValue(connectValue.get(field), field.schema(), newFieldSchema),
5356
)
5457
}
5558
newStruct

connector/src/test/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfigTest.scala

Lines changed: 51 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ class EmsSinkConfigTest extends AnyFunSuite with Matchers {
6161
http = UnproxiedHttpClientConfig(defaultPoolingConfig),
6262
explode = ExplodeConfig.None,
6363
orderField = OrderFieldConfig(EmbeddedKafkaMetadataFieldInserter.CelonisOrderFieldName.some),
64-
preConversionConfig = PreConversionConfig(convertDecimalsToFloat = false),
64+
preConversionConfig = PreConversionConfig(convertDecimalsToFloat = false, convertFieldsToLowercase = false),
6565
flattenerConfig = None,
6666
embedKafkaMetadata = true,
6767
useInMemoryFileSystem = false,
@@ -79,19 +79,40 @@ class EmsSinkConfigTest extends AnyFunSuite with Matchers {
7979

8080
test(s"parse PreConversionConfig") {
8181
val expectedWithDefault =
82-
anEmsSinkConfig.copy(preConversionConfig = PreConversionConfig(convertDecimalsToFloat = false))
83-
val properties = propertiesFromConfig(expectedWithDefault).removed(DECIMAL_CONVERSION_KEY)
82+
anEmsSinkConfig.copy(preConversionConfig =
83+
PreConversionConfig(convertDecimalsToFloat = false, convertFieldsToLowercase = false),
84+
)
85+
val properties =
86+
propertiesFromConfig(expectedWithDefault).removed(DECIMAL_CONVERSION_KEY).removed(TRANSFORM_FIELDS_LOWERCASE_KEY)
8487
parseProperties(properties) shouldBe Right(expectedWithDefault)
8588

86-
val expectedWithConversion =
87-
anEmsSinkConfig.copy(preConversionConfig = PreConversionConfig(convertDecimalsToFloat = true))
88-
val propertiesWithConversion = propertiesFromConfig(expectedWithConversion)
89-
parseProperties(propertiesWithConversion) shouldBe Right(expectedWithConversion)
90-
91-
val expectedWithoutConversion =
92-
anEmsSinkConfig.copy(preConversionConfig = PreConversionConfig(convertDecimalsToFloat = false))
93-
val propertiesWithoutConversion = propertiesFromConfig(expectedWithoutConversion)
94-
parseProperties(propertiesWithoutConversion) shouldBe Right(expectedWithoutConversion)
89+
val expectedWithDecimalConversion =
90+
anEmsSinkConfig.copy(preConversionConfig =
91+
PreConversionConfig(convertDecimalsToFloat = true, convertFieldsToLowercase = false),
92+
)
93+
val propertiesWithDecimalConversion = propertiesFromConfig(expectedWithDecimalConversion)
94+
parseProperties(propertiesWithDecimalConversion) shouldBe Right(expectedWithDecimalConversion)
95+
96+
val expectedWithFieldNamesConversion =
97+
anEmsSinkConfig.copy(preConversionConfig =
98+
PreConversionConfig(convertDecimalsToFloat = false, convertFieldsToLowercase = true),
99+
)
100+
val propertiesWithFieldNamesConversion = propertiesFromConfig(expectedWithFieldNamesConversion)
101+
parseProperties(propertiesWithFieldNamesConversion) shouldBe Right(expectedWithFieldNamesConversion)
102+
103+
val expectedWithAllConversions =
104+
anEmsSinkConfig.copy(preConversionConfig =
105+
PreConversionConfig(convertDecimalsToFloat = true, convertFieldsToLowercase = true),
106+
)
107+
val propertiesWithAllConversions = propertiesFromConfig(expectedWithAllConversions)
108+
parseProperties(propertiesWithAllConversions) shouldBe Right(expectedWithAllConversions)
109+
110+
val expectedWithoutConversions =
111+
anEmsSinkConfig.copy(preConversionConfig =
112+
PreConversionConfig(convertDecimalsToFloat = false, convertFieldsToLowercase = false),
113+
)
114+
val propertiesWithoutConversion = propertiesFromConfig(expectedWithoutConversions)
115+
parseProperties(propertiesWithoutConversion) shouldBe Right(expectedWithoutConversions)
95116
}
96117

97118
test(s"returns an error if AUTHORIZATION_KEY is missing") {
@@ -240,23 +261,24 @@ class EmsSinkConfigTest extends AnyFunSuite with Matchers {
240261
}
241262

242263
private def propertiesFromConfig(config: EmsSinkConfig): Map[String, _] = Map(
243-
"name" -> config.sinkName,
244-
ENDPOINT_KEY -> config.url.toString,
245-
TARGET_TABLE_KEY -> config.target,
246-
AUTHORIZATION_KEY -> config.authorization.header,
247-
ERROR_POLICY_KEY -> config.errorPolicyConfig.policyType.toString,
248-
COMMIT_SIZE_KEY -> config.commitPolicy.fileSize,
249-
COMMIT_INTERVAL_KEY -> config.commitPolicy.interval,
250-
COMMIT_RECORDS_KEY -> config.commitPolicy.records,
251-
ERROR_RETRY_INTERVAL -> config.errorPolicyConfig.retryConfig.interval,
252-
ERROR_POLICY_RETRIES_KEY -> config.errorPolicyConfig.retryConfig.retries,
253-
TMP_DIRECTORY_KEY -> config.workingDir.toString,
254-
PRIMARY_KEYS_KEY -> config.primaryKeys.mkString(","),
255-
CONNECTION_ID_KEY -> config.connectionId.get,
256-
ORDER_FIELD_NAME_KEY -> config.orderField.name.orNull,
257-
FALLBACK_VARCHAR_LENGTH_KEY -> config.fallbackVarCharLengths.orNull,
258-
DECIMAL_CONVERSION_KEY -> config.preConversionConfig.convertDecimalsToFloat,
259-
FLATTENER_ENABLE_KEY -> config.flattenerConfig.isDefined,
264+
"name" -> config.sinkName,
265+
ENDPOINT_KEY -> config.url.toString,
266+
TARGET_TABLE_KEY -> config.target,
267+
AUTHORIZATION_KEY -> config.authorization.header,
268+
ERROR_POLICY_KEY -> config.errorPolicyConfig.policyType.toString,
269+
COMMIT_SIZE_KEY -> config.commitPolicy.fileSize,
270+
COMMIT_INTERVAL_KEY -> config.commitPolicy.interval,
271+
COMMIT_RECORDS_KEY -> config.commitPolicy.records,
272+
ERROR_RETRY_INTERVAL -> config.errorPolicyConfig.retryConfig.interval,
273+
ERROR_POLICY_RETRIES_KEY -> config.errorPolicyConfig.retryConfig.retries,
274+
TMP_DIRECTORY_KEY -> config.workingDir.toString,
275+
PRIMARY_KEYS_KEY -> config.primaryKeys.mkString(","),
276+
CONNECTION_ID_KEY -> config.connectionId.get,
277+
ORDER_FIELD_NAME_KEY -> config.orderField.name.orNull,
278+
FALLBACK_VARCHAR_LENGTH_KEY -> config.fallbackVarCharLengths.orNull,
279+
DECIMAL_CONVERSION_KEY -> config.preConversionConfig.convertDecimalsToFloat,
280+
TRANSFORM_FIELDS_LOWERCASE_KEY -> config.preConversionConfig.convertFieldsToLowercase,
281+
FLATTENER_ENABLE_KEY -> config.flattenerConfig.isDefined,
260282
FLATTENER_DISCARD_COLLECTIONS_KEY -> config.flattenerConfig.map(_.discardCollections).getOrElse(
261283
FLATTENER_DISCARD_COLLECTIONS_DEFAULT,
262284
),

connector/src/test/scala/com/celonis/kafka/connect/ems/sink/EmsSinkTaskObfuscationTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ class EmsSinkTaskObfuscationTest extends AnyFunSuite with Matchers with WorkingD
6868
UnproxiedHttpClientConfig(defaultPoolingConfig),
6969
ExplodeConfig.None,
7070
OrderFieldConfig(Some(EmbeddedKafkaMetadataFieldInserter.CelonisOrderFieldName)),
71-
PreConversionConfig(convertDecimalsToFloat = false),
71+
PreConversionConfig(false, false),
7272
None,
7373
embedKafkaMetadata = false,
7474
useInMemoryFileSystem = false,

0 commit comments

Comments
 (0)