
Commit 128e3a8

treff7es and shirshanka authored
fix(ingest): bigquery-beta - turning sql parsing off in lineage extraction (#6163)
Co-authored-by: Shirshanka Das <[email protected]>
1 parent d569734 commit 128e3a8

8 files changed: +300 −92 lines


metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py

Lines changed: 19 additions & 6 deletions
@@ -540,6 +540,9 @@ def _process_project(
             )
             return
 
+        self.report.num_project_datasets_to_scan[project_id] = len(
+            bigquery_project.datasets
+        )
         for bigquery_dataset in bigquery_project.datasets:
 
             if not self.config.dataset_pattern.allowed(bigquery_dataset.name):
@@ -619,7 +622,9 @@ def _process_table(
             self.report.report_dropped(table_identifier.raw_table_name())
             return
 
-        table.columns = self.get_columns_for_table(conn, table_identifier)
+        table.columns = self.get_columns_for_table(
+            conn, table_identifier, self.config.column_limit
+        )
         if not table.columns:
             logger.warning(f"Unable to get columns for table: {table_identifier}")
 
@@ -653,7 +658,9 @@ def _process_view(
             self.report.report_dropped(table_identifier.raw_table_name())
             return
 
-        view.columns = self.get_columns_for_table(conn, table_identifier)
+        view.columns = self.get_columns_for_table(
+            conn, table_identifier, column_limit=self.config.column_limit
+        )
 
         lineage_info: Optional[Tuple[UpstreamLineage, Dict[str, str]]] = None
         if self.config.include_table_lineage:
@@ -877,8 +884,8 @@ def gen_schema_fields(self, columns: List[BigqueryColumn]) -> List[SchemaField]:
         _COMPLEX_TYPE = re.compile("^(struct|array)")
         last_id = -1
         for col in columns:
-
-            if _COMPLEX_TYPE.match(col.data_type.lower()):
+            # if col.data_type is empty that means this column is part of a complex type
+            if col.data_type is None or _COMPLEX_TYPE.match(col.data_type.lower()):
                 # If the we have seen the ordinal position that most probably means we already processed this complex type
                 if last_id != col.ordinal_position:
                     schema_fields.extend(
@@ -1099,7 +1106,10 @@ def get_views_for_dataset(
         return views.get(dataset_name, [])
 
     def get_columns_for_table(
-        self, conn: bigquery.Client, table_identifier: BigqueryTableIdentifier
+        self,
+        conn: bigquery.Client,
+        table_identifier: BigqueryTableIdentifier,
+        column_limit: Optional[int] = None,
     ) -> List[BigqueryColumn]:
 
         if (
@@ -1110,6 +1120,7 @@ def get_columns_for_table(
                 conn,
                 project_id=table_identifier.project_id,
                 dataset_name=table_identifier.dataset,
+                column_limit=column_limit,
             )
             self.schema_columns[
                 (table_identifier.project_id, table_identifier.dataset)
@@ -1125,7 +1136,9 @@ def get_columns_for_table(
             logger.warning(
                 f"Couldn't get columns on the dataset level for {table_identifier}. Trying to get on table level..."
             )
-            return BigQueryDataDictionary.get_columns_for_table(conn, table_identifier)
+            return BigQueryDataDictionary.get_columns_for_table(
+                conn, table_identifier, self.config.column_limit
+            )
 
         # Access to table but none of its columns - is this possible ?
         return columns.get(table_identifier.table, [])
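The new `col.data_type is None` guard in `gen_schema_fields` pairs with the CASE expression added to the INFORMATION_SCHEMA queries further down: nested field paths now come back with a NULL data_type and are folded into their parent complex type. A minimal sketch of that classification, using a hypothetical `Col` stand-in for `BigqueryColumn`:

import re
from dataclasses import dataclass
from typing import Optional

_COMPLEX_TYPE = re.compile("^(struct|array)")


@dataclass
class Col:  # hypothetical stand-in for BigqueryColumn
    name: str
    data_type: Optional[str]


def is_part_of_complex_type(col: Col) -> bool:
    # A NULL data_type (nulled out by the query for nested field paths) is now
    # treated the same way as an explicit struct/array type.
    return col.data_type is None or bool(_COMPLEX_TYPE.match(col.data_type.lower()))


cols = [
    Col("address", "STRUCT<city STRING, zip STRING>"),
    Col("address.city", None),  # nested field: data_type nulled out by the query
    Col("id", "INT64"),
]
print([c.name for c in cols if is_part_of_complex_type(c)])
# ['address', 'address.city']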

metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit.py

Lines changed: 14 additions & 1 deletion
@@ -121,7 +121,7 @@ def get_table_display_name(self) -> str:
         ]
         if invalid_chars_in_table_name:
             raise ValueError(
-                f"Cannot handle {self} - poorly formatted table name, contains {invalid_chars_in_table_name}"
+                f"Cannot handle {self.raw_table_name()} - poorly formatted table name, contains {invalid_chars_in_table_name}"
             )
         return table_name
 
@@ -207,6 +207,7 @@ class QueryEvent:
     actor_email: str
     query: str
    statementType: str
+    project_id: str
 
     job_name: Optional[str] = None
    destinationTable: Optional[BigQueryTableRef] = None
@@ -238,6 +239,15 @@ def _job_name_ref(project: str, jobId: str) -> Optional[str]:
             return f"projects/{project}/jobs/{jobId}"
         return None
 
+    @staticmethod
+    def _get_project_id_from_job_name(job_name: str) -> str:
+        project_id_pattern = r"projects\/(.*)\/jobs\/.*"
+        matches = re.match(project_id_pattern, job_name, re.MULTILINE)
+        if matches:
+            return matches.group(1)
+        else:
+            raise ValueError(f"Unable to get project_id from jobname: {job_name}")
+
     @classmethod
     def from_entry(
         cls, entry: AuditLogEntry, debug_include_full_payloads: bool = False
@@ -253,6 +263,7 @@ def from_entry(
                 job.get("jobName", {}).get("projectId"),
                 job.get("jobName", {}).get("jobId"),
             ),
+            project_id=job.get("jobName", {}).get("projectId"),
             default_dataset=job_query_conf["defaultDataset"]
             if job_query_conf["defaultDataset"]
             else None,
@@ -331,6 +342,7 @@ def from_exported_bigquery_audit_metadata(
             actor_email=payload["authenticationInfo"]["principalEmail"],
             query=query_config["query"],
             job_name=job["jobName"],
+            project_id=QueryEvent._get_project_id_from_job_name(job["jobName"]),
             default_dataset=query_config["defaultDataset"]
             if query_config.get("defaultDataset")
             else None,
@@ -392,6 +404,7 @@ def from_entry_v2(
         # basic query_event
         query_event = QueryEvent(
             job_name=job["jobName"],
+            project_id=QueryEvent._get_project_id_from_job_name(job["jobName"]),
             timestamp=row.timestamp,
             actor_email=payload["authenticationInfo"]["principalEmail"],
             query=query_config["query"],
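The new required `project_id` on `QueryEvent` is populated by parsing the fully qualified job name. A standalone sketch of the same regex outside the class, with a sample input whose shape follows the `projects/{project}/jobs/{jobId}` reference built by `_job_name_ref` (the concrete job id is an assumption for illustration):

import re


def get_project_id_from_job_name(job_name: str) -> str:
    # Mirrors QueryEvent._get_project_id_from_job_name from this commit:
    # capture the segment between "projects/" and "/jobs/".
    matches = re.match(r"projects\/(.*)\/jobs\/.*", job_name)
    if matches:
        return matches.group(1)
    raise ValueError(f"Unable to get project_id from jobname: {job_name}")


print(get_project_id_from_job_name("projects/my-gcp-project/jobs/bquxjob_12345"))
# -> my-gcp-project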

metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py

Lines changed: 10 additions & 1 deletion
@@ -32,6 +32,7 @@ class BigQueryV2Config(BigQueryConfig):
     usage: BigQueryUsageConfig = Field(
         default=BigQueryUsageConfig(), description="Usage related configs"
     )
+
     include_usage_statistics: bool = Field(
         default=True,
         description="Generate usage statistic",
@@ -56,14 +57,22 @@ class BigQueryV2Config(BigQueryConfig):
         default=50,
         description="Number of table queried in batch when getting metadata. This is a low leve config propert which should be touched with care. This restriction needed because we query partitions system view which throws error if we try to touch too many tables.",
     )
-
+    column_limit: int = Field(
+        default=1000,
+        description="Maximum number of columns to process in a table",
+    )
     # The inheritance hierarchy is wonky here, but these options need modifications.
     project_id: Optional[str] = Field(
         default=None,
         description="[deprecated] Use project_id_pattern instead.",
     )
     storage_project_id: None = Field(default=None, exclude=True)
 
+    lineage_use_sql_parser: bool = Field(
+        default=False,
+        description="Experimental. Use sql parser to resolve view/table lineage. If there is a view being referenced then bigquery sends both the view as well as underlying tablein the references. There is no distinction between direct/base objects accessed. So doing sql parsing to ensure we only use direct objects accessed for lineage.",
+    )
+
     @root_validator(pre=False)
     def profile_default_settings(cls, values: Dict) -> Dict:
         # Extra default SQLAlchemy option for better connection pooling and threading.
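Both new knobs are plain Pydantic fields, so they can be overridden from a recipe like any other source config option. A minimal sketch that mirrors just these two fields with the same names and defaults (the real `BigQueryV2Config` has many more options, and the recipe snippet in the comment is an assumption about typical usage of the bigquery-beta source):

from pydantic import BaseModel, Field


class LineageAndColumnOptions(BaseModel):
    # Mirrors the two fields added in this commit (same names and defaults).
    column_limit: int = Field(
        default=1000, description="Maximum number of columns to process in a table"
    )
    lineage_use_sql_parser: bool = Field(
        default=False,
        description="Experimental. Use sql parser to resolve view/table lineage.",
    )


# Defaults after this commit: SQL-parser-based lineage resolution is off.
print(LineageAndColumnOptions())  # column_limit=1000 lineage_use_sql_parser=False

# A recipe would override them under the source config, e.g. (hypothetical snippet):
#   source:
#     type: bigquery-beta
#     config:
#       column_limit: 500
#       lineage_use_sql_parser: true
overridden = LineageAndColumnOptions(column_limit=500, lineage_use_sql_parser=True)
print(overridden.lineage_use_sql_parser)  # True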

metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py

Lines changed: 31 additions & 18 deletions
@@ -8,27 +8,40 @@
 
 from datahub.ingestion.source.sql.sql_common import SQLSourceReport
 from datahub.utilities.lossy_collections import LossyDict, LossyList
+from datahub.utilities.stats_collections import TopKDict
 
 
 @dataclass
 class BigQueryV2Report(SQLSourceReport):
-    num_total_lineage_entries: Optional[int] = None
-    num_skipped_lineage_entries_missing_data: Optional[int] = None
-    num_skipped_lineage_entries_not_allowed: Optional[int] = None
-    num_lineage_entries_sql_parser_failure: Optional[int] = None
-    num_skipped_lineage_entries_other: Optional[int] = None
-    num_total_log_entries: Optional[int] = None
-    num_parsed_log_entires: Optional[int] = None
-    num_total_audit_entries: Optional[int] = None
-    num_parsed_audit_entires: Optional[int] = None
+    num_total_lineage_entries: TopKDict[str, int] = field(default_factory=TopKDict)
+    num_skipped_lineage_entries_missing_data: TopKDict[str, int] = field(
+        default_factory=TopKDict
+    )
+    num_skipped_lineage_entries_not_allowed: TopKDict[str, int] = field(
+        default_factory=TopKDict
+    )
+    num_lineage_entries_sql_parser_failure: TopKDict[str, int] = field(
+        default_factory=TopKDict
+    )
+    num_lineage_entries_sql_parser_success: TopKDict[str, int] = field(
+        default_factory=TopKDict
+    )
+    num_skipped_lineage_entries_other: TopKDict[str, int] = field(
+        default_factory=TopKDict
+    )
+    num_total_log_entries: TopKDict[str, int] = field(default_factory=TopKDict)
+    num_parsed_log_entries: TopKDict[str, int] = field(default_factory=TopKDict)
+    num_total_audit_entries: TopKDict[str, int] = field(default_factory=TopKDict)
+    num_parsed_audit_entries: TopKDict[str, int] = field(default_factory=TopKDict)
     bigquery_audit_metadata_datasets_missing: Optional[bool] = None
     lineage_failed_extraction: LossyList[str] = field(default_factory=LossyList)
-    lineage_metadata_entries: Optional[int] = None
-    lineage_mem_size: Optional[str] = None
-    lineage_extraction_sec: Dict[str, float] = field(default_factory=dict)
-    usage_extraction_sec: Dict[str, float] = field(default_factory=dict)
+    lineage_metadata_entries: TopKDict[str, int] = field(default_factory=TopKDict)
+    lineage_mem_size: Dict[str, str] = field(default_factory=TopKDict)
+    lineage_extraction_sec: Dict[str, float] = field(default_factory=TopKDict)
+    usage_extraction_sec: Dict[str, float] = field(default_factory=TopKDict)
     usage_failed_extraction: LossyList[str] = field(default_factory=LossyList)
-    metadata_extraction_sec: Dict[str, float] = field(default_factory=dict)
+    num_project_datasets_to_scan: Dict[str, int] = field(default_factory=TopKDict)
+    metadata_extraction_sec: Dict[str, float] = field(default_factory=TopKDict)
     include_table_lineage: Optional[bool] = None
     use_date_sharded_audit_log_tables: Optional[bool] = None
     log_page_size: Optional[pydantic.PositiveInt] = None
@@ -40,10 +53,10 @@ class BigQueryV2Report(SQLSourceReport):
     audit_start_time: Optional[str] = None
     audit_end_time: Optional[str] = None
     upstream_lineage: LossyDict = field(default_factory=LossyDict)
-    partition_info: Dict[str, str] = field(default_factory=dict)
-    profile_table_selection_criteria: Dict[str, str] = field(default_factory=dict)
-    selected_profile_tables: Dict[str, List[str]] = field(default_factory=dict)
-    invalid_partition_ids: Dict[str, str] = field(default_factory=dict)
+    partition_info: Dict[str, str] = field(default_factory=TopKDict)
+    profile_table_selection_criteria: Dict[str, str] = field(default_factory=TopKDict)
+    selected_profile_tables: Dict[str, List[str]] = field(default_factory=TopKDict)
+    invalid_partition_ids: Dict[str, str] = field(default_factory=TopKDict)
     allow_pattern: Optional[str] = None
     deny_pattern: Optional[str] = None
     num_usage_workunits_emitted: Optional[int] = None
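The report now tracks the lineage and audit counters per project instead of as single `Optional[int]` fields. A hedged sketch of the counting pattern this enables, using plain dicts keyed by project id as a stand-in for `TopKDict` (which is used the same way here, as a dict-like container):

from dataclasses import dataclass, field
from typing import Dict


@dataclass
class LineageCounters:
    # Sketch of the new per-project report shape; field names match the report,
    # but this class is a simplified stand-in, not the real BigQueryV2Report.
    num_total_lineage_entries: Dict[str, int] = field(default_factory=dict)
    num_skipped_lineage_entries_missing_data: Dict[str, int] = field(default_factory=dict)


def count(report: LineageCounters, project_id: str, missing_data: bool) -> None:
    # Per-project counting replaces the old single Optional[int] counters.
    report.num_total_lineage_entries[project_id] = (
        report.num_total_lineage_entries.get(project_id, 0) + 1
    )
    if missing_data:
        report.num_skipped_lineage_entries_missing_data[project_id] = (
            report.num_skipped_lineage_entries_missing_data.get(project_id, 0) + 1
        )


report = LineageCounters()
for project, missing in [("proj-a", False), ("proj-a", True), ("proj-b", False)]:
    count(report, project, missing)
print(report.num_total_lineage_entries)  # {'proj-a': 2, 'proj-b': 1}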

metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py

Lines changed: 61 additions & 28 deletions
@@ -227,7 +227,7 @@ class BigqueryQuery:
           c.ordinal_position as ordinal_position,
           cfp.field_path as field_path,
           c.is_nullable as is_nullable,
-          c.data_type as data_type,
+          CASE WHEN CONTAINS_SUBSTR(field_path, ".") THEN NULL ELSE c.data_type END as data_type,
           description as comment,
           c.is_hidden as is_hidden,
           c.is_partitioning_column as is_partitioning_column
@@ -236,7 +236,7 @@ class BigqueryQuery:
         join `{project_id}`.`{dataset_name}`.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS as cfp on cfp.table_name = c.table_name
           and cfp.column_name = c.column_name
         ORDER BY
-          ordinal_position"""
+          table_catalog, table_schema, table_name, ordinal_position ASC, data_type DESC"""
 
     columns_for_table: str = """
         select
@@ -247,7 +247,7 @@ class BigqueryQuery:
           c.ordinal_position as ordinal_position,
           cfp.field_path as field_path,
           c.is_nullable as is_nullable,
-          c.data_type as data_type,
+          CASE WHEN CONTAINS_SUBSTR(field_path, ".") THEN NULL ELSE c.data_type END as data_type,
           c.is_hidden as is_hidden,
           c.is_partitioning_column as is_partitioning_column,
           description as comment
@@ -258,7 +258,7 @@ class BigqueryQuery:
         where
           c.table_name = '{table_identifier.table}'
         ORDER BY
-          ordinal_position"""
+          table_catalog, table_schema, table_name, ordinal_position ASC, data_type DESC"""
 
 
 class BigQueryDataDictionary:
@@ -419,7 +419,10 @@ def get_views_for_dataset(
 
     @staticmethod
     def get_columns_for_dataset(
-        conn: bigquery.Client, project_id: str, dataset_name: str
+        conn: bigquery.Client,
+        project_id: str,
+        dataset_name: str,
+        column_limit: Optional[int] = None,
     ) -> Optional[Dict[str, List[BigqueryColumn]]]:
         columns: Dict[str, List[BigqueryColumn]] = defaultdict(list)
         try:
@@ -435,40 +438,70 @@ def get_columns_for_dataset(
             # Please repeat query with more selective predicates.
             return None
 
+        last_seen_table: str = ""
         for column in cur:
-            columns[column.table_name].append(
-                BigqueryColumn(
-                    name=column.column_name,
-                    ordinal_position=column.ordinal_position,
-                    field_path=column.field_path,
-                    is_nullable=column.is_nullable == "YES",
-                    data_type=column.data_type,
-                    comment=column.comment,
-                    is_partition_column=column.is_partitioning_column == "YES",
+            if (
+                column_limit
+                and column.table_name in columns
+                and len(columns[column.table_name]) >= column_limit
+            ):
+                if last_seen_table != column.table_name:
+                    logger.warning(
+                        f"{project_id}.{dataset_name}.{column.table_name} contains more than {column_limit} columns, only processing {column_limit} columns"
+                    )
+                    last_seen_table = column.table_name
+            else:
+                columns[column.table_name].append(
+                    BigqueryColumn(
+                        name=column.column_name,
+                        ordinal_position=column.ordinal_position,
+                        field_path=column.field_path,
+                        is_nullable=column.is_nullable == "YES",
+                        data_type=column.data_type,
+                        comment=column.comment,
+                        is_partition_column=column.is_partitioning_column == "YES",
+                    )
                 )
-            )
 
         return columns
 
     @staticmethod
     def get_columns_for_table(
-        conn: bigquery.Client, table_identifier: BigqueryTableIdentifier
+        conn: bigquery.Client,
+        table_identifier: BigqueryTableIdentifier,
+        column_limit: Optional[int],
    ) -> List[BigqueryColumn]:
 
         cur = BigQueryDataDictionary.get_query_result(
             conn,
             BigqueryQuery.columns_for_table.format(table_identifier=table_identifier),
         )
 
-        return [
-            BigqueryColumn(
-                name=column.column_name,
-                ordinal_position=column.ordinal_position,
-                is_nullable=column.is_nullable == "YES",
-                field_path=column.field_path,
-                data_type=column.data_type,
-                comment=column.comment,
-                is_partition_column=column.is_partitioning_column == "YES",
-            )
-            for column in cur
-        ]
+        columns: List[BigqueryColumn] = []
+        last_seen_table: str = ""
+        for column in cur:
+            if (
+                column_limit
+                and column.table_name in columns
+                and len(columns[column.table_name]) >= column_limit
+            ):
+                if last_seen_table != column.table_name:
+                    logger.warning(
+                        f"{table_identifier.project_id}.{table_identifier.dataset}.{column.table_name} contains more than {column_limit} columns, only processing {column_limit} columns"
+                    )
+                    last_seen_table = column.table_name
+            else:
+                columns.append(
+                    BigqueryColumn(
+                        name=column.column_name,
+                        ordinal_position=column.ordinal_position,
+                        is_nullable=column.is_nullable == "YES",
+                        field_path=column.field_path,
+                        data_type=column.data_type,
+                        comment=column.comment,
+                        is_partition_column=column.is_partitioning_column == "YES",
+                    )
                )
+                last_seen_table = column.table_name
+
+        return columns
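The per-table cap added here follows one pattern at both the dataset and table level: once a table has accumulated `column_limit` columns, further rows for that table are skipped and a single warning is logged per table. A self-contained sketch of that loop over hypothetical (table, column) rows, independent of the BigQuery client:

import logging
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

logger = logging.getLogger(__name__)


def cap_columns_per_table(
    rows: List[Tuple[str, str]],  # (table_name, column_name) pairs, ordered per table
    column_limit: Optional[int] = None,
) -> Dict[str, List[str]]:
    # Same shape as the dataset-level loop in this commit: keep at most
    # column_limit columns per table and warn only once per table.
    columns: Dict[str, List[str]] = defaultdict(list)
    last_seen_table = ""
    for table_name, column_name in rows:
        if column_limit and len(columns[table_name]) >= column_limit:
            if last_seen_table != table_name:
                logger.warning(
                    f"{table_name} contains more than {column_limit} columns, "
                    f"only processing {column_limit} columns"
                )
                last_seen_table = table_name
        else:
            columns[table_name].append(column_name)
    return columns


rows = [("events", f"col_{i}") for i in range(5)] + [("users", "id"), ("users", "email")]
print({t: len(cols) for t, cols in cap_columns_per_table(rows, column_limit=3).items()})
# {'events': 3, 'users': 2}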
