Skip to content

Commit 452acd1

Browse files
fix: Respect explicit use_cache: false for parent streams in declarative sources (#864)
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
1 parent 953fc35 commit 452acd1

File tree

5 files changed

+277
-31
lines changed

5 files changed

+277
-31
lines changed

airbyte_cdk/legacy/sources/declarative/manifest_declarative_source.py

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -341,29 +341,41 @@ def streams(self, config: Mapping[str, Any]) -> List[Union[Stream, AbstractStrea
341341
def _initialize_cache_for_parent_streams(
342342
stream_configs: List[Dict[str, Any]],
343343
) -> List[Dict[str, Any]]:
344+
"""Enable caching for parent streams unless explicitly disabled.
345+
346+
Caching is enabled by default for parent streams to optimize performance when the same
347+
parent data is needed by multiple child streams. However, explicit `use_cache: false`
348+
settings are respected for streams that cannot use caching (e.g., scroll-based pagination
349+
APIs where caching causes duplicate records).
350+
"""
344351
parent_streams = set()
345352

353+
def _set_cache_if_not_disabled(requester: Dict[str, Any]) -> None:
354+
"""Set use_cache to True only if not explicitly disabled."""
355+
if requester.get("use_cache") is not False:
356+
requester["use_cache"] = True
357+
346358
def update_with_cache_parent_configs(
347359
parent_configs: list[dict[str, Any]],
348360
) -> None:
349361
for parent_config in parent_configs:
350362
parent_streams.add(parent_config["stream"]["name"])
351363
if parent_config["stream"]["type"] == "StateDelegatingStream":
352-
parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"][
353-
"use_cache"
354-
] = True
355-
parent_config["stream"]["incremental_stream"]["retriever"]["requester"][
356-
"use_cache"
357-
] = True
364+
_set_cache_if_not_disabled(
365+
parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"]
366+
)
367+
_set_cache_if_not_disabled(
368+
parent_config["stream"]["incremental_stream"]["retriever"]["requester"]
369+
)
358370
else:
359-
parent_config["stream"]["retriever"]["requester"]["use_cache"] = True
371+
_set_cache_if_not_disabled(parent_config["stream"]["retriever"]["requester"])
360372

361373
for stream_config in stream_configs:
362374
if stream_config.get("incremental_sync", {}).get("parent_stream"):
363375
parent_streams.add(stream_config["incremental_sync"]["parent_stream"]["name"])
364-
stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"][
365-
"use_cache"
366-
] = True
376+
_set_cache_if_not_disabled(
377+
stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"]
378+
)
367379

368380
elif stream_config.get("retriever", {}).get("partition_router", {}):
369381
partition_router = stream_config["retriever"]["partition_router"]
@@ -380,14 +392,14 @@ def update_with_cache_parent_configs(
380392
for stream_config in stream_configs:
381393
if stream_config["name"] in parent_streams:
382394
if stream_config["type"] == "StateDelegatingStream":
383-
stream_config["full_refresh_stream"]["retriever"]["requester"]["use_cache"] = (
384-
True
395+
_set_cache_if_not_disabled(
396+
stream_config["full_refresh_stream"]["retriever"]["requester"]
385397
)
386-
stream_config["incremental_stream"]["retriever"]["requester"]["use_cache"] = (
387-
True
398+
_set_cache_if_not_disabled(
399+
stream_config["incremental_stream"]["retriever"]["requester"]
388400
)
389401
else:
390-
stream_config["retriever"]["requester"]["use_cache"] = True
402+
_set_cache_if_not_disabled(stream_config["retriever"]["requester"])
391403
return stream_configs
392404

393405
def spec(self, logger: logging.Logger) -> ConnectorSpecification:

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -424,29 +424,41 @@ def streams(self, config: Mapping[str, Any]) -> List[AbstractStream]: # type: i
424424
def _initialize_cache_for_parent_streams(
425425
stream_configs: List[Dict[str, Any]],
426426
) -> List[Dict[str, Any]]:
427+
"""Enable caching for parent streams unless explicitly disabled.
428+
429+
Caching is enabled by default for parent streams to optimize performance when the same
430+
parent data is needed by multiple child streams. However, explicit `use_cache: false`
431+
settings are respected for streams that cannot use caching (e.g., scroll-based pagination
432+
APIs where caching causes duplicate records).
433+
"""
427434
parent_streams = set()
428435

436+
def _set_cache_if_not_disabled(requester: Dict[str, Any]) -> None:
437+
"""Set use_cache to True only if not explicitly disabled."""
438+
if requester.get("use_cache") is not False:
439+
requester["use_cache"] = True
440+
429441
def update_with_cache_parent_configs(
430442
parent_configs: list[dict[str, Any]],
431443
) -> None:
432444
for parent_config in parent_configs:
433445
parent_streams.add(parent_config["stream"]["name"])
434446
if parent_config["stream"]["type"] == "StateDelegatingStream":
435-
parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"][
436-
"use_cache"
437-
] = True
438-
parent_config["stream"]["incremental_stream"]["retriever"]["requester"][
439-
"use_cache"
440-
] = True
447+
_set_cache_if_not_disabled(
448+
parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"]
449+
)
450+
_set_cache_if_not_disabled(
451+
parent_config["stream"]["incremental_stream"]["retriever"]["requester"]
452+
)
441453
else:
442-
parent_config["stream"]["retriever"]["requester"]["use_cache"] = True
454+
_set_cache_if_not_disabled(parent_config["stream"]["retriever"]["requester"])
443455

444456
for stream_config in stream_configs:
445457
if stream_config.get("incremental_sync", {}).get("parent_stream"):
446458
parent_streams.add(stream_config["incremental_sync"]["parent_stream"]["name"])
447-
stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"][
448-
"use_cache"
449-
] = True
459+
_set_cache_if_not_disabled(
460+
stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"]
461+
)
450462

451463
elif stream_config.get("retriever", {}).get("partition_router", {}):
452464
partition_router = stream_config["retriever"]["partition_router"]
@@ -463,14 +475,14 @@ def update_with_cache_parent_configs(
463475
for stream_config in stream_configs:
464476
if stream_config["name"] in parent_streams:
465477
if stream_config["type"] == "StateDelegatingStream":
466-
stream_config["full_refresh_stream"]["retriever"]["requester"]["use_cache"] = (
467-
True
478+
_set_cache_if_not_disabled(
479+
stream_config["full_refresh_stream"]["retriever"]["requester"]
468480
)
469-
stream_config["incremental_stream"]["retriever"]["requester"]["use_cache"] = (
470-
True
481+
_set_cache_if_not_disabled(
482+
stream_config["incremental_stream"]["retriever"]["requester"]
471483
)
472484
else:
473-
stream_config["retriever"]["requester"]["use_cache"] = True
485+
_set_cache_if_not_disabled(stream_config["retriever"]["requester"])
474486
return stream_configs
475487

476488
def spec(self, logger: logging.Logger) -> ConnectorSpecification:

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2286,7 +2286,7 @@ definitions:
22862286
- "$ref": "#/definitions/CustomErrorHandler"
22872287
use_cache:
22882288
title: Use Cache
2289-
description: Enables stream requests caching. This field is automatically set by the CDK.
2289+
description: Enables stream requests caching. When set to true, repeated requests to the same URL will return cached responses. Parent streams automatically have caching enabled unless this field is explicitly set to false. Only set this to false if you are certain that caching must be disabled (e.g., for scroll-based pagination APIs where caching causes duplicate records), as disabling it may negatively impact performance when the same data is needed multiple times.
22902290
type: boolean
22912291
default: false
22922292
$parameters:

unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1652,6 +1652,112 @@ def test_only_parent_streams_use_cache():
16521652
assert not get_retriever(streams[2]).requester.use_cache
16531653

16541654

1655+
def test_parent_stream_respects_explicit_use_cache_false():
1656+
"""Test that explicit use_cache: false is respected for parent streams.
1657+
1658+
This is important for APIs that use scroll-based pagination (like Intercom's /companies/scroll
1659+
endpoint), where caching must be disabled because the same scroll_param is returned in
1660+
pagination responses, causing duplicate records and infinite pagination loops.
1661+
"""
1662+
# Parent stream with explicit use_cache: false
1663+
companies_stream = {
1664+
"type": "DeclarativeStream",
1665+
"$parameters": {
1666+
"name": "companies",
1667+
"primary_key": "id",
1668+
"url_base": "https://api.intercom.io/",
1669+
},
1670+
"schema_loader": {
1671+
"name": "{{ parameters.stream_name }}",
1672+
"file_path": "./source_intercom/schemas/{{ parameters.name }}.yaml",
1673+
},
1674+
"retriever": {
1675+
"paginator": {
1676+
"type": "DefaultPaginator",
1677+
"page_token_option": {"type": "RequestPath"},
1678+
"pagination_strategy": {
1679+
"type": "CursorPagination",
1680+
"cursor_value": "{{ response.get('scroll_param') }}",
1681+
"page_size": 100,
1682+
},
1683+
},
1684+
"requester": {
1685+
"path": "companies/scroll",
1686+
"use_cache": False, # Explicitly disabled for scroll-based pagination
1687+
"authenticator": {
1688+
"type": "BearerAuthenticator",
1689+
"api_token": "{{ config['api_key'] }}",
1690+
},
1691+
},
1692+
"record_selector": {"extractor": {"type": "DpathExtractor", "field_path": ["data"]}},
1693+
},
1694+
}
1695+
1696+
manifest = {
1697+
"version": "0.29.3",
1698+
"definitions": {},
1699+
"streams": [
1700+
deepcopy(companies_stream),
1701+
{
1702+
"type": "DeclarativeStream",
1703+
"$parameters": {
1704+
"name": "company_segments",
1705+
"primary_key": "id",
1706+
"url_base": "https://api.intercom.io/",
1707+
},
1708+
"schema_loader": {
1709+
"name": "{{ parameters.stream_name }}",
1710+
"file_path": "./source_intercom/schemas/{{ parameters.name }}.yaml",
1711+
},
1712+
"retriever": {
1713+
"paginator": {"type": "NoPagination"},
1714+
"requester": {
1715+
"path": "companies/{{ stream_partition.parent_id }}/segments",
1716+
"authenticator": {
1717+
"type": "BearerAuthenticator",
1718+
"api_token": "{{ config['api_key'] }}",
1719+
},
1720+
},
1721+
"record_selector": {
1722+
"extractor": {"type": "DpathExtractor", "field_path": ["data"]}
1723+
},
1724+
"partition_router": {
1725+
"parent_stream_configs": [
1726+
{
1727+
"parent_key": "id",
1728+
"partition_field": "parent_id",
1729+
"stream": deepcopy(companies_stream),
1730+
}
1731+
],
1732+
"type": "SubstreamPartitionRouter",
1733+
},
1734+
},
1735+
},
1736+
],
1737+
"check": {"type": "CheckStream", "stream_names": ["companies"]},
1738+
}
1739+
source = ManifestDeclarativeSource(source_config=manifest)
1740+
1741+
streams = source.streams({})
1742+
assert len(streams) == 2
1743+
1744+
# Main stream with explicit use_cache: false should remain false (parent for substream)
1745+
assert streams[0].name == "companies"
1746+
# use_cache should remain False because it was explicitly set to False
1747+
assert not get_retriever(streams[0]).requester.use_cache
1748+
1749+
# Substream
1750+
assert streams[1].name == "company_segments"
1751+
1752+
# Parent stream created for substream should also respect use_cache: false
1753+
stream_slicer = streams[1]._stream_partition_generator._stream_slicer
1754+
assert stream_slicer.parent_stream_configs[0].stream.name == "companies"
1755+
# The parent stream in the substream config should also have use_cache: false
1756+
assert not stream_slicer.parent_stream_configs[
1757+
0
1758+
].stream._stream_partition_generator._partition_factory._retriever.requester.use_cache
1759+
1760+
16551761
def _run_read(manifest: Mapping[str, Any], stream_name: str) -> List[AirbyteMessage]:
16561762
catalog = ConfiguredAirbyteCatalog(
16571763
streams=[

unit_tests/sources/declarative/test_concurrent_declarative_source.py

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3931,6 +3931,122 @@ def test_only_parent_streams_use_cache():
39313931
)
39323932

39333933

3934+
def test_parent_stream_respects_explicit_use_cache_false():
3935+
"""Test that explicit use_cache: false is respected for parent streams.
3936+
3937+
This is important for APIs that use scroll-based pagination (like Intercom's /companies/scroll
3938+
endpoint), where caching must be disabled because the same scroll_param is returned in
3939+
pagination responses, causing duplicate records and infinite pagination loops.
3940+
"""
3941+
# Parent stream with explicit use_cache: false
3942+
companies_stream = {
3943+
"type": "DeclarativeStream",
3944+
"$parameters": {
3945+
"name": "companies",
3946+
"primary_key": "id",
3947+
"url_base": "https://api.intercom.io/",
3948+
},
3949+
"schema_loader": {
3950+
"name": "{{ parameters.stream_name }}",
3951+
"file_path": "./source_intercom/schemas/{{ parameters.name }}.yaml",
3952+
},
3953+
"retriever": {
3954+
"paginator": {
3955+
"type": "DefaultPaginator",
3956+
"page_token_option": {"type": "RequestPath"},
3957+
"pagination_strategy": {
3958+
"type": "CursorPagination",
3959+
"cursor_value": "{{ response.get('scroll_param') }}",
3960+
"page_size": 100,
3961+
},
3962+
},
3963+
"requester": {
3964+
"path": "companies/scroll",
3965+
"use_cache": False, # Explicitly disabled for scroll-based pagination
3966+
"authenticator": {
3967+
"type": "BearerAuthenticator",
3968+
"api_token": "{{ config['api_key'] }}",
3969+
},
3970+
},
3971+
"record_selector": {"extractor": {"type": "DpathExtractor", "field_path": ["data"]}},
3972+
},
3973+
}
3974+
3975+
manifest = {
3976+
"version": "0.29.3",
3977+
"definitions": {},
3978+
"streams": [
3979+
deepcopy(companies_stream),
3980+
{
3981+
"type": "DeclarativeStream",
3982+
"$parameters": {
3983+
"name": "company_segments",
3984+
"primary_key": "id",
3985+
"url_base": "https://api.intercom.io/",
3986+
},
3987+
"schema_loader": {
3988+
"name": "{{ parameters.stream_name }}",
3989+
"file_path": "./source_intercom/schemas/{{ parameters.name }}.yaml",
3990+
},
3991+
"retriever": {
3992+
"paginator": {"type": "NoPagination"},
3993+
"requester": {
3994+
"path": "companies/{{ stream_partition.parent_id }}/segments",
3995+
"authenticator": {
3996+
"type": "BearerAuthenticator",
3997+
"api_token": "{{ config['api_key'] }}",
3998+
},
3999+
},
4000+
"record_selector": {
4001+
"extractor": {"type": "DpathExtractor", "field_path": ["data"]}
4002+
},
4003+
"partition_router": {
4004+
"parent_stream_configs": [
4005+
{
4006+
"parent_key": "id",
4007+
"partition_field": "parent_id",
4008+
"stream": deepcopy(companies_stream),
4009+
}
4010+
],
4011+
"type": "SubstreamPartitionRouter",
4012+
},
4013+
},
4014+
},
4015+
],
4016+
"check": {"type": "CheckStream", "stream_names": ["companies"]},
4017+
}
4018+
source = ConcurrentDeclarativeSource(
4019+
source_config=manifest, config={}, catalog=create_catalog("lists"), state=None
4020+
)
4021+
4022+
streams = source.streams({})
4023+
assert len(streams) == 2
4024+
4025+
# Main stream with explicit use_cache: false should remain false (parent for substream)
4026+
stream_0 = streams[0]
4027+
assert stream_0.name == "companies"
4028+
assert isinstance(stream_0, DefaultStream)
4029+
# use_cache should remain False because it was explicitly set to False
4030+
assert (
4031+
not stream_0._stream_partition_generator._partition_factory._retriever.requester.use_cache
4032+
)
4033+
4034+
# Substream
4035+
stream_1 = streams[1]
4036+
assert stream_1.name == "company_segments"
4037+
assert isinstance(stream_1, DefaultStream)
4038+
4039+
# Parent stream created for substream should also respect use_cache: false
4040+
assert (
4041+
stream_1._stream_partition_generator._stream_slicer.parent_stream_configs[0].stream.name
4042+
== "companies"
4043+
)
4044+
# The parent stream in the substream config should also have use_cache: false
4045+
assert not stream_1._stream_partition_generator._stream_slicer.parent_stream_configs[
4046+
0
4047+
].stream._stream_partition_generator._partition_factory._retriever.requester.use_cache
4048+
4049+
39344050
def _run_read(manifest: Mapping[str, Any], stream_name: str) -> List[AirbyteMessage]:
39354051
source = ConcurrentDeclarativeSource(
39364052
source_config=manifest, config={}, catalog=create_catalog("lists"), state=None

0 commit comments

Comments
 (0)