Description
Context
I've been exploring different ways of getting large amounts of data (100GB+) out of Unity Catalog and into external Ray clusters for distributed ML model training. While assessing databricks-sql-python, I noticed its download speeds are significantly slower than the Statement Execution API's. On the actual external Ray cluster the difference is 10x; I was also able to replicate it, to a lesser extent, in a Databricks notebook.
Replication
The first two approaches both lead to a ~45 MB/s download speed on an i3.4xlarge.
Using databricks-sql-python directly
from databricks.sql import connect

with connect(
    server_hostname="same-host",
    http_path="same-http-path",
    access_token="token",
    use_cloud_fetch=True,
) as connection:
    cursor = connection.cursor()
    cursor.execute("SELECT * from foo.bar.baz")
    print(cursor.fetchall_arrow())
Using databricks-sql-python + ray.data.read_sql
reference: https://docs.ray.io/en/latest/data/api/doc/ray.data.read_sql.html#ray.data.read_sql
from databricks.sql import connect
from ray.data import read_sql
import ray

ray.init(num_cpus=16)

def connection_factory():
    return connect(
        server_hostname="same-host",
        http_path="same-http-path",
        access_token="token",
        use_cloud_fetch=True,
    )

ray_dataset = read_sql(
    sql="SELECT * from foo.bar.baz",
    connection_factory=connection_factory,
    override_num_blocks=1,
    ray_remote_args={"num_cpus": 16},
)
print(f"Ray dataset count: {ray_dataset.count()}")
print(f"Ray dataset count: {ray_dataset.count()}")
However, when I use ray.data.read_databricks_tables, I can reach download speeds of ~150 MB/s on the same machine.
Using ray.data.read_databricks_tables
import os
import ray
from ray.data import read_databricks_tables

ray.init(num_cpus=16)

os.environ["DATABRICKS_TOKEN"] = "token"
os.environ["DATABRICKS_HOST"] = "same-host"

ray_dataset = read_databricks_tables(
    warehouse_id="same-id-in-http-path-above",
    catalog="foo",
    schema="bar",
    query="SELECT * from baz",
)
print(f"Ray dataset size: {ray_dataset.size_bytes()}")
print(f"Ray dataset count: {ray_dataset.count()}")
print(f"Ray dataset summed: {ray_dataset.sum('some_column')}")
Potential Cause
I suspect this is because the Statement Execution API lets you make separate, parallel requests to retrieve different "chunks" of the result, whereas the SQL connector adopts a cursor-based approach where data can only be retrieved sequentially.
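For illustration, this is roughly the chunked pattern I mean: a minimal sketch against the Statement Execution API REST endpoints. Host, token, and warehouse ID are placeholders, polling and error handling are omitted, and it assumes the statement succeeds within the wait timeout.

from concurrent.futures import ThreadPoolExecutor
import requests

HOST = "https://same-host"
HEADERS = {"Authorization": "Bearer token"}

# Submit the statement, asking for presigned cloud links to Arrow chunks.
resp = requests.post(
    f"{HOST}/api/2.0/sql/statements/",
    headers=HEADERS,
    json={
        "warehouse_id": "same-id-in-http-path-above",
        "statement": "SELECT * from foo.bar.baz",
        "disposition": "EXTERNAL_LINKS",
        "format": "ARROW_STREAM",
        "wait_timeout": "30s",
    },
).json()
statement_id = resp["statement_id"]
total_chunks = resp["manifest"]["total_chunk_count"]

def download_chunk(chunk_index: int) -> bytes:
    # Each chunk resolves independently to a presigned URL, so all chunks
    # can be fetched concurrently instead of sequentially through a cursor.
    links = requests.get(
        f"{HOST}/api/2.0/sql/statements/{statement_id}/result/chunks/{chunk_index}",
        headers=HEADERS,
    ).json()["external_links"]
    # Presigned cloud URL: no Databricks auth header on this request.
    return requests.get(links[0]["external_link"]).content

with ThreadPoolExecutor(max_workers=16) as pool:
    arrow_streams = list(pool.map(download_chunk, range(total_chunks)))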
Ask
Are there any plans to support a similar chunking pattern in databricks-sql-python? And in lieu of that, is there currently any way to reach download-speed parity with the Statement Execution API, short of manually sharding the query myself (see the sketch below)? databricks-sql-python is great because it does not have the 100GB limit of the Statement Execution API, but the slow download speed is a major blocker for use in ML applications that require transferring large data, which, to be fair, may not be the use case databricks-sql-python was designed for.
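The only workaround I can think of today is manual sharding: split the query on a column and fetch each shard over its own connection in parallel. A rough sketch follows; the id column, hash-based sharding, and shard count are illustrative assumptions, and this gives up result ordering.

from concurrent.futures import ThreadPoolExecutor
import pyarrow as pa
from databricks.sql import connect

NUM_SHARDS = 16

def fetch_shard(shard: int) -> pa.Table:
    # One connection per shard so each cursor streams independently.
    with connect(
        server_hostname="same-host",
        http_path="same-http-path",
        access_token="token",
        use_cloud_fetch=True,
    ) as connection:
        cursor = connection.cursor()
        # Assumes an `id` column whose hash distributes rows evenly.
        cursor.execute(
            f"SELECT * from foo.bar.baz WHERE pmod(hash(id), {NUM_SHARDS}) = {shard}"
        )
        return cursor.fetchall_arrow()

with ThreadPoolExecutor(max_workers=NUM_SHARDS) as pool:
    table = pa.concat_tables(pool.map(fetch_shard, range(NUM_SHARDS)))
print(table.num_rows)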