|
6 | 6 | from snowflake.connector.cursor import SnowflakeCursor
|
7 | 7 |
|
8 | 8 | from ..queries import execute_statement
|
9 |
| -from ..settings import connect, governance_settings |
| 9 | +from ..settings import connect, governance_settings, SnowflakeSettings |
10 | 10 | from .column import Column, MetadataColumn, _inserts, _matched, _type_cast
|
11 | 11 | from .enums import MatchByColumnName, TagLevel
|
12 | 12 | from .file_format import FileFormat, InlineFileFormat
|
@@ -271,7 +271,7 @@ def add_column(self, cursor: SnowflakeCursor, column: Column) -> None:
|
271 | 271 | def exists(self, cursor: SnowflakeCursor) -> bool:
|
272 | 272 | return bool(
|
273 | 273 | cursor.execute(
|
274 |
| - f"select table_name from information_schema.tables where table_name ilike '{self.name}' and table_schema = '{self.schema_name}'" |
| 274 | + f"select table_name from information_schema.tables where table_name ilike '{self.name}' and table_schema = '{self.schema_name}' and table_catalog = '{self.database or SnowflakeSettings().db}'" |
275 | 275 | ).fetchall()
|
276 | 276 | )
|
277 | 277 |
|
@@ -339,6 +339,48 @@ def copy_callable(table: Table, sync_tags: bool) -> None:
|
339 | 339 |
|
340 | 340 | return self._merge(copy_callable, primary_keys, replication_keys, qualify)
|
341 | 341 |
|
| 342 | + def setup_connection( |
| 343 | + self, |
| 344 | + path: str, |
| 345 | + storage_integration: str, |
| 346 | + cursor: SnowflakeCursor, |
| 347 | + file_format: FileFormat | InlineFileFormat, |
| 348 | + stage: str | None = None, |
| 349 | + ) -> callable: |
| 350 | + """Setup the connection including custom role, database, schema, and temporary stage""" |
| 351 | + _execute_statement = partial(execute_statement, cursor) |
| 352 | + if self.role is not None: |
| 353 | + logging.debug(f"Using role: {self.role}") |
| 354 | + _execute_statement(f"USE ROLE {self.role}") |
| 355 | + |
| 356 | + # If we don't have database in FQN, we need to set the database context |
| 357 | + if self.database is None: |
| 358 | + default_db = SnowflakeSettings().db |
| 359 | + logging.debug(f"Using default database: {default_db}") |
| 360 | + _execute_statement(f"USE DATABASE {default_db}") |
| 361 | + |
| 362 | + self.setup_file_format(_execute_statement, file_format) |
| 363 | + self.setup_stage(_execute_statement, storage_integration, path, stage) |
| 364 | + |
| 365 | + return _execute_statement |
| 366 | + |
| 367 | + def setup_stage( |
| 368 | + self, |
| 369 | + execute_statement: callable, |
| 370 | + storage_integration: str | None = None, |
| 371 | + path: str | None = None, |
| 372 | + stage: str | None = None, |
| 373 | + ) -> None: |
| 374 | + if stage: |
| 375 | + self._stage = f"{stage}/{path}" |
| 376 | + return None |
| 377 | + |
| 378 | + if storage_integration and path: |
| 379 | + execute_statement( |
| 380 | + self.get_create_temporary_external_stage(path, storage_integration) |
| 381 | + ) |
| 382 | + self._stage = self.temporary_stage |
| 383 | + |
342 | 384 | def qualify(
|
343 | 385 | self,
|
344 | 386 | cursor: SnowflakeCursor,
|
@@ -378,12 +420,12 @@ def _merge_statement(
|
378 | 420 | inserts = _inserts(columns, old_columns)
|
379 | 421 |
|
380 | 422 | logging.info(
|
381 |
| - f"Running merge statement on table: {self.fqn} using {temp_table.schema_name}.{temp_table.name}" |
| 423 | + f"Running merge statement on table: {self.fqn} using {temp_table.fqn}" |
382 | 424 | )
|
383 | 425 | logging.debug(f"Primary keys: {pkes}")
|
384 | 426 | return f"""
|
385 | 427 | merge into {self.fqn} as dest
|
386 |
| - using {temp_table.schema_name}.{temp_table.name} tmp |
| 428 | + using {temp_table.fqn} tmp |
387 | 429 | ON {pkes}
|
388 | 430 | when matched then update set {matched}
|
389 | 431 | when not matched then insert ({column_names}) VALUES ({inserts})
|
@@ -552,46 +594,3 @@ def copy_callable(table: Table, sync_tags: bool) -> None:
|
552 | 594 | )
|
553 | 595 |
|
554 | 596 | return self._merge(copy_callable, primary_keys, replication_keys, qualify)
|
555 |
| - |
556 |
| - def setup_connection( |
557 |
| - self, |
558 |
| - path: str, |
559 |
| - storage_integration: str, |
560 |
| - cursor: SnowflakeCursor, |
561 |
| - file_format: FileFormat | InlineFileFormat, |
562 |
| - stage: str | None = None, |
563 |
| - ) -> callable: |
564 |
| - """Setup the connection including custom role, database, schema, and temporary stage""" |
565 |
| - _execute_statement = partial(execute_statement, cursor) |
566 |
| - if self.role is not None: |
567 |
| - logging.debug(f"Using role: {self.role}") |
568 |
| - _execute_statement(f"USE ROLE {self.role}") |
569 |
| - if self.database is not None: |
570 |
| - logging.debug(f"Using database: {self.database}") |
571 |
| - _execute_statement(f"USE DATABASE {self.database}") |
572 |
| - |
573 |
| - self.setup_file_format(_execute_statement, file_format) |
574 |
| - self.setup_stage(_execute_statement, storage_integration, path, stage) |
575 |
| - |
576 |
| - _execute_statement(self.get_create_schema_statement()) |
577 |
| - logging.debug(f"Using schema: {self.schema_name}") |
578 |
| - _execute_statement(f"USE SCHEMA {self.schema_name}") |
579 |
| - |
580 |
| - return _execute_statement |
581 |
| - |
582 |
| - def setup_stage( |
583 |
| - self, |
584 |
| - execute_statement: callable, |
585 |
| - storage_integration: str | None = None, |
586 |
| - path: str | None = None, |
587 |
| - stage: str | None = None, |
588 |
| - ) -> None: |
589 |
| - if stage: |
590 |
| - self._stage = f"{stage}/{path}" |
591 |
| - return None |
592 |
| - |
593 |
| - if storage_integration and path: |
594 |
| - execute_statement( |
595 |
| - self.get_create_temporary_external_stage(path, storage_integration) |
596 |
| - ) |
597 |
| - self._stage = self.temporary_stage |
0 commit comments