Merged
2 changes: 2 additions & 0 deletions sdks/java/io/expansion-service/build.gradle
@@ -76,6 +76,8 @@ dependencies {
permitUnusedDeclared project(":sdks:java:io:kafka") // BEAM-11761
implementation project(":sdks:java:io:kafka:upgrade")
permitUnusedDeclared project(":sdks:java:io:kafka:upgrade") // BEAM-11761
implementation project(":sdks:java:extensions:kafka-factories")
permitUnusedDeclared project(":sdks:java:extensions:kafka-factories")

if (JavaVersion.current().compareTo(JavaVersion.VERSION_11) >= 0 && project.findProperty('testJavaVersion') != '8') {
// iceberg ended support for Java 8 in 1.7.0

@@ -35,6 +35,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;
@@ -94,6 +95,7 @@
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.Manual;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.WallTime;
import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.util.construction.PTransformMatchers;
import org.apache.beam.sdk.util.construction.ReplacementOutputs;
@@ -930,6 +932,34 @@ static <K, V> void setupExternalBuilder(
builder.setOffsetDeduplication(false);
builder.setRedistributeByRecordKey(false);
}

if (config.consumerFactoryFnClass != null) {
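        // Only the provided KerberosConsumerFactoryFn is recognized here; it is
        // constructed reflectively via InstanceBuilder with the krb5.conf location
        // supplied in consumerFactoryFnParams.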
if (config.consumerFactoryFnClass.contains("KerberosConsumerFactoryFn")) {
try {
if (!config.consumerFactoryFnParams.containsKey("krb5Location")) {
throw new IllegalArgumentException(
"The KerberosConsumerFactoryFn requires a location for the krb5.conf file. "
+ "Please provide either a GCS location or Google Secret Manager location for this file.");
}
String krb5Location = config.consumerFactoryFnParams.get("krb5Location");
builder.setConsumerFactoryFn(
InstanceBuilder.ofType(
new TypeDescriptor<
SerializableFunction<
Map<String, Object>, Consumer<byte[], byte[]>>>() {})
.fromClassName(config.consumerFactoryFnClass)
.withArg(String.class, Objects.requireNonNull(krb5Location))
.build());
} catch (Exception e) {
throw new RuntimeException(
"Unable to construct FactoryFn "
+ config.consumerFactoryFnClass
+ ": "
+ e.getMessage(),
e);
}
}
}
}

private static <T> Coder<T> resolveCoder(Class<Deserializer<T>> deserializer) {
@@ -1000,6 +1030,8 @@ public static class Configuration {
private Boolean offsetDeduplication;
private Boolean redistributeByRecordKey;
private Long dynamicReadPollIntervalSeconds;
private String consumerFactoryFnClass;
private Map<String, String> consumerFactoryFnParams;

public void setConsumerConfig(Map<String, String> consumerConfig) {
this.consumerConfig = consumerConfig;
@@ -1068,6 +1100,14 @@ public void setRedistributeByRecordKey(Boolean redistributeByRecordKey) {
public void setDynamicReadPollIntervalSeconds(Long dynamicReadPollIntervalSeconds) {
this.dynamicReadPollIntervalSeconds = dynamicReadPollIntervalSeconds;
}

public void setConsumerFactoryFnClass(String consumerFactoryFnClass) {
this.consumerFactoryFnClass = consumerFactoryFnClass;
}

public void setConsumerFactoryFnParams(Map<String, String> consumerFactoryFnParams) {
this.consumerFactoryFnParams = consumerFactoryFnParams;
}
}
}

47 changes: 29 additions & 18 deletions sdks/python/apache_beam/io/kafka.py
@@ -100,6 +100,7 @@

# pytype: skip-file

import collections
import typing

import numpy as np
@@ -110,22 +111,21 @@

ReadFromKafkaSchema = typing.NamedTuple(
'ReadFromKafkaSchema',
[
('consumer_config', typing.Mapping[str, str]),
('topics', typing.List[str]),
('key_deserializer', str),
('value_deserializer', str),
('start_read_time', typing.Optional[int]),
('max_num_records', typing.Optional[int]),
('max_read_time', typing.Optional[int]),
('commit_offset_in_finalize', bool),
('timestamp_policy', str),
('consumer_polling_timeout', typing.Optional[int]),
('redistribute', typing.Optional[bool]),
('redistribute_num_keys', typing.Optional[np.int32]),
('allow_duplicates', typing.Optional[bool]),
('dynamic_read_poll_interval_seconds', typing.Optional[int]),
])
[('consumer_config', typing.Mapping[str, str]),
('topics', typing.List[str]), ('key_deserializer', str),
('value_deserializer', str), ('start_read_time', typing.Optional[int]),
('max_num_records', typing.Optional[int]),
('max_read_time', typing.Optional[int]),
('commit_offset_in_finalize', bool), ('timestamp_policy', str),
('consumer_polling_timeout', typing.Optional[int]),
('redistribute', typing.Optional[bool]),
('redistribute_num_keys', typing.Optional[np.int32]),
('allow_duplicates', typing.Optional[bool]),
('dynamic_read_poll_interval_seconds', typing.Optional[int]),
('consumer_factory_fn_class', typing.Optional[str]),
(
'consumer_factory_fn_params',
typing.Optional[collections.abc.Mapping[str, str]])])


def default_io_expansion_service(append_args=None):
@@ -173,7 +173,9 @@ def __init__(
redistribute_num_keys=np.int32(0),
allow_duplicates=False,
dynamic_read_poll_interval_seconds: typing.Optional[int] = None,
):
consumer_factory_fn_class: typing.Optional[str] = None,
consumer_factory_fn_params: typing.Optional[
collections.abc.Mapping] = None):
"""
Initializes a read operation from Kafka.

@@ -216,6 +218,13 @@ def __init__(
:param dynamic_read_poll_interval_seconds: The interval in seconds at which
to check for new partitions. If not None, dynamic partition discovery
is enabled.
    :param consumer_factory_fn_class: The fully qualified class name of a
      provided consumer factory function (consumerFactoryFn). If not None,
      Kafka consumers are constructed through this factory instead of the
      default consumer.
    :param consumer_factory_fn_params: A map of parameters used when
      constructing the consumer_factory_fn_class object. This must not be
      None if consumer_factory_fn_class is not None.
"""
if timestamp_policy not in [ReadFromKafka.processing_time_policy,
ReadFromKafka.create_time_policy,
@@ -242,7 +251,9 @@ def __init__(
redistribute_num_keys=redistribute_num_keys,
allow_duplicates=allow_duplicates,
dynamic_read_poll_interval_seconds=
dynamic_read_poll_interval_seconds)),
dynamic_read_poll_interval_seconds,
consumer_factory_fn_class=consumer_factory_fn_class,
consumer_factory_fn_params=consumer_factory_fn_params)),
expansion_service or default_io_expansion_service())


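For reference, a minimal sketch of how a Python pipeline might use the new parameters end to end. The factory's package prefix, the broker address, and the krb5.conf location below are illustrative assumptions; only the KerberosConsumerFactoryFn class name and the krb5Location key are established by this change.

# Hypothetical usage sketch: package prefix, broker address, and GCS path are
# placeholders, not values defined in this PR.
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka

with beam.Pipeline() as pipeline:
  _ = (
      pipeline
      | 'ReadFromKafka' >> ReadFromKafka(
          consumer_config={'bootstrap.servers': 'broker-1:9092'},
          topics=['my_topic'],
          max_num_records=10,  # keep the sketch bounded
          # Fully qualified name of the provided Kerberos consumer factory
          # (package assumed for illustration).
          consumer_factory_fn_class=(
              'org.apache.beam.sdk.extensions.kafkafactories.'
              'KerberosConsumerFactoryFn'),
          # The Kerberos factory requires a krb5.conf location, either a GCS
          # path or a Secret Manager resource; placeholder shown here.
          consumer_factory_fn_params={'krb5Location': 'gs://my-bucket/krb5.conf'})
      | 'Print' >> beam.Map(print))
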
50 changes: 25 additions & 25 deletions settings.gradle.kts
@@ -18,14 +18,14 @@
import com.gradle.enterprise.gradleplugin.internal.extension.BuildScanExtensionWithHiddenFeatures

pluginManagement {
plugins {
id("org.javacc.javacc") version "3.0.3" // enable the JavaCC parser generator
}
plugins {
id("org.javacc.javacc") version "3.0.3" // enable the JavaCC parser generator
}
}

plugins {
id("com.gradle.develocity") version "3.19"
id("com.gradle.common-custom-user-data-gradle-plugin") version "2.4.0"
id("com.gradle.develocity") version "3.19"
id("com.gradle.common-custom-user-data-gradle-plugin") version "2.2.1"
}


@@ -36,32 +36,32 @@ val isGithubActionsBuild = arrayOf("GITHUB_REPOSITORY", "GITHUB_RUN_ID").all { S
val isCi = isJenkinsBuild || isGithubActionsBuild

develocity {
server = "https://develocity.apache.org"
projectId = "beam"
server = "https://develocity.apache.org"
projectId = "beam"

buildScan {
uploadInBackground = !isCi
publishing.onlyIf { it.isAuthenticated }
obfuscation {
ipAddresses { addresses -> addresses.map { "0.0.0.0" } }
buildScan {
uploadInBackground = !isCi
publishing.onlyIf { it.isAuthenticated }
obfuscation {
ipAddresses { addresses -> addresses.map { "0.0.0.0" } }
}
}
}
}

buildCache {
local {
isEnabled = true
}
remote<HttpBuildCache> {
url = uri("https://beam-cache.apache.org/cache/")
isAllowUntrustedServer = false
credentials {
username = System.getenv("GRADLE_ENTERPRISE_CACHE_USERNAME")
password = System.getenv("GRADLE_ENTERPRISE_CACHE_PASSWORD")
local {
isEnabled = true
}
remote<HttpBuildCache> {
url = uri("https://beam-cache.apache.org/cache/")
isAllowUntrustedServer = false
credentials {
username = System.getenv("GRADLE_ENTERPRISE_CACHE_USERNAME")
password = System.getenv("GRADLE_ENTERPRISE_CACHE_PASSWORD")
}
isEnabled = !System.getenv("GRADLE_ENTERPRISE_CACHE_USERNAME").isNullOrBlank()
isPush = isCi && !System.getenv("GRADLE_ENTERPRISE_CACHE_USERNAME").isNullOrBlank()
}
isEnabled = !System.getenv("GRADLE_ENTERPRISE_CACHE_USERNAME").isNullOrBlank()
isPush = isCi && !System.getenv("GRADLE_ENTERPRISE_CACHE_USERNAME").isNullOrBlank()
}
}

rootProject.name = "beam"