Skip to content

Commit f020cc8

Browse files
authored
DP-2918: continue on data error (#62)
* DP-2918: drop or serialise collections in schema inference as well * DP-2918: add a test for dropping arrays in normalization * DP-2918: Return Either in normalizer, and fail in the task when normalization fails * DP-2918: Put failing records in DLQ * DP-2918: Change exception name * DP-2918: Introduce ErrorPolicyConfig class * DP-2918: Tests for ErrorPolicyConfig * DP-2918: Add continueOninvalidInput configuration * DP-2918: Call errand record reporter only for InvalidInputException * DP-2918: Change names of a test * DP-2918: Scalafmt * DP-2918: Fix maxretries reset in case of continue on error * DP-2918: Add comment * DP-2918: InvalidInputErrorHandler test * DP-2918: Fix InvalidInputErrorHandler * DP-2918: Integrate InvalidInputErrorHandler into EmsSinkTask * DP-2918: Add a comment * DP-2918: Add e2e test * DP-2918: Remove a comment * DP-2918: Scalafmt * DP-2918: remove a println
1 parent 5f9eaca commit f020cc8

24 files changed

+562
-224
lines changed

connector/src/main/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfig.scala

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package com.celonis.kafka.connect.ems.config
1818

1919
import cats.implicits._
2020
import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants._
21-
import com.celonis.kafka.connect.ems.errors.ErrorPolicy
2221
import com.celonis.kafka.connect.ems.storage.FileSystemOperations
2322
import com.celonis.kafka.connect.transform.FlattenerConfig
2423
import com.celonis.kafka.connect.transform.PreConversionConfig
@@ -36,9 +35,8 @@ final case class EmsSinkConfig(
3635
target: String,
3736
connectionId: Option[String],
3837
authorization: AuthorizationHeader,
39-
errorPolicy: ErrorPolicy,
38+
errorPolicyConfig: ErrorPolicyConfig,
4039
commitPolicy: CommitPolicyConfig,
41-
retries: RetryConfig,
4240
workingDir: Path,
4341
parquet: ParquetConfig,
4442
primaryKeys: List[String],
@@ -106,8 +104,7 @@ object EmsSinkConfig {
106104
url <- extractURL(props)
107105
table <- extractTargetTable(props)
108106
authorization <- AuthorizationHeader.extract(props)
109-
error <- ErrorPolicy.extract(props)
110-
retry <- RetryConfig.extractRetry(props)
107+
errorPolicyConfig <- ErrorPolicyConfig.extract(props)
111108
useInMemoryFs = PropertiesHelper.getBoolean(props, USE_IN_MEMORY_FS_KEY).getOrElse(USE_IN_MEMORY_FS_DEFAULT)
112109
allowNullsAsPks = PropertiesHelper.getBoolean(props, NULL_PK_KEY).getOrElse(NULL_PK_KEY_DEFAULT)
113110
tempDir <- if (useInMemoryFs) Right(FileSystemOperations.InMemoryPseudoDir) else extractWorkingDirectory(props)
@@ -135,9 +132,8 @@ object EmsSinkConfig {
135132
table,
136133
connectionId,
137134
authorization,
138-
error,
135+
errorPolicyConfig,
139136
commitPolicy,
140-
retry,
141137
tempDir,
142138
parquetConfig,
143139
primaryKeys,

connector/src/main/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfigConstants.scala

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,10 @@ object EmsSinkConfigConstants {
6464
| Default is $PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT.
6565
| """.stripMargin
6666

67-
val NBR_OF_RETRIES_KEY = s"$CONNECTOR_PREFIX.max.retries"
68-
val NBR_OF_RETRIES_DOC =
67+
val ERROR_POLICY_RETRIES_KEY = s"$CONNECTOR_PREFIX.max.retries"
68+
val ERROR_POLICY_RETRIES_DOC =
6969
"The maximum number of times to re-attempt to write the records before the task is marked as failed."
70-
val NBR_OF_RETIRES_DEFAULT: Int = 10
70+
val ERROR_POLICY_RETRIES_DEFAULT: Int = 10
7171

7272
val ERROR_POLICY_KEY = s"$CONNECTOR_PREFIX.error.policy"
7373
val ERROR_POLICY_DOC: String =
@@ -76,7 +76,7 @@ object EmsSinkConfigConstants {
7676
| There are three available options:
7777
| CONTINUE - the error is swallowed
7878
| THROW - the error is allowed to propagate.
79-
| RETRY - The exception causes the Connect framework to retry the message. The number of retries is set by $NBR_OF_RETRIES_KEY.
79+
| RETRY - The exception causes the Connect framework to retry the message. The number of retries is set by $ERROR_POLICY_RETRIES_KEY.
8080
|All errors will be logged automatically, even if the code swallows them.
8181
""".stripMargin
8282
val ERROR_POLICY_DEFAULT = "THROW"
@@ -85,6 +85,11 @@ object EmsSinkConfigConstants {
8585
val ERROR_RETRY_INTERVAL_DOC = "The time in milliseconds between retries."
8686
val ERROR_RETRY_INTERVAL_DEFAULT: Long = 60000L
8787

88+
val ERROR_CONTINUE_ON_INVALID_INPUT_KEY = s"$CONNECTOR_PREFIX.error.policy.continue.on.invalid.input"
89+
val ERROR_CONTINUE_ON_INVALID_INPUT_DOC: String =
90+
"If set to 'true', connector will continue when invalid input errors occur. Invalid records will be sent to the DLQ, if present. "
91+
val ERROR_CONTINUE_ON_INVALID_INPUT_DEFAULT = false
92+
8893
val FALLBACK_VARCHAR_LENGTH_KEY = s"$CONNECTOR_PREFIX.data.fallback.varchar.length"
8994
val FALLBACK_VARCHAR_LENGTH_DOC =
9095
"Optional parameter representing the STRING (VARCHAR) length when the schema is created in EMS. Must be greater than 0 and smaller or equal than 65000"

connector/src/main/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfigDef.scala

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -134,15 +134,15 @@ object EmsSinkConfigDef {
134134
ERROR_POLICY_DOC,
135135
)
136136
.define(
137-
NBR_OF_RETRIES_KEY,
137+
ERROR_POLICY_RETRIES_KEY,
138138
Type.INT,
139-
NBR_OF_RETIRES_DEFAULT,
139+
ERROR_POLICY_RETRIES_DEFAULT,
140140
Importance.MEDIUM,
141-
NBR_OF_RETRIES_DOC,
141+
ERROR_POLICY_RETRIES_DOC,
142142
"Error",
143143
2,
144144
ConfigDef.Width.LONG,
145-
NBR_OF_RETRIES_KEY,
145+
ERROR_POLICY_RETRIES_KEY,
146146
)
147147
.define(
148148
ERROR_RETRY_INTERVAL,
@@ -155,6 +155,13 @@ object EmsSinkConfigDef {
155155
ConfigDef.Width.LONG,
156156
ERROR_RETRY_INTERVAL,
157157
)
158+
.define(
159+
ERROR_CONTINUE_ON_INVALID_INPUT_KEY,
160+
Type.BOOLEAN,
161+
ERROR_CONTINUE_ON_INVALID_INPUT_DEFAULT,
162+
Importance.MEDIUM,
163+
ERROR_CONTINUE_ON_INVALID_INPUT_DOC,
164+
)
158165
.define(
159166
PARQUET_ROW_GROUP_SIZE_BYTES_KEY,
160167
Type.INT,
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright 2024 Celonis SE
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.celonis.kafka.connect.ems.config
18+
19+
import cats.syntax.either._
20+
import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.ERROR_CONTINUE_ON_INVALID_INPUT_DEFAULT
21+
import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.ERROR_CONTINUE_ON_INVALID_INPUT_KEY
22+
import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.ERROR_POLICY_DOC
23+
import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.ERROR_POLICY_KEY
24+
import com.celonis.kafka.connect.ems.config.ErrorPolicyConfig.ErrorPolicyType
25+
import com.celonis.kafka.connect.ems.config.ErrorPolicyConfig.ErrorPolicyType.CONTINUE
26+
import com.celonis.kafka.connect.ems.config.ErrorPolicyConfig.ErrorPolicyType.RETRY
27+
import com.celonis.kafka.connect.ems.config.ErrorPolicyConfig.ErrorPolicyType.THROW
28+
import com.celonis.kafka.connect.ems.config.PropertiesHelper.error
29+
import com.celonis.kafka.connect.ems.config.PropertiesHelper.getBoolean
30+
import com.celonis.kafka.connect.ems.config.PropertiesHelper.nonEmptyStringOr
31+
import com.celonis.kafka.connect.ems.errors.ErrorPolicy
32+
import com.celonis.kafka.connect.ems.errors.InvalidInputErrorHandler
33+
import org.apache.kafka.connect.sink.ErrantRecordReporter
34+
35+
final case class ErrorPolicyConfig(
36+
policyType: ErrorPolicyType,
37+
retryConfig: RetryConfig,
38+
continueOnInvalidInput: Boolean,
39+
) {
40+
lazy val errorPolicy: ErrorPolicy =
41+
policyType match {
42+
case ErrorPolicyType.THROW => ErrorPolicy.Throw
43+
case ErrorPolicyType.CONTINUE => ErrorPolicy.Continue
44+
case ErrorPolicyType.RETRY => ErrorPolicy.Retry
45+
}
46+
47+
def invalidInputErrorHandler(reporter: Option[ErrantRecordReporter]): InvalidInputErrorHandler =
48+
new InvalidInputErrorHandler(continueOnInvalidInput, reporter)
49+
}
50+
51+
object ErrorPolicyConfig {
52+
sealed trait ErrorPolicyType
53+
object ErrorPolicyType {
54+
case object THROW extends ErrorPolicyType
55+
case object CONTINUE extends ErrorPolicyType
56+
case object RETRY extends ErrorPolicyType
57+
}
58+
59+
def extract(props: Map[String, _]): Either[String, ErrorPolicyConfig] =
60+
for {
61+
policyType <- extractType(props)
62+
retryConfig <- RetryConfig.extractRetry(props)
63+
continueOnInvalidInput =
64+
getBoolean(props, ERROR_CONTINUE_ON_INVALID_INPUT_KEY).getOrElse(ERROR_CONTINUE_ON_INVALID_INPUT_DEFAULT)
65+
} yield ErrorPolicyConfig(policyType, retryConfig, continueOnInvalidInput = continueOnInvalidInput)
66+
67+
private def extractType(props: Map[String, _]): Either[String, ErrorPolicyType] =
68+
nonEmptyStringOr(props, ERROR_POLICY_KEY, ERROR_POLICY_DOC).map(_.toUpperCase)
69+
.flatMap {
70+
case "THROW" => THROW.asRight
71+
case "RETRY" => RETRY.asRight
72+
case "CONTINUE" => CONTINUE.asRight
73+
case _ => error(ERROR_POLICY_KEY, ERROR_POLICY_DOC)
74+
}
75+
}

connector/src/main/scala/com/celonis/kafka/connect/ems/config/RetryConfig.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,11 @@ object RetryConfig {
3232
else value.asRight[String]
3333
case None => ERROR_RETRY_INTERVAL_DEFAULT.asRight[String]
3434
}
35-
retries <- PropertiesHelper.getInt(props, NBR_OF_RETRIES_KEY) match {
35+
retries <- PropertiesHelper.getInt(props, ERROR_POLICY_RETRIES_KEY) match {
3636
case Some(value) =>
37-
if (value <= 0) error(NBR_OF_RETRIES_KEY, "Number of retries needs to be greater than 0.")
37+
if (value <= 0) error(ERROR_POLICY_RETRIES_KEY, "Number of retries needs to be greater than 0.")
3838
else value.asRight[String]
39-
case None => NBR_OF_RETIRES_DEFAULT.asRight[String]
39+
case None => ERROR_POLICY_RETRIES_DEFAULT.asRight[String]
4040
}
4141
} yield RetryConfig(retries, interval)
4242

connector/src/main/scala/com/celonis/kafka/connect/ems/errors/EmsSinkException.scala

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,16 @@
1717
package com.celonis.kafka.connect.ems.errors
1818

1919
import org.apache.kafka.connect.errors.ConnectException
20-
import org.http4s.DecodeFailure
2120
import org.http4s.Status
2221

2322
sealed trait EmsSinkException {
2423
def getMessage: String
2524
}
2625

27-
case class UploadFailedException(status: Status, msg: String, throwable: Throwable)
26+
final case class UploadFailedException(status: Status, msg: String, throwable: Throwable)
2827
extends ConnectException(msg, throwable)
2928
with EmsSinkException
3029

31-
case class UnexpectedUploadException(msg: String, throwable: Throwable) extends ConnectException(msg, throwable)
30+
final case class InvalidInputException(msg: String) extends ConnectException(msg) with EmsSinkException
3231

33-
case class UploadInvalidResponseException(failure: DecodeFailure)
34-
extends ConnectException(failure.getMessage(), failure)
35-
with EmsSinkException
36-
37-
case class InvalidInputException(msg: String) extends ConnectException(msg) with EmsSinkException
38-
39-
case class FailedObfuscationException(msg: String) extends ConnectException(msg) with EmsSinkException
32+
final case class FailedObfuscationException(msg: String) extends ConnectException(msg) with EmsSinkException

connector/src/main/scala/com/celonis/kafka/connect/ems/errors/ErrorPolicy.scala

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,15 @@
1616

1717
package com.celonis.kafka.connect.ems.errors
1818

19-
import cats.syntax.either._
20-
import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.ERROR_POLICY_DOC
21-
import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.ERROR_POLICY_KEY
22-
import com.celonis.kafka.connect.ems.config.PropertiesHelper.error
23-
import com.celonis.kafka.connect.ems.config.PropertiesHelper.nonEmptyStringOr
2419
import com.typesafe.scalalogging.StrictLogging
25-
26-
import enumeratum._
27-
import enumeratum.EnumEntry.Uppercase
2820
import org.apache.kafka.connect.errors.ConnectException
2921
import org.apache.kafka.connect.errors.RetriableException
3022

31-
sealed trait ErrorPolicy extends EnumEntry with Uppercase {
23+
sealed trait ErrorPolicy {
3224
def handle(error: Throwable, retries: Int): Unit
3325
}
3426

35-
object ErrorPolicy extends Enum[ErrorPolicy] {
36-
val values: IndexedSeq[ErrorPolicy] = findValues
27+
object ErrorPolicy {
3728

3829
case object Continue extends ErrorPolicy with StrictLogging {
3930
override def handle(error: Throwable, retries: Int): Unit =
@@ -50,19 +41,12 @@ object ErrorPolicy extends Enum[ErrorPolicy] {
5041
case object Retry extends ErrorPolicy with StrictLogging {
5142
override def handle(error: Throwable, retries: Int): Unit =
5243
if (retries == 0) {
44+
logger.warn(s"Error policy is set to RETRY and no more attempts left.", error)
5345
throw new ConnectException(error)
5446
} else {
5547
logger.warn(s"Error policy is set to RETRY. Remaining attempts [$retries]", error)
5648
throw new RetriableException(error)
5749
}
5850
}
5951

60-
def extract(props: Map[String, _]): Either[String, ErrorPolicy] =
61-
nonEmptyStringOr(props, ERROR_POLICY_KEY, ERROR_POLICY_DOC)
62-
.flatMap { constant =>
63-
ErrorPolicy.withNameInsensitiveOption(constant) match {
64-
case Some(value) => value.asRight[String]
65-
case None => error(ERROR_POLICY_KEY, ERROR_POLICY_DOC)
66-
}
67-
}
6852
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2024 Celonis SE
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.celonis.kafka.connect.ems.errors
18+
19+
import com.typesafe.scalalogging.StrictLogging
20+
import org.apache.kafka.connect.sink.ErrantRecordReporter
21+
import org.apache.kafka.connect.sink.SinkRecord
22+
23+
/** Error policies work at the batch level, while this handler works at the record level. It works only for
24+
* InvalidInputExceptions, that are errors due to defects of single records.
25+
*/
26+
final class InvalidInputErrorHandler(
27+
continueOnInvalidInput: Boolean,
28+
errantRecordReporter: Option[ErrantRecordReporter],
29+
) extends StrictLogging {
30+
def handle(record: SinkRecord, error: Throwable): Unit = error match {
31+
case _: InvalidInputException if continueOnInvalidInput =>
32+
logger.warn("Error policy is set to CONTINUE on InvalidInput", error)
33+
errantRecordReporter.foreach(errantRecordReporter => errantRecordReporter.report(record, error))
34+
case _ => throw error
35+
}
36+
}

0 commit comments

Comments
 (0)