Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,27 @@
package com.salesforce.datacloud.jdbc.core;

import com.salesforce.datacloud.jdbc.exception.DataCloudJDBCException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import lombok.Builder;
import lombok.Getter;
import salesforce.cdp.hyperdb.v1.AttachedDatabase;

/**
* Connection properties that control the JDBC connection behavior.
*/
@Getter
@Builder
public class ConnectionProperties {
/**
* Regex pattern to match database path properties in the format "databases.N.path"
*/
private static final Pattern DATABASE_PATH_PATTERN = Pattern.compile("^databases\\.(\\d+)\\.path$");
/**
* The dataspace to use for the connection
*/
Expand Down Expand Up @@ -46,6 +57,12 @@ public class ConnectionProperties {
private final StatementProperties statementProperties =
StatementProperties.builder().build();

/**
* The attached databases for this connection (optional)
*/
@Builder.Default
private final List<AttachedDatabase> attachedDatabases = new ArrayList<>();

/**
* Parses connection properties from a Properties object.
*
Expand All @@ -72,6 +89,46 @@ public static ConnectionProperties of(Properties props) throws DataCloudJDBCExce
if (userNameValue != null) {
builder.userName(userNameValue);
}

// Parse attached databases by iterating over properties to find databases.N.path patterns
// Use TreeMap to automatically sort by index
Map<Integer, AttachedDatabase> databaseMap = new TreeMap<>();
for (String propertyName : props.stringPropertyNames()) {
Matcher matcher = DATABASE_PATH_PATTERN.matcher(propertyName);
if (matcher.matches()) {
int index = Integer.parseInt(matcher.group(1));
String databasePath = props.getProperty(propertyName);

if (databasePath != null && !databasePath.trim().isEmpty()) {
String databaseAlias = props.getProperty("databases." + index + ".alias");
AttachedDatabase.Builder dbBuilder =
AttachedDatabase.newBuilder().setPath(databasePath.trim());

// Alias is optional - if provided, use it; otherwise leave it empty
if (databaseAlias != null && !databaseAlias.trim().isEmpty()) {
dbBuilder.setAlias(databaseAlias.trim());
}
databaseMap.put(index, dbBuilder.build());
}
}
}

// Validate that indexes are consecutive starting from 0
if (!databaseMap.isEmpty()) {
int expectedIndex = 0;
for (Integer index : databaseMap.keySet()) {
if (index != expectedIndex) {
throw new DataCloudJDBCException(
"Database indexes must be consecutive starting from 0. Missing index: " + expectedIndex
+ ". Found indexes: " + databaseMap.keySet());
}
expectedIndex++;
}
}

List<AttachedDatabase> attachedDatabases = new ArrayList<>(databaseMap.values());
builder.attachedDatabases(attachedDatabases);

builder.statementProperties(StatementProperties.of(props));

return builder.build();
Expand All @@ -97,6 +154,15 @@ public Properties toProperties() {
if (!userName.isEmpty()) {
props.setProperty("userName", userName);
}

for (int i = 0; i < attachedDatabases.size(); ++i) {
AttachedDatabase database = attachedDatabases.get(i);
props.setProperty("databases." + i + ".path", database.getPath());
if (database.hasAlias() && !database.getAlias().isEmpty()) {
props.setProperty("databases." + i + ".alias", database.getAlias());
}
}

props.putAll(statementProperties.toProperties());

return props;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ protected HyperGrpcClientExecutor getQueryClient(QueryTimeout queryTimeout) thro
querySettings.put(
"query_timeout", queryTimeout.getServerQueryTimeout().toMillis() + "ms");
}
return HyperGrpcClientExecutor.of(stub, querySettings);
val databases = connection.getConnectionProperties().getAttachedDatabases();
return HyperGrpcClientExecutor.of(stub, querySettings, databases);
}

@Getter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
Expand All @@ -24,6 +25,7 @@
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import salesforce.cdp.hyperdb.v1.AttachedDatabase;
import salesforce.cdp.hyperdb.v1.CancelQueryParam;
import salesforce.cdp.hyperdb.v1.ExecuteQueryResponse;
import salesforce.cdp.hyperdb.v1.HyperServiceGrpc;
Expand Down Expand Up @@ -61,11 +63,25 @@ public static HyperGrpcClientExecutor forSubmittedQuery(@NonNull HyperServiceGrp

public static HyperGrpcClientExecutor of(
@NonNull HyperServiceGrpc.HyperServiceBlockingStub stub, Map<String, String> querySettings) {
return of(stub, querySettings, List.of());
}

public static HyperGrpcClientExecutor of(
@NonNull HyperServiceGrpc.HyperServiceBlockingStub stub,
Map<String, String> querySettings,
List<AttachedDatabase> databases) {
val builder = HyperGrpcClientExecutor.builder().stub(stub);

if (!querySettings.isEmpty()) {
builder.settingsQueryParams(
QueryParam.newBuilder().putAllSettings(querySettings).build());
if (!querySettings.isEmpty() || !databases.isEmpty()) {
val queryParamBuilder = QueryParam.newBuilder();

if (!querySettings.isEmpty()) {
queryParamBuilder.putAllSettings(querySettings);
}
if (!databases.isEmpty()) {
queryParamBuilder.addAllDatabases(databases);
}
builder.settingsQueryParams(queryParamBuilder.build());
}

return builder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
import java.sql.SQLException;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import org.grpcmock.GrpcMock;
import org.junit.jupiter.api.Test;
import salesforce.cdp.hyperdb.v1.AttachedDatabase;
import salesforce.cdp.hyperdb.v1.ExecuteQueryResponse;
import salesforce.cdp.hyperdb.v1.HyperServiceGrpc;
import salesforce.cdp.hyperdb.v1.OutputFormat;
Expand Down Expand Up @@ -49,4 +51,41 @@ public void testExecuteQuery() throws SQLException {
GrpcMock.verifyThat(
GrpcMock.calledMethod(HyperServiceGrpc.getExecuteQueryMethod()).withRequest(expectedQueryParam));
}

@Test
public void testExecuteQueryWithDatabases() throws SQLException {
List<AttachedDatabase> databases = List.of(
AttachedDatabase.newBuilder()
.setPath("/path/to/db1")
.setAlias("db1_alias")
.build(),
);

HyperGrpcClientExecutor clientWithDatabases = HyperGrpcClientExecutor.of(
stubProvider.getStub(),
java.util.Map.of(),
databases
);

GrpcMock.stubFor(GrpcMock.serverStreamingMethod(HyperServiceGrpc.getExecuteQueryMethod())
.willReturn(chunk1));

String query = "SELECT * FROM test";
Iterator<ExecuteQueryResponse> queryResultIterator =
clientWithDatabases.executeQuery(query, QueryTimeout.of(Duration.ZERO, Duration.ZERO));
assertDoesNotThrow(() -> {
while (queryResultIterator.hasNext()) {
queryResultIterator.next();
}
});

QueryParam expectedQueryParam = QueryParam.newBuilder()
.setQuery(query)
.setOutputFormat(OutputFormat.ARROW_IPC)
.setTransferMode(QueryParam.TransferMode.ADAPTIVE)
.addAllDatabases(databases)
.build();
GrpcMock.verifyThat(
GrpcMock.calledMethod(HyperServiceGrpc.getExecuteQueryMethod()).withRequest(expectedQueryParam));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,144 @@ void testUnprefixedLcTimeRaisesUserError() {
assertThat(exception.getMessage()).contains("Use 'querySetting.lc_time'");
}

@Test
void testDatabasesParsing() throws DataCloudJDBCException {
Properties properties = new Properties();
properties.setProperty("databases.0.path", "/path/to/db");
properties.setProperty("databases.0.alias", "db_alias");
ConnectionProperties connectionProperties = ConnectionProperties.of(properties);

assertThat(connectionProperties.getAttachedDatabases()).hasSize(1);
assertThat(connectionProperties.getAttachedDatabases().get(0).getPath()).isEqualTo("/path/to/db");
assertThat(connectionProperties.getAttachedDatabases().get(0).getAlias())
.isEqualTo("db_alias");
}

@Test
void testDatabasesParsingWithEmptyValue() throws DataCloudJDBCException {
Properties properties = new Properties();
ConnectionProperties connectionProperties = ConnectionProperties.of(properties);

assertThat(connectionProperties.getAttachedDatabases()).isEmpty();
}

@Test
void testDatabasesParsingWithOptionalAlias() throws DataCloudJDBCException {
Properties properties = new Properties();
properties.setProperty("databases.0.path", "/single/path");
ConnectionProperties connectionProperties = ConnectionProperties.of(properties);

assertThat(connectionProperties.getAttachedDatabases()).hasSize(1);
assertThat(connectionProperties.getAttachedDatabases().get(0).getPath()).isEqualTo("/single/path");
assertThat(connectionProperties.getAttachedDatabases().get(0).hasAlias())
.isFalse();
}

@Test
void testDatabasesRoundtrip() throws DataCloudJDBCException {
Properties properties = new Properties();
properties.setProperty("databases.0.path", "/path1");
properties.setProperty("databases.0.alias", "alias1");
properties.setProperty("workload", "testWorkload");

ConnectionProperties connectionProperties = ConnectionProperties.of(properties);
Properties roundtripProperties = connectionProperties.toProperties();

assertThat(roundtripProperties.getProperty("databases.0.path")).isEqualTo("/path1");
assertThat(roundtripProperties.getProperty("databases.0.alias")).isEqualTo("alias1");
assertThat(roundtripProperties.getProperty("workload")).isEqualTo("testWorkload");
}

@Test
void testMultipleDatabasesParsing() throws DataCloudJDBCException {
Properties properties = new Properties();
properties.setProperty("databases.0.path", "/path/to/db1");
properties.setProperty("databases.0.alias", "db1_alias");
properties.setProperty("databases.1.path", "/path/to/db2");
properties.setProperty("databases.2.path", "/path/to/db3");
properties.setProperty("databases.2.alias", "db3_alias");

ConnectionProperties connectionProperties = ConnectionProperties.of(properties);

assertThat(connectionProperties.getAttachedDatabases()).hasSize(3);

// First database with alias
assertThat(connectionProperties.getAttachedDatabases().get(0).getPath()).isEqualTo("/path/to/db1");
assertThat(connectionProperties.getAttachedDatabases().get(0).getAlias())
.isEqualTo("db1_alias");

// Second database without alias
assertThat(connectionProperties.getAttachedDatabases().get(1).getPath()).isEqualTo("/path/to/db2");
assertThat(connectionProperties.getAttachedDatabases().get(1).hasAlias())
.isFalse();

// Third database with alias
assertThat(connectionProperties.getAttachedDatabases().get(2).getPath()).isEqualTo("/path/to/db3");
assertThat(connectionProperties.getAttachedDatabases().get(2).getAlias())
.isEqualTo("db3_alias");
}

@Test
void testDatabasesRegexPatternValidation() throws DataCloudJDBCException {
Properties properties = new Properties();
// Valid consecutive patterns
properties.setProperty("databases.0.path", "/valid/path1");
properties.setProperty("databases.1.path", "/valid/path2");
// Invalid patterns that should be ignored
properties.setProperty("databases.path", "/invalid/path1");
properties.setProperty("databases.abc.path", "/invalid/path2");
properties.setProperty("databases.0.other", "/invalid/path3");
properties.setProperty("otherdatabases.0.path", "/invalid/path4");

ConnectionProperties connectionProperties = ConnectionProperties.of(properties);

// Only the two valid patterns should be parsed
assertThat(connectionProperties.getAttachedDatabases()).hasSize(2);
assertThat(connectionProperties.getAttachedDatabases().get(0).getPath()).isEqualTo("/valid/path1");
assertThat(connectionProperties.getAttachedDatabases().get(1).getPath()).isEqualTo("/valid/path2");
}

@Test
void testDatabasesSortedByIndex() throws DataCloudJDBCException {
Properties properties = new Properties();
// Add databases in non-sequential order
properties.setProperty("databases.2.path", "/path/to/db3");
properties.setProperty("databases.0.path", "/path/to/db1");
properties.setProperty("databases.1.path", "/path/to/db2");

ConnectionProperties connectionProperties = ConnectionProperties.of(properties);

// Databases should be sorted by index
assertThat(connectionProperties.getAttachedDatabases()).hasSize(3);
assertThat(connectionProperties.getAttachedDatabases().get(0).getPath()).isEqualTo("/path/to/db1");
assertThat(connectionProperties.getAttachedDatabases().get(1).getPath()).isEqualTo("/path/to/db2");
assertThat(connectionProperties.getAttachedDatabases().get(2).getPath()).isEqualTo("/path/to/db3");
}

@Test
void testDatabasesMissingIndexRaisesError() {
Properties properties = new Properties();
properties.setProperty("databases.0.path", "/path/to/db1");
properties.setProperty("databases.2.path", "/path/to/db3"); // Missing index 1

val exception = assertThrows(DataCloudJDBCException.class, () -> ConnectionProperties.of(properties));
assertThat(exception.getMessage()).contains("Database indexes must be consecutive starting from 0");
assertThat(exception.getMessage()).contains("Missing index: 1");
assertThat(exception.getMessage()).contains("Found indexes: [0, 2]");
}

@Test
void testDatabasesNotStartingFromZeroRaisesError() {
Properties properties = new Properties();
properties.setProperty("databases.1.path", "/path/to/db1"); // Should start from 0
properties.setProperty("databases.2.path", "/path/to/db2");

val exception = assertThrows(DataCloudJDBCException.class, () -> ConnectionProperties.of(properties));
assertThat(exception.getMessage()).contains("Database indexes must be consecutive starting from 0");
assertThat(exception.getMessage()).contains("Missing index: 0");
assertThat(exception.getMessage()).contains("Found indexes: [1, 2]");
}

@Test
void testUnknownTopLevelPropertyRaisesUserError() {
Properties properties = new Properties();
Expand Down