Introduce preliminary SEA Result Set #588

Merged
merged 68 commits into from
Jun 12, 2025
Commits
138c2ae
[squash from exec-sea] bring over execution phase changes
varun-edachali-dbx Jun 9, 2025
3e3ab94
remove excess test
varun-edachali-dbx Jun 9, 2025
4a78165
add docstring
varun-edachali-dbx Jun 9, 2025
0dac4aa
remove exec func in sea backend
varun-edachali-dbx Jun 9, 2025
1b794c7
remove excess files
varun-edachali-dbx Jun 9, 2025
da5a6fe
remove excess models
varun-edachali-dbx Jun 9, 2025
686ade4
remove excess sea backend tests
varun-edachali-dbx Jun 9, 2025
31e6c83
cleanup
varun-edachali-dbx Jun 9, 2025
69ea238
re-introduce get_schema_desc
varun-edachali-dbx Jun 9, 2025
66d7517
remove SeaResultSet
varun-edachali-dbx Jun 9, 2025
71feef9
clean imports and attributes
varun-edachali-dbx Jun 9, 2025
ae9862f
pass CommandId to ExecResp
varun-edachali-dbx Jun 9, 2025
d8aa69e
remove changes in types
varun-edachali-dbx Jun 9, 2025
db139bc
add back essential types (ExecResponse, from_sea_state)
varun-edachali-dbx Jun 9, 2025
b977b12
fix fetch types
varun-edachali-dbx Jun 9, 2025
da615c0
excess imports
varun-edachali-dbx Jun 9, 2025
0da04a6
reduce diff by maintaining logs
varun-edachali-dbx Jun 9, 2025
ea9d456
fix int test types
varun-edachali-dbx Jun 9, 2025
8985c62
[squashed from exec-sea] init execution func
varun-edachali-dbx Jun 9, 2025
d9bcdbe
remove irrelevant changes
varun-edachali-dbx Jun 9, 2025
ee9fa1c
remove ResultSetFilter functionality
varun-edachali-dbx Jun 9, 2025
24c6152
remove more irrelevant changes
varun-edachali-dbx Jun 9, 2025
67fd101
remove more irrelevant changes
varun-edachali-dbx Jun 9, 2025
271fcaf
even more irrelevant changes
varun-edachali-dbx Jun 9, 2025
bf26ea3
remove sea response as init option
varun-edachali-dbx Jun 9, 2025
d97463b
move guid_to_hex_id import to utils
varun-edachali-dbx Jun 9, 2025
139e246
reduce diff in guid utils import
varun-edachali-dbx Jun 9, 2025
e3ee4e4
move arrow_schema_bytes back into ExecuteResult
varun-edachali-dbx Jun 9, 2025
f448a8f
maintain log
varun-edachali-dbx Jun 9, 2025
82ca1ee
remove un-necessary assignment
varun-edachali-dbx Jun 9, 2025
e96a078
remove un-necessary tuple response
varun-edachali-dbx Jun 9, 2025
27158b1
remove un-necessary verbose mocking
varun-edachali-dbx Jun 9, 2025
d3200c4
move Queue construction to ResultSet
varun-edachali-dbx Jun 10, 2025
8a014f0
move description to List[Tuple]
varun-edachali-dbx Jun 10, 2025
39c41ab
formatting (black)
varun-edachali-dbx Jun 10, 2025
2cd04df
reduce diff (remove explicit tuple conversion)
varun-edachali-dbx Jun 10, 2025
067a019
remove has_more_rows from ExecuteResponse
varun-edachali-dbx Jun 10, 2025
48c83e0
remove un-necessary has_more_rows calc
varun-edachali-dbx Jun 10, 2025
281a9e9
default has_more_rows to True
varun-edachali-dbx Jun 10, 2025
192901d
return has_more_rows from ExecResponse conversion during GetRespMetadata
varun-edachali-dbx Jun 10, 2025
55f5c45
remove unnecessary replacement
varun-edachali-dbx Jun 10, 2025
edc36b5
better mocked backend naming
varun-edachali-dbx Jun 10, 2025
81280e7
remove has_more_rows test in ExecuteResponse
varun-edachali-dbx Jun 10, 2025
c1d3be2
introduce replacement of original has_more_rows read test
varun-edachali-dbx Jun 10, 2025
5ee4136
call correct method in test_use_arrow_schema
varun-edachali-dbx Jun 10, 2025
b881ab0
call correct method in test_fall_back_to_hive_schema
varun-edachali-dbx Jun 10, 2025
53bf715
re-introduce result response read test
varun-edachali-dbx Jun 10, 2025
45a32be
simplify test
varun-edachali-dbx Jun 10, 2025
e3fe299
remove excess fetch_results mocks
varun-edachali-dbx Jun 10, 2025
e8038d3
more minimal changes to thrift_backend tests
varun-edachali-dbx Jun 10, 2025
2f6ec19
move back to old table types
varun-edachali-dbx Jun 10, 2025
73bc282
remove outdated arrow_schema_bytes return
varun-edachali-dbx Jun 10, 2025
4e07f1e
align SeaResultSet with new structure
varun-edachali-dbx Jun 11, 2025
65e7c6b
correct sea res set tests
varun-edachali-dbx Jun 11, 2025
7c483f2
remove duplicate import
varun-edachali-dbx Jun 11, 2025
8cbeb08
rephrase model docstrings to explicitly denote that they are represen…
varun-edachali-dbx Jun 11, 2025
36b9cfb
has_more_rows -> is_direct_results
varun-edachali-dbx Jun 11, 2025
c04d583
switch docstring format to align with Connection class
varun-edachali-dbx Jun 11, 2025
ba91138
Merge branch 'exec-resp-norm' into sea-res-set
varun-edachali-dbx Jun 11, 2025
ed7079e
has_more_rows -> is_direct_results
varun-edachali-dbx Jun 11, 2025
0384b65
fix type errors with arrow_schema_bytes
varun-edachali-dbx Jun 11, 2025
218e547
spaces after multi line pydocs
varun-edachali-dbx Jun 11, 2025
c7b0701
Merge branch 'sea-migration' into sea-res-set
varun-edachali-dbx Jun 11, 2025
a6788f8
remove duplicate queue init (merge artifact)
varun-edachali-dbx Jun 11, 2025
93468e6
reduce diff (remove newlines)
varun-edachali-dbx Jun 11, 2025
a70a6ce
remove un-necessary changes
varun-edachali-dbx Jun 11, 2025
de181d8
Revert "remove un-necessary changes"
varun-edachali-dbx Jun 11, 2025
8c65345
b"" -> None
varun-edachali-dbx Jun 12, 2025
89 changes: 87 additions & 2 deletions src/databricks/sql/result_set.py
@@ -45,6 +45,8 @@ def __init__(
        results_queue=None,
        description=None,
        is_staging_operation: bool = False,
        lz4_compressed: bool = False,
        arrow_schema_bytes: Optional[bytes] = None,
    ):
        """
        A ResultSet manages the results of a single command.
@@ -75,6 +77,8 @@ def __init__(
        self.is_direct_results = is_direct_results
        self.results = results_queue
        self._is_staging_operation = is_staging_operation
        self.lz4_compressed = lz4_compressed
        self._arrow_schema_bytes = arrow_schema_bytes

    def __iter__(self):
        while True:
@@ -177,10 +181,10 @@ def __init__(
            :param ssl_options: SSL options for cloud fetch
            :param is_direct_results: Whether there are more rows to fetch
        """

        # Initialize ThriftResultSet-specific attributes
        self._arrow_schema_bytes = execute_response.arrow_schema_bytes
        self._use_cloud_fetch = use_cloud_fetch
        self.lz4_compressed = execute_response.lz4_compressed
        self.is_direct_results = is_direct_results

        # Build the results queue if t_row_set is provided
        results_queue = None
@@ -211,6 +215,8 @@ def __init__(
            results_queue=results_queue,
            description=execute_response.description,
            is_staging_operation=execute_response.is_staging_operation,
            lz4_compressed=execute_response.lz4_compressed,
            arrow_schema_bytes=execute_response.arrow_schema_bytes,
        )

        # Initialize results queue if not provided
@@ -438,3 +444,82 @@ def map_col_type(type_):
            (column.name, map_col_type(column.datatype), None, None, None, None, None)
            for column in table_schema_message.columns
        ]


class SeaResultSet(ResultSet):
    """ResultSet implementation for SEA backend."""

    def __init__(
        self,
        connection: "Connection",
        execute_response: "ExecuteResponse",
        sea_client: "SeaDatabricksClient",
        buffer_size_bytes: int = 104857600,
        arraysize: int = 10000,
        result_data=None,
        manifest=None,
    ):
        """
        Initialize a SeaResultSet with the response from a SEA query execution.

        Args:
            connection: The parent connection
            execute_response: Response from the execute command
            sea_client: The SeaDatabricksClient instance for direct access
            buffer_size_bytes: Buffer size for fetching results
            arraysize: Default number of rows to fetch
            result_data: Result data from SEA response (optional)
            manifest: Manifest from SEA response (optional)
        """

        super().__init__(
            connection=connection,
            backend=sea_client,
            arraysize=arraysize,
            buffer_size_bytes=buffer_size_bytes,
            command_id=execute_response.command_id,
            status=execute_response.status,
            has_been_closed_server_side=execute_response.has_been_closed_server_side,
            description=execute_response.description,
            is_staging_operation=execute_response.is_staging_operation,
            lz4_compressed=execute_response.lz4_compressed,
            arrow_schema_bytes=execute_response.arrow_schema_bytes,
        )

    def _fill_results_buffer(self):
        """Fill the results buffer from the backend."""
        raise NotImplementedError(
            "_fill_results_buffer is not implemented for SEA backend"
        )

    def fetchone(self) -> Optional[Row]:
        """
        Fetch the next row of a query result set, returning a single sequence,
        or None when no more data is available.
        """

        raise NotImplementedError("fetchone is not implemented for SEA backend")

    def fetchmany(self, size: Optional[int] = None) -> List[Row]:
        """
        Fetch the next set of rows of a query result, returning a list of rows.

        An empty sequence is returned when no more rows are available.
        """

        raise NotImplementedError("fetchmany is not implemented for SEA backend")

    def fetchall(self) -> List[Row]:
        """
        Fetch all (remaining) rows of a query result, returning them as a list of rows.
        """

        raise NotImplementedError("fetchall is not implemented for SEA backend")

    def fetchmany_arrow(self, size: int) -> Any:
        """Fetch the next set of rows as an Arrow table."""
        raise NotImplementedError("fetchmany_arrow is not implemented for SEA backend")

    def fetchall_arrow(self) -> Any:
        """Fetch all remaining rows as an Arrow table."""
        raise NotImplementedError("fetchall_arrow is not implemented for SEA backend")
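
Reviewer note: the placeholder pattern above (constructor wired up, fetch methods raising) can be tried in isolation with the sketch below. It is a minimal, self-contained illustration and not the driver's code: `BaseResultSet`, `StubResultSet`, and `ListResultSet` are hypothetical names, and the `__iter__` loop is an assumed shape based on the `while True:` fragment in the hunk above and the test comment that iteration calls `fetchone` internally.

```python
from abc import ABC, abstractmethod
from typing import Any, Iterator, List, Optional


class BaseResultSet(ABC):
    """Hypothetical stand-in for the driver's ResultSet base class."""

    def __init__(
        self,
        arraysize: int = 10000,
        lz4_compressed: bool = False,
        arrow_schema_bytes: Optional[bytes] = None,
    ):
        # The two newest fields are stored on the base class, as in this PR.
        self.arraysize = arraysize
        self.lz4_compressed = lz4_compressed
        self._arrow_schema_bytes = arrow_schema_bytes

    def __iter__(self) -> Iterator[Any]:
        # Assumed loop shape: keep calling fetchone() until it returns None.
        while True:
            row = self.fetchone()
            if row is None:
                break
            yield row

    @abstractmethod
    def fetchone(self) -> Optional[Any]:
        """Return the next row, or None when the result set is exhausted."""


class StubResultSet(BaseResultSet):
    """Mirrors the placeholder style: the class exists, fetching does not."""

    def fetchone(self) -> Optional[Any]:
        raise NotImplementedError("fetchone is not implemented for SEA backend")


class ListResultSet(BaseResultSet):
    """Toy implementation backed by an in-memory list, for contrast."""

    def __init__(self, rows: List[Any], **kwargs: Any):
        super().__init__(**kwargs)
        self._rows = iter(rows)

    def fetchone(self) -> Optional[Any]:
        return next(self._rows, None)


if __name__ == "__main__":
    print(list(ListResultSet([(1,), (2,)], arraysize=2)))
    try:
        next(iter(StubResultSet()))
    except NotImplementedError as exc:
        print(f"stub raised: {exc}")
```

Because iteration is built on `fetchone`, a stubbed `fetchone` makes `for row in result_set` fail on the first step, which is the behaviour the unit tests in this PR assert.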
201 changes: 201 additions & 0 deletions tests/unit/test_sea_result_set.py
@@ -0,0 +1,201 @@
"""
Tests for the SeaResultSet class.

This module contains tests for the SeaResultSet class, which implements
the result set functionality for the SEA (Statement Execution API) backend.
"""

import pytest
from unittest.mock import patch, MagicMock, Mock

from databricks.sql.result_set import SeaResultSet
from databricks.sql.backend.types import CommandId, CommandState, BackendType


class TestSeaResultSet:
    """Test suite for the SeaResultSet class."""

    @pytest.fixture
    def mock_connection(self):
        """Create a mock connection."""
        connection = Mock()
        connection.open = True
        return connection

    @pytest.fixture
    def mock_sea_client(self):
        """Create a mock SEA client."""
        return Mock()

    @pytest.fixture
    def execute_response(self):
        """Create a sample execute response."""
        mock_response = Mock()
        mock_response.command_id = CommandId.from_sea_statement_id("test-statement-123")
        mock_response.status = CommandState.SUCCEEDED
        mock_response.has_been_closed_server_side = False
        mock_response.is_direct_results = False
        mock_response.results_queue = None
        mock_response.description = [
            ("test_value", "INT", None, None, None, None, None)
        ]
        mock_response.is_staging_operation = False
        return mock_response

    def test_init_with_execute_response(
        self, mock_connection, mock_sea_client, execute_response
    ):
        """Test initializing SeaResultSet with an execute response."""
        result_set = SeaResultSet(
            connection=mock_connection,
            execute_response=execute_response,
            sea_client=mock_sea_client,
            buffer_size_bytes=1000,
            arraysize=100,
        )

        # Verify basic properties
        assert result_set.command_id == execute_response.command_id
        assert result_set.status == CommandState.SUCCEEDED
        assert result_set.connection == mock_connection
        assert result_set.backend == mock_sea_client
        assert result_set.buffer_size_bytes == 1000
        assert result_set.arraysize == 100
        assert result_set.description == execute_response.description

    def test_close(self, mock_connection, mock_sea_client, execute_response):
        """Test closing a result set."""
        result_set = SeaResultSet(
            connection=mock_connection,
            execute_response=execute_response,
            sea_client=mock_sea_client,
            buffer_size_bytes=1000,
            arraysize=100,
        )

        # Close the result set
        result_set.close()

        # Verify the backend's close_command was called
        mock_sea_client.close_command.assert_called_once_with(result_set.command_id)
        assert result_set.has_been_closed_server_side is True
        assert result_set.status == CommandState.CLOSED

    def test_close_when_already_closed_server_side(
        self, mock_connection, mock_sea_client, execute_response
    ):
        """Test closing a result set that has already been closed server-side."""
        result_set = SeaResultSet(
            connection=mock_connection,
            execute_response=execute_response,
            sea_client=mock_sea_client,
            buffer_size_bytes=1000,
            arraysize=100,
        )
        result_set.has_been_closed_server_side = True

        # Close the result set
        result_set.close()

        # Verify the backend's close_command was NOT called
        mock_sea_client.close_command.assert_not_called()
        assert result_set.has_been_closed_server_side is True
        assert result_set.status == CommandState.CLOSED

    def test_close_when_connection_closed(
        self, mock_connection, mock_sea_client, execute_response
    ):
        """Test closing a result set when the connection is closed."""
        mock_connection.open = False
        result_set = SeaResultSet(
            connection=mock_connection,
            execute_response=execute_response,
            sea_client=mock_sea_client,
            buffer_size_bytes=1000,
            arraysize=100,
        )

        # Close the result set
        result_set.close()

        # Verify the backend's close_command was NOT called
        mock_sea_client.close_command.assert_not_called()
        assert result_set.has_been_closed_server_side is True
        assert result_set.status == CommandState.CLOSED

    def test_unimplemented_methods(
        self, mock_connection, mock_sea_client, execute_response
    ):
        """Test that unimplemented methods raise NotImplementedError."""
        result_set = SeaResultSet(
            connection=mock_connection,
            execute_response=execute_response,
            sea_client=mock_sea_client,
            buffer_size_bytes=1000,
            arraysize=100,
        )

        # Test each unimplemented method individually with specific error messages
        with pytest.raises(
            NotImplementedError, match="fetchone is not implemented for SEA backend"
        ):
            result_set.fetchone()

        with pytest.raises(
            NotImplementedError, match="fetchmany is not implemented for SEA backend"
        ):
            result_set.fetchmany(10)

        with pytest.raises(
            NotImplementedError, match="fetchmany is not implemented for SEA backend"
        ):
            # Test with default parameter value
            result_set.fetchmany()

        with pytest.raises(
            NotImplementedError, match="fetchall is not implemented for SEA backend"
        ):
            result_set.fetchall()

        with pytest.raises(
            NotImplementedError,
            match="fetchmany_arrow is not implemented for SEA backend",
        ):
            result_set.fetchmany_arrow(10)

        with pytest.raises(
            NotImplementedError,
            match="fetchall_arrow is not implemented for SEA backend",
        ):
            result_set.fetchall_arrow()

        with pytest.raises(
            NotImplementedError, match="fetchone is not implemented for SEA backend"
        ):
            # Test iteration protocol (calls fetchone internally)
            next(iter(result_set))

        with pytest.raises(
            NotImplementedError, match="fetchone is not implemented for SEA backend"
        ):
            # Test using the result set in a for loop
            for row in result_set:
                pass

    def test_fill_results_buffer_not_implemented(
        self, mock_connection, mock_sea_client, execute_response
    ):
        """Test that _fill_results_buffer raises NotImplementedError."""
        result_set = SeaResultSet(
            connection=mock_connection,
            execute_response=execute_response,
            sea_client=mock_sea_client,
            buffer_size_bytes=1000,
            arraysize=100,
        )

        with pytest.raises(
            NotImplementedError,
            match="_fill_results_buffer is not implemented for SEA backend",
        ):
            result_set._fill_results_buffer()
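
Reviewer note: the repeated `pytest.raises` blocks above could arguably be collapsed into a table-driven check. The sketch below shows the idea using only the standard library so it runs without the driver installed. `StubResultSet`, `CASES`, `_not_implemented`, and `find_failures` are hypothetical names introduced for illustration; only the method names and the message format are taken from the tests in this PR.

```python
from typing import Any, Callable, List, Tuple


def _not_implemented(name: str) -> Callable[..., Any]:
    """Build a method that raises with the message format used in the tests."""

    def method(self: Any, *args: Any, **kwargs: Any) -> Any:
        raise NotImplementedError(f"{name} is not implemented for SEA backend")

    method.__name__ = name
    return method


# (method name, positional arguments) pairs taken from the tests above.
CASES: List[Tuple[str, tuple]] = [
    ("fetchone", ()),
    ("fetchmany", (10,)),
    ("fetchmany", ()),
    ("fetchall", ()),
    ("fetchmany_arrow", (10,)),
    ("fetchall_arrow", ()),
    ("_fill_results_buffer", ()),
]

# Hypothetical stand-in class; it is not the driver's SeaResultSet.
StubResultSet = type(
    "StubResultSet", (), {name: _not_implemented(name) for name, _ in CASES}
)


def find_failures(result_set: Any) -> List[str]:
    """Call every listed method and collect the ones that misbehave."""
    failures = []
    for name, args in CASES:
        expected = f"{name} is not implemented for SEA backend"
        try:
            getattr(result_set, name)(*args)
        except NotImplementedError as exc:
            if expected not in str(exc):
                failures.append(f"{name}: unexpected message {exc!r}")
        else:
            failures.append(f"{name}: did not raise")
    return failures


if __name__ == "__main__":
    print(find_failures(StubResultSet()))
```

In a pytest suite the same table could feed `pytest.mark.parametrize`, which would report each method as its own test case. Whether that is preferable to the explicit blocks is a style choice for the maintainers.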