Extra method for potential dask dataframe support #420

Open
wants to merge 3 commits into base: main

179 changes: 178 additions & 1 deletion civis/io/_tables.py
@@ -36,7 +36,8 @@
CHUNK_SIZE = 32 * 1024
log = logging.getLogger(__name__)
__all__ = ['read_civis', 'read_civis_sql', 'civis_to_csv',
'civis_to_multifile_csv', 'dataframe_to_civis', 'csv_to_civis',
'civis_to_multifile_csv', 'dataframe_to_civis',
'dask_dataframe_to_civis', 'csv_to_civis',
'civis_file_to_table', 'split_schema_tablename',
'export_to_civis_file']

@@ -766,6 +767,182 @@ def dataframe_to_civis(df, database, table, api_key=None, client=None,
return fut


@deprecate_param('v2.0.0', 'api_key', 'headers')
def dask_dataframe_to_civis(df, database, table, api_key=None, client=None,
max_errors=None, existing_table_rows="fail",
diststyle=None, distkey=None,
sortkey1=None, sortkey2=None,
table_columns=None,
headers=None, credential_id=None,
primary_keys=None, last_modified_keys=None,
execution="immediate",
delimiter=None, polling_interval=None,
archive=False, hidden=True, **kwargs):
"""Upload a `dask` `DataFrame` into a Civis table.

The `DataFrame`'s index will not be included. To store the index
along with the other values, use `df.reset_index()` instead
of `df` as the first argument to this function.

Parameters
----------
df : :class:`dask.dataframe:dask.dataframe.DataFrame`
The `DataFrame` to upload to Civis.
database : str or int
Upload data into this database. Can be the database name or ID.
table : str
The schema and table you want to upload to. E.g.,
``'scratch.table'``. Schemas or tablenames with periods must
be double quoted, e.g. ``'scratch."my.table"'``.
api_key : DEPRECATED str, optional
Your Civis API key. If not given, the :envvar:`CIVIS_API_KEY`
environment variable will be used.
client : :class:`civis.APIClient`, optional
If not provided, an :class:`civis.APIClient` object will be
created from the :envvar:`CIVIS_API_KEY`.
max_errors : int, optional
The maximum number of rows with errors to remove from the import
before failing.
existing_table_rows : str, optional
The behaviour if a table with the requested name already exists.
One of ``'fail'``, ``'truncate'``, ``'append'``, ``'drop'``, or
``'upsert'``. Defaults to ``'fail'``.
diststyle : str, optional
The distribution style for the table.
One of ``'even'``, ``'all'`` or ``'key'``.
distkey : str, optional
The column to use as the distkey for the table.
sortkey1 : str, optional
The column to use as the sortkey for the table.
sortkey2 : str, optional
The second column in a compound sortkey for the table.
table_columns : list[Dict[str, str]], optional
A list of dictionaries corresponding to the columns in the order
they appear in the source file. Each dictionary should have keys for
the database column "name" and "sql_type". This parameter is
required if the table does not exist, the table is being dropped,
or the columns in the source file do not appear in the same order
as in the destination table. The "sql_type" key is not required
when appending to an existing table.
headers : bool, optional [DEPRECATED]
Whether or not the first row of the file should be treated as
headers. The default, ``None``, attempts to autodetect whether
or not the first row contains headers.

This parameter has no effect in versions >= 1.11 and will be
removed in v2.0. Tables will always be written with column
names read from the DataFrame. Use the `header` parameter
(which is passed through :meth:`dask.dataframe:dask.dataframe.DataFrame.to_csv`)
to modify the column names in the Civis Table.
credential_id : str or int, optional
The ID of the database credential. If ``None``, the default
credential will be used.
primary_keys: list[str], optional
A list of the primary key column(s) of the destination table that
uniquely identify a record. These columns must not contain null values.
If existing_table_rows is "upsert", this
field is required. Note that this is true regardless of whether the
destination database itself requires a primary key.
last_modified_keys: list[str], optional
A list of the columns indicating a record has been updated. If
existing_table_rows is "upsert", this field is required.
execution: string, optional, default "immediate"
One of "delayed" or "immediate". If "immediate", refresh column
statistics as part of the run. If "delayed", flag the table for a
deferred statistics update; column statistics may not be available
for up to 24 hours. In addition, if existing_table_rows is "upsert",
delayed executions move data from staging table to final table after a
brief delay, in order to accommodate multiple concurrent imports to the
same destination table.
polling_interval : int or float, optional
Number of seconds to wait between checks for job completion.
archive : bool, optional (deprecated)
If ``True``, archive the import job as soon as it completes.
hidden : bool, optional
If ``True`` (the default), this job will not appear in the Civis UI.
**kwargs : kwargs
Extra keyword arguments will be passed to
:meth:`dask.dataframe:dask.dataframe.DataFrame.to_csv`.

Returns
-------
futs : list[:class:`~civis.futures.CivisFuture`]
A list of `CivisFuture` objects, one per uploaded partition file.

Examples
--------
>>> import dask.dataframe as dd
>>> import pandas as pd
>>> df = pd.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]})
>>> ddf = dd.from_pandas(df, npartitions=20)
>>> futs = civis.io.dask_dataframe_to_civis(ddf, 'my-database',
... 'scratch.df_table')
>>> [fut.result() for fut in futs]
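
The snippet below sketches an upsert load; it assumes the destination
table keys records on column ``a`` and tracks updates in column ``b``.

>>> futs = civis.io.dask_dataframe_to_civis(ddf, 'my-database',
...                                          'scratch.df_table',
...                                          existing_table_rows='upsert',
...                                          primary_keys=['a'],
...                                          last_modified_keys=['b'])
>>> [fut.result() for fut in futs]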

See Also
--------
:meth:`dask.dataframe:dask.dataframe.DataFrame.to_csv`
"""
if client is None:
client = APIClient(api_key=api_key)
if archive:
warnings.warn("`archive` is deprecated and will be removed in v2.0.0. "
"Use `hidden` instead.", FutureWarning)

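# The deprecated `headers` argument is ignored; the import always signals a
# header row unless the caller passed header=False for to_csv.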
headers = False if kwargs.get('header') is False else True
with TemporaryDirectory() as tmp_dir:
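# dask expands the '*' in the filename into one CSV per partition
# inside the temporary directory.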
tmp_path = os.path.join(tmp_dir, 'dataframe_to_civis*.csv')
to_csv_kwargs = {'encoding': 'utf-8', 'index': False}
to_csv_kwargs.update(kwargs)
df.to_csv(tmp_path, **to_csv_kwargs)
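# Upload every partition CSV to Civis Files, naming each file after
# the destination table.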
_, name = split_schema_tablename(table)
file_paths = os.listdir(tmp_dir)
file_ids = [
file_to_civis(
os.path.join(tmp_dir, file_path),
name,
client=client) for file_path in file_paths]

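# The partition files are written with to_csv's default comma separator,
# so import them with a ',' delimiter.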
delimiter = ','
futs = []
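# For modes that (re)create or validate the destination table, import the
# first file on its own, then append the remaining files to that table.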
if existing_table_rows in ['truncate', 'drop', 'fail']:
first_file_id = file_ids.pop(0)
futs.append(civis_file_to_table(
first_file_id, database, table,
client=client, max_errors=max_errors,
existing_table_rows=existing_table_rows,
diststyle=diststyle, distkey=distkey,
sortkey1=sortkey1, sortkey2=sortkey2,
table_columns=table_columns,
delimiter=delimiter, headers=headers,
credential_id=credential_id,
primary_keys=primary_keys,
last_modified_keys=last_modified_keys,
escaped=False, execution=execution,
polling_interval=polling_interval,
hidden=hidden))
existing_table_rows = 'append'

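# Import the remaining partition files; existing_table_rows is now
# 'append' (or the caller's original 'append'/'upsert').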
futs.extend([civis_file_to_table(
file_id, database, table,
client=client, max_errors=max_errors,
existing_table_rows=existing_table_rows,
diststyle=diststyle, distkey=distkey,
sortkey1=sortkey1, sortkey2=sortkey2,
table_columns=table_columns,
delimiter=delimiter, headers=headers,
credential_id=credential_id,
primary_keys=primary_keys,
last_modified_keys=last_modified_keys,
escaped=False, execution=execution,
polling_interval=polling_interval,
hidden=hidden) for file_id in file_ids])
return futs


@deprecate_param('v2.0.0', 'api_key')
def csv_to_civis(filename, database, table, api_key=None, client=None,
max_errors=None, existing_table_rows="fail",