Skip to content

Commit fa7fb68

Browse files
committed
fix: set destinationCatalogId when creating DA connection via public-api (#17656)
1 parent d2030ed commit fa7fb68

File tree

7 files changed

+63
-1
lines changed

7 files changed

+63
-1
lines changed

airbyte-commons-server/src/main/kotlin/io/airbyte/commons/server/services/DestinationDiscoverService.kt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,16 @@ class DestinationDiscoverService(
8888
return actorCatalogToDestinationCatalog(actorCatalog)
8989
}
9090

91+
fun getDestinationCatalogIfSupported(
92+
destinationId: ActorId,
93+
skipCache: Boolean = false,
94+
): DestinationCatalogWithId? =
95+
try {
96+
getDestinationCatalog(destinationId, skipCache)
97+
} catch (_: DestinationDiscoverNotSupportedProblem) {
98+
null
99+
}
100+
91101
/**
92102
* Retrieves the destination catalog associated with a given connection.
93103
*

airbyte-commons-server/src/test/kotlin/io/airbyte/commons/server/services/DestinationDiscoverServiceTest.kt

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,33 @@ class DestinationDiscoverServiceTest {
222222
}
223223
}
224224

225+
@Nested
226+
inner class GetDestinationCatalogIfSupported {
227+
@Test
228+
fun `should return null when destination version does not support data activation`() {
229+
val destination =
230+
DestinationConnection()
231+
.withWorkspaceId(workspaceId)
232+
.withDestinationDefinitionId(destinationDefinitionId)
233+
.withDestinationId(destinationId.value)
234+
.withConfiguration(Jsons.emptyObject())
235+
236+
val destinationDefinition = StandardDestinationDefinition()
237+
val destinationVersion =
238+
ActorDefinitionVersion()
239+
.withDockerImageTag("1.0.1")
240+
.withSupportsDataActivation(false)
241+
242+
every { destinationService.getDestinationConnection(destinationId.value) } returns destination
243+
every { destinationService.getStandardDestinationDefinition(destinationDefinitionId) } returns destinationDefinition
244+
every { actorDefinitionVersionHelper.getDestinationVersion(destinationDefinition, workspaceId, destinationId.value) } returns destinationVersion
245+
246+
val result = service.getDestinationCatalogIfSupported(destinationId)
247+
248+
result shouldBe null
249+
}
250+
}
251+
225252
@Nested
226253
inner class WriteCatalogResult {
227254
@Test

airbyte-server/src/main/kotlin/io/airbyte/server/apis/publicapi/controllers/ConnectionsController.kt

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,11 @@ import io.airbyte.api.model.generated.SourceDiscoverSchemaRead
1111
import io.airbyte.commons.auth.roles.AuthRoleConstants
1212
import io.airbyte.commons.server.authorization.RoleResolver
1313
import io.airbyte.commons.server.scheduling.AirbyteTaskExecutors
14+
import io.airbyte.commons.server.services.DestinationDiscoverService
1415
import io.airbyte.commons.server.support.AuthenticationId
1516
import io.airbyte.commons.server.support.CurrentUserService
17+
import io.airbyte.config.DestinationCatalog
18+
import io.airbyte.domain.models.ActorId
1619
import io.airbyte.publicApi.server.generated.apis.PublicConnectionsApi
1720
import io.airbyte.publicApi.server.generated.models.ConnectionCreateRequest
1821
import io.airbyte.publicApi.server.generated.models.ConnectionPatchRequest
@@ -48,6 +51,7 @@ open class ConnectionsController(
4851
private val connectionService: ConnectionService,
4952
private val sourceService: SourceService,
5053
private val destinationService: DestinationService,
54+
private val destinationDiscoverService: DestinationDiscoverService,
5155
private val trackingHelper: TrackingHelper,
5256
private val roleResolver: RoleResolver,
5357
private val currentUserService: CurrentUserService,
@@ -105,6 +109,14 @@ open class ConnectionsController(
105109
userId,
106110
) as List<DestinationSyncMode>
107111

112+
// get destination_catalog_id
113+
val destinationCatalogId =
114+
destinationDiscoverService
115+
.getDestinationCatalogIfSupported(
116+
destinationId = ActorId(connectionCreateRequest.destinationId),
117+
skipCache = false,
118+
)?.catalogId
119+
108120
// refer to documentation to understand what we need to do for the catalog
109121
// https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#catalog
110122
var configuredCatalog = AirbyteCatalog()
@@ -135,6 +147,7 @@ open class ConnectionsController(
135147
connectionService.createConnection(
136148
validConnectionCreateRequest,
137149
catalogId!!,
150+
destinationCatalogId?.value,
138151
finalConfiguredCatalog,
139152
destinationRead.workspaceId,
140153
)

airbyte-server/src/main/kotlin/io/airbyte/server/apis/publicapi/mappers/ConnectionCreateMapper.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ object ConnectionCreateMapper {
2929
fun from(
3030
connectionCreateRequest: ConnectionCreateRequest,
3131
catalogId: UUID?,
32+
destinationCatalogId: UUID?,
3233
configuredCatalog: AirbyteCatalog?,
3334
): ConnectionCreate {
3435
val connectionCreateOss = ConnectionCreate()
@@ -67,6 +68,9 @@ object ConnectionCreateMapper {
6768
if (configuredCatalog != null) {
6869
connectionCreateOss.syncCatalog = configuredCatalog
6970
}
71+
if (destinationCatalogId != null) {
72+
connectionCreateOss.destinationCatalogId = destinationCatalogId
73+
}
7074
if (connectionCreateRequest.status != null) {
7175
connectionCreateOss.status = ConnectionStatus.fromValue(connectionCreateRequest.status.toString())
7276
} else {

airbyte-server/src/main/kotlin/io/airbyte/server/apis/publicapi/services/ConnectionService.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ interface ConnectionService {
3535
fun createConnection(
3636
connectionCreateRequest: ConnectionCreateRequest,
3737
catalogId: UUID,
38+
destinationCatalogId: UUID?,
3839
configuredCatalog: AirbyteCatalog,
3940
workspaceId: UUID,
4041
): ConnectionResponse
@@ -79,13 +80,15 @@ class ConnectionServiceImpl(
7980
override fun createConnection(
8081
connectionCreateRequest: ConnectionCreateRequest,
8182
catalogId: UUID,
83+
destinationCatalogId: UUID?,
8284
configuredCatalog: AirbyteCatalog,
8385
workspaceId: UUID,
8486
): ConnectionResponse {
8587
val connectionCreateOss: ConnectionCreate =
8688
ConnectionCreateMapper.from(
8789
connectionCreateRequest,
8890
catalogId,
91+
destinationCatalogId,
8992
configuredCatalog,
9093
)
9194

airbyte-server/src/test/kotlin/io/airbyte/server/apis/publicapi/controllers/ConnectionsControllerTest.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import io.airbyte.api.model.generated.AirbyteStreamAndConfiguration
99
import io.airbyte.api.model.generated.DestinationRead
1010
import io.airbyte.api.model.generated.SourceDiscoverSchemaRead
1111
import io.airbyte.commons.server.authorization.RoleResolver
12+
import io.airbyte.commons.server.services.DestinationDiscoverService
1213
import io.airbyte.commons.server.support.CurrentUserService
1314
import io.airbyte.config.AuthenticatedUser
1415
import io.airbyte.publicApi.server.generated.models.ConnectionPatchRequest
@@ -37,6 +38,7 @@ class ConnectionsControllerTest {
3738
private val trackingHelper: TrackingHelper = mockk(relaxed = true)
3839
private val roleResolver: RoleResolver = mockk(relaxed = true)
3940
private val currentUserService: CurrentUserService = mockk()
41+
private val destinationDiscoverService: DestinationDiscoverService = mockk(relaxed = true)
4042

4143
@BeforeEach
4244
fun setUp() {
@@ -56,6 +58,7 @@ class ConnectionsControllerTest {
5658
trackingHelper = trackingHelper,
5759
roleResolver = roleResolver,
5860
currentUserService = currentUserService,
61+
destinationDiscoverService = destinationDiscoverService,
5962
)
6063
}
6164

airbyte-server/src/test/kotlin/io/airbyte/server/apis/publicapi/mappers/ConnectionCreateMapperTest.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ class ConnectionCreateMapperTest {
2424
@Test
2525
fun testConnectionCreateMapper() {
2626
val catalogId = UUID.randomUUID()
27+
val destinationCatalogId = UUID.randomUUID()
2728

2829
val catalog =
2930
AirbyteCatalog().apply {
@@ -59,6 +60,7 @@ class ConnectionCreateMapperTest {
5960
this.prefix = "test"
6061
this.scheduleType = ConnectionScheduleType.CRON
6162
this.sourceCatalogId = catalogId
63+
this.destinationCatalogId = destinationCatalogId
6264
this.syncCatalog = catalog
6365
this.status = ConnectionStatus.INACTIVE
6466
val connectionScheduleDataCron =
@@ -72,6 +74,6 @@ class ConnectionCreateMapperTest {
7274
}
7375
this.scheduleData = connectionScheduleData
7476
}
75-
assertEquals(expectedOssConnectionCreateRequest, ConnectionCreateMapper.from(connectionCreateRequest, catalogId, catalog))
77+
assertEquals(expectedOssConnectionCreateRequest, ConnectionCreateMapper.from(connectionCreateRequest, catalogId, destinationCatalogId, catalog))
7678
}
7779
}

0 commit comments

Comments
 (0)