diff --git a/.circleci/config.yml b/.circleci/config.yml index 9d0634106b..287a1a4bab 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -38,7 +38,7 @@ defaults: - &wait_for_flowdb name: Wait for flowdb to start command: | - dockerize -wait tcp://localhost:5432 -timeout 10m + dockerize -wait tcp://localhost:5432 -timeout 20m - &run_always_org_context context: org-global filters: @@ -818,6 +818,7 @@ jobs: name: python_with_flowdb flowdb_image: "testdata" python_version: "3.8.5" + num_days: 7 # To avoid overriding fixed number of days working_directory: /home/circleci/project/integration_tests steps: - checkout: diff --git a/flowdb/testdata/bin/9910_migrate_test_data.sh b/flowdb/testdata/bin/9910_migrate_test_data.sh new file mode 100644 index 0000000000..86fe269277 --- /dev/null +++ b/flowdb/testdata/bin/9910_migrate_test_data.sh @@ -0,0 +1,15 @@ +#!/bin/sh +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + + + +set -e +export PGUSER="$POSTGRES_USER" + +# +# Migrate synthetic data. +# + +pipenv run python /docker-entrypoint-initdb.d/migrate_synth_data.py diff --git a/flowdb/testdata/bin/9910_migrate_test_data.sql b/flowdb/testdata/bin/9910_migrate_test_data.sql deleted file mode 100644 index 985b88eeb1..0000000000 --- a/flowdb/testdata/bin/9910_migrate_test_data.sql +++ /dev/null @@ -1,389 +0,0 @@ -/* -This Source Code Form is subject to the terms of the Mozilla Public -License, v. 2.0. If a copy of the MPL was not distributed with this -file, You can obtain one at http://mozilla.org/MPL/2.0/. -*/ - -BEGIN; -/* Populate subscribers */ - -INSERT INTO interactions.subscriber (msisdn, imei, imsi, tac) - SELECT msisdn, imei, imsi, tac FROM events.calls group by msisdn, imei, imsi, tac - UNION - SELECT msisdn, imei, imsi, tac FROM events.sms group by msisdn, imei, imsi, tac - UNION - SELECT msisdn, imei, imsi, tac FROM events.mds group by msisdn, imei, imsi, tac - UNION - SELECT msisdn, imei, imsi, tac FROM events.topups group by msisdn, imei, imsi, tac; - -/* Populate locations */ - -INSERT INTO interactions.locations (site_id, cell_id, position) - SELECT sites.site_id as site_id, cells.cell_id AS cell_id, cells.geom_point as position FROM - infrastructure.cells LEFT JOIN - infrastructure.sites ON - cells.site_id=sites.id AND cells.version=sites.version; - -/* Create a view mapping location ids to cell ids */ - -CREATE VIEW cell_id_mapping AS ( - SELECT * FROM - interactions.locations - LEFT JOIN ( - SELECT cell_id, id as mno_cell_id, daterange(date_of_first_service, date_of_last_service, '[]') as valid_period FROM - infrastructure.cells) c - USING (cell_id) -); - -/* Create partitions on the events tables */ - -CREATE TABLE interactions.events_supertable_20160101 PARTITION OF interactions.event_supertable - FOR VALUES FROM (20160101) TO (20160102); - -CREATE TABLE interactions.events_supertable_20160102 PARTITION OF interactions.event_supertable - FOR VALUES FROM (20160102) TO (20160103); - -CREATE TABLE interactions.events_supertable_20160103 PARTITION OF interactions.event_supertable - FOR VALUES FROM (20160103) TO (20160104); - -CREATE TABLE interactions.events_supertable_20160104 PARTITION OF interactions.event_supertable - FOR VALUES FROM (20160104) TO (20160105); - -CREATE TABLE interactions.events_supertable_20160105 PARTITION OF interactions.event_supertable - FOR VALUES FROM (20160105) TO (20160106); - -CREATE TABLE interactions.events_supertable_20160106 PARTITION OF interactions.event_supertable - FOR VALUES FROM (20160106) TO (20160107); - -CREATE TABLE interactions.events_supertable_20160107 PARTITION OF interactions.event_supertable - FOR VALUES FROM (20160107) TO (20160108); - -/* Calls */ - -CREATE TABLE interactions.calls_20160101 PARTITION OF interactions.calls - FOR VALUES FROM (20160101) TO (20160102); - -CREATE TABLE interactions.calls_20160102 PARTITION OF interactions.calls - FOR VALUES FROM (20160102) TO (20160103); - -CREATE TABLE interactions.calls_20160103 PARTITION OF interactions.calls - FOR VALUES FROM (20160103) TO (20160104); - -CREATE TABLE interactions.calls_20160104 PARTITION OF interactions.calls - FOR VALUES FROM (20160104) TO (20160105); - -CREATE TABLE interactions.calls_20160105 PARTITION OF interactions.calls - FOR VALUES FROM (20160105) TO (20160106); - -CREATE TABLE interactions.calls_20160106 PARTITION OF interactions.calls - FOR VALUES FROM (20160106) TO (20160107); - -CREATE TABLE interactions.calls_20160107 PARTITION OF interactions.calls - FOR VALUES FROM (20160107) TO (20160108); - -/* sms */ - -CREATE TABLE interactions.sms_20160101 PARTITION OF interactions.sms - FOR VALUES FROM (20160101) TO (20160102); - -CREATE TABLE interactions.sms_20160102 PARTITION OF interactions.sms - FOR VALUES FROM (20160102) TO (20160103); - -CREATE TABLE interactions.sms_20160103 PARTITION OF interactions.sms - FOR VALUES FROM (20160103) TO (20160104); - -CREATE TABLE interactions.sms_20160104 PARTITION OF interactions.sms - FOR VALUES FROM (20160104) TO (20160105); - -CREATE TABLE interactions.sms_20160105 PARTITION OF interactions.sms - FOR VALUES FROM (20160105) TO (20160106); - -CREATE TABLE interactions.sms_20160106 PARTITION OF interactions.sms - FOR VALUES FROM (20160106) TO (20160107); - -CREATE TABLE interactions.sms_20160107 PARTITION OF interactions.sms - FOR VALUES FROM (20160107) TO (20160108); - -/* mds */ - -CREATE TABLE interactions.mds_20160101 PARTITION OF interactions.mds - FOR VALUES FROM (20160101) TO (20160102); - -CREATE TABLE interactions.mds_20160102 PARTITION OF interactions.mds - FOR VALUES FROM (20160102) TO (20160103); - -CREATE TABLE interactions.mds_20160103 PARTITION OF interactions.mds - FOR VALUES FROM (20160103) TO (20160104); - -CREATE TABLE interactions.mds_20160104 PARTITION OF interactions.mds - FOR VALUES FROM (20160104) TO (20160105); - -CREATE TABLE interactions.mds_20160105 PARTITION OF interactions.mds - FOR VALUES FROM (20160105) TO (20160106); - -CREATE TABLE interactions.mds_20160106 PARTITION OF interactions.mds - FOR VALUES FROM (20160106) TO (20160107); - -CREATE TABLE interactions.mds_20160107 PARTITION OF interactions.mds - FOR VALUES FROM (20160107) TO (20160108); - -/* topup */ - -CREATE TABLE interactions.topup_20160101 PARTITION OF interactions.topup - FOR VALUES FROM (20160101) TO (20160102); - -CREATE TABLE interactions.topup_20160102 PARTITION OF interactions.topup - FOR VALUES FROM (20160102) TO (20160103); - -CREATE TABLE interactions.topup_20160103 PARTITION OF interactions.topup - FOR VALUES FROM (20160103) TO (20160104); - -CREATE TABLE interactions.topup_20160104 PARTITION OF interactions.topup - FOR VALUES FROM (20160104) TO (20160105); - -CREATE TABLE interactions.topup_20160105 PARTITION OF interactions.topup - FOR VALUES FROM (20160105) TO (20160106); - -CREATE TABLE interactions.topup_20160106 PARTITION OF interactions.topup - FOR VALUES FROM (20160106) TO (20160107); - -CREATE TABLE interactions.topup_20160107 PARTITION OF interactions.topup - FOR VALUES FROM (20160107) TO (20160108); - -/* Populate calls */ - -WITH event_data AS (SELECT - caller_ident.subscriber_id, - caller_loc.location_id, - time_dim_id, - date_dim_id, - callee_ident.subscriber_id as called_subscriber_id, - callee_loc.location_id as called_party_location_id, - calling_party_msisdn, - called_party_msisdn, - duration, - event_timestamp - FROM - (SELECT id, duration as duration, datetime as event_timestamp, location_id as caller_location_id, - msisdn as calling_party_msisdn, tac as caller_tac FROM events.calls - WHERE outgoing) callers - LEFT JOIN (SELECT id, location_id as callee_location_id, - msisdn as called_party_msisdn, tac as callee_tac FROM events.calls - WHERE not outgoing) called - USING (id) - LEFT JOIN - interactions.subscriber AS caller_ident - ON caller_ident.msisdn=calling_party_msisdn AND caller_ident.tac=caller_tac - LEFT JOIN - interactions.subscriber AS callee_ident - ON callee_ident.msisdn=called_party_msisdn AND callee_ident.tac=callee_tac - LEFT JOIN - cell_id_mapping AS caller_loc - ON caller_location_id=caller_loc.mno_cell_id AND caller_loc.valid_period @> event_timestamp::date - LEFT JOIN - cell_id_mapping AS callee_loc - ON callee_location_id=callee_loc.mno_cell_id AND callee_loc.valid_period @> event_timestamp::date - LEFT JOIN - d_date ON event_timestamp::date = date_actual - LEFT JOIN - d_time ON - EXTRACT(HOUR from event_timestamp) = hour_of_day), - call_data AS - - (INSERT INTO interactions.event_supertable (subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, event_type_id) - SELECT subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, (SELECT event_type_id FROM interactions.d_event_type WHERE name='calls') - FROM event_data - RETURNING *) - -INSERT INTO interactions.calls (event_id, date_dim_id, called_subscriber_id, called_party_location_id, calling_party_msisdn, called_party_msisdn, duration) - SELECT event_id, date_dim_id, called_subscriber_id, called_party_location_id, calling_party_msisdn, called_party_msisdn, duration FROM call_data NATURAL JOIN event_data; - -/* Populate sms */ - -WITH event_data AS (SELECT caller_ident.subscriber_id, - caller_loc.location_id, - time_dim_id as time_dim_id, - date_dim_id as date_dim_id, - callee_ident.subscriber_id as called_subscriber_id, - callee_loc.location_id as called_party_location_id, - calling_party_msisdn, - called_party_msisdn, - event_timestamp - FROM - (SELECT id, datetime as event_timestamp, location_id as caller_location_id, - msisdn as calling_party_msisdn, tac as caller_tac FROM events.sms - WHERE outgoing) callers - LEFT JOIN (SELECT id, location_id as callee_location_id, - msisdn as called_party_msisdn, tac as callee_tac FROM events.sms - WHERE not outgoing) called - USING (id) - LEFT JOIN - interactions.subscriber AS caller_ident - ON caller_ident.msisdn=calling_party_msisdn AND caller_ident.tac=caller_tac - LEFT JOIN - interactions.subscriber AS callee_ident - ON callee_ident.msisdn=called_party_msisdn AND callee_ident.tac=callee_tac - LEFT JOIN - cell_id_mapping AS caller_loc - ON caller_location_id=caller_loc.mno_cell_id AND caller_loc.valid_period @> event_timestamp::date - LEFT JOIN - cell_id_mapping AS callee_loc - ON callee_location_id=callee_loc.mno_cell_id AND callee_loc.valid_period @> event_timestamp::date - LEFT JOIN - d_date ON event_timestamp::date = date_actual - LEFT JOIN - d_time ON - EXTRACT(HOUR from event_timestamp) = hour_of_day), - sms_data AS - (INSERT INTO interactions.event_supertable (subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, event_type_id) - SELECT subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, (SELECT event_type_id FROM interactions.d_event_type WHERE name='sms') - FROM event_data - RETURNING *) - -INSERT INTO interactions.sms (event_id, date_dim_id, called_subscriber_id, called_party_location_id, calling_party_msisdn, called_party_msisdn) - SELECT event_id, date_dim_id, called_subscriber_id, called_party_location_id, calling_party_msisdn, called_party_msisdn FROM sms_data NATURAL JOIN event_data; - -/* Populate topup */ - - -WITH event_data AS (SELECT caller_ident.subscriber_id, - caller_loc.location_id, - time_dim_id, - date_dim_id, - recharge_amount, - airtime_fee, - tax_and_fee, - pre_event_balance, - post_event_balance, - calling_party_msisdn, - caller_tac, - event_timestamp - FROM - (SELECT datetime as event_timestamp, location_id as caller_location_id, - msisdn as calling_party_msisdn, tac as caller_tac, recharge_amount, - airtime_fee, tax_and_fee, pre_event_balance, post_event_balance - FROM events.topups) topup - LEFT JOIN - interactions.subscriber AS caller_ident - ON caller_ident.msisdn=calling_party_msisdn AND caller_ident.tac=caller_tac - LEFT JOIN - cell_id_mapping AS caller_loc - ON caller_location_id=caller_loc.mno_cell_id AND caller_loc.valid_period @> event_timestamp::date - LEFT JOIN - d_date ON event_timestamp::date = date_actual - LEFT JOIN - d_time ON - EXTRACT(HOUR from event_timestamp) = hour_of_day), - topup_data AS - (INSERT INTO interactions.event_supertable (subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, event_type_id) - SELECT subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, (SELECT event_type_id FROM interactions.d_event_type WHERE name='topup') - FROM event_data - RETURNING *) - -INSERT INTO interactions.topup (event_id, date_dim_id, recharge_amount, airtime_fee, tax_and_fee, pre_event_balance, post_event_balance) - SELECT event_id, date_dim_id, recharge_amount, airtime_fee, tax_and_fee, pre_event_balance, post_event_balance FROM topup_data NATURAL JOIN event_data; - -/* Populate mds */ - - -WITH event_data AS (SELECT caller_ident.subscriber_id, - caller_loc.location_id, - time_dim_id, - date_dim_id, - volume_total as data_volume_total, - volume_upload as data_volume_up, - volume_download as data_volume_down, - duration, - event_timestamp - FROM - (SELECT datetime as event_timestamp, location_id as caller_location_id, - msisdn as calling_party_msisdn, tac as caller_tac, volume_total, volume_upload, volume_download, - duration - FROM events.mds) mds - LEFT JOIN - interactions.subscriber AS caller_ident - ON caller_ident.msisdn=calling_party_msisdn AND caller_ident.tac=caller_tac - LEFT JOIN - cell_id_mapping AS caller_loc - ON caller_location_id=caller_loc.mno_cell_id AND caller_loc.valid_period @> event_timestamp::date - LEFT JOIN - d_date ON event_timestamp::date = date_actual - LEFT JOIN - d_time ON - EXTRACT(HOUR from event_timestamp) = hour_of_day), - mds_data AS - (INSERT INTO interactions.event_supertable (subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, event_type_id) - SELECT subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, (SELECT event_type_id FROM interactions.d_event_type WHERE name='mds') - FROM event_data - RETURNING *) - -INSERT INTO interactions.mds (event_id, date_dim_id, data_volume_total, data_volume_up, - data_volume_down, - duration) - SELECT event_id, date_dim_id, data_volume_total, data_volume_up, - data_volume_down, - duration FROM mds_data NATURAL JOIN event_data; - -/* Populate geoms from the existing admin units */ - -INSERT INTO geography.geoms (short_name, long_name, geo_kind_id, spatial_resolution, geom) - SELECT admin3pcod as short_name, admin3name as long_name, 1 as geo_kind_id, 3 as spatial_resolution, geom - FROM geography.admin3; - -INSERT INTO geography.geoms (short_name, long_name, geo_kind_id, spatial_resolution, geom) - SELECT admin2pcod as short_name, admin2name as long_name, 1 as geo_kind_id, 2 as spatial_resolution, geom - FROM geography.admin2; - -INSERT INTO geography.geoms (short_name, long_name, geo_kind_id, spatial_resolution, geom) - SELECT admin1pcod as short_name, admin1name as long_name, 1 as geo_kind_id, 1 as spatial_resolution, geom - FROM geography.admin1; - -INSERT INTO geography.geoms (short_name, long_name, geo_kind_id, spatial_resolution, geom) - SELECT admin0pcod as short_name, admin0name as long_name, 1 as geo_kind_id, 0 as spatial_resolution, geom - FROM geography.admin0; - -INSERT INTO geography.geoms (short_name, long_name, geo_kind_id, spatial_resolution, geom) - SELECT district_c as short_name, district_n as long_name, 1 as geo_kind_id, 2 as spatial_resolution, geom - FROM public.gambia_admin2; - -/* Populate the geobridge */ - -INSERT INTO geography.geo_bridge (location_id, gid, valid_from, valid_to, linkage_method_id) - SELECT locations.location_id, geoms.gid, '-Infinity'::date as valid_from, 'Infinity'::date as valid_to, 1 as linkage_method_id from interactions.locations LEFT JOIN geography.geoms ON ST_Intersects(position, geom); - - -/* Populate subscriber sightings */ - -CREATE TABLE interactions.subscriber_sightings_20160101 PARTITION OF interactions.subscriber_sightings - FOR VALUES FROM (20160101) TO (20160102); - -CREATE TABLE interactions.subscriber_sightings_20160102 PARTITION OF interactions.subscriber_sightings - FOR VALUES FROM (20160102) TO (20160103); - -CREATE TABLE interactions.subscriber_sightings_20160103 PARTITION OF interactions.subscriber_sightings - FOR VALUES FROM (20160103) TO (20160104); - -CREATE TABLE interactions.subscriber_sightings_20160104 PARTITION OF interactions.subscriber_sightings - FOR VALUES FROM (20160104) TO (20160105); - -CREATE TABLE interactions.subscriber_sightings_20160105 PARTITION OF interactions.subscriber_sightings - FOR VALUES FROM (20160105) TO (20160106); - -CREATE TABLE interactions.subscriber_sightings_20160106 PARTITION OF interactions.subscriber_sightings - FOR VALUES FROM (20160106) TO (20160107); - -CREATE TABLE interactions.subscriber_sightings_20160107 PARTITION OF interactions.subscriber_sightings - FOR VALUES FROM (20160107) TO (20160108); - -INSERT INTO interactions.subscriber_sightings (event_id, subscriber_id, location_id, time_dim_id, date_dim_id, sighting_timestamp) - SELECT event_id, subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp FROM interactions.event_supertable; - -INSERT INTO interactions.subscriber_sightings (event_id, subscriber_id, location_id, time_dim_id, date_dim_id, sighting_timestamp) - SELECT event_id, called_subscriber_id as subscriber_id, called_party_location_id as location_id, time_dim_id, date_dim_id, event_timestamp - FROM interactions.event_supertable NATURAL JOIN interactions.calls; - -INSERT INTO interactions.subscriber_sightings (event_id, subscriber_id, location_id, time_dim_id, date_dim_id, sighting_timestamp) - SELECT event_id, called_subscriber_id as subscriber_id, called_party_location_id as location_id, time_dim_id, date_dim_id, event_timestamp - FROM interactions.event_supertable NATURAL JOIN interactions.sms; - -COMMIT; \ No newline at end of file diff --git a/flowdb/testdata/bin/9910_run_synthetic_dfs_data_generation_script.sh b/flowdb/testdata/bin/9910_run_synthetic_dfs_data_generation_script.sh deleted file mode 100755 index 85402d9f89..0000000000 --- a/flowdb/testdata/bin/9910_run_synthetic_dfs_data_generation_script.sh +++ /dev/null @@ -1,23 +0,0 @@ -#!/bin/sh -# This Source Code Form is subject to the terms of the Mozilla Public -# License, v. 2.0. If a copy of the MPL was not distributed with this -# file, You can obtain one at http://mozilla.org/MPL/2.0/. - - -set -e -export PGUSER="$POSTGRES_USER" - -# -# Generate synthetic DFS data. -# -# Note that the only purpose of this script is to -# call the Python script which does the actual data -# data generation, but we need this shell script as -# a wrapper because the PostgreSQL entrypoint script -# does not pick up .py files on its own. -# - -export DIR=/docker-entrypoint-initdb.d/py/testdata/ - -echo "Running Python script to generate synthetic DFS data." -pipenv run python ${DIR}/zz_generate_synthetic_dfs_data.py diff --git a/flowdb/testdata/bin/migrate_synth_data.py b/flowdb/testdata/bin/migrate_synth_data.py new file mode 100644 index 0000000000..0f25b6186d --- /dev/null +++ b/flowdb/testdata/bin/migrate_synth_data.py @@ -0,0 +1,405 @@ +import sys +from pathlib import Path + +import os +import argparse +import datetime +from concurrent.futures.thread import ThreadPoolExecutor +from contextlib import contextmanager +from multiprocessing import cpu_count + +import sqlalchemy as sqlalchemy +from sqlalchemy.exc import ResourceClosedError +import json + +try: + import structlog + + structlog.configure( + processors=[ + structlog.stdlib.PositionalArgumentsFormatter(), + structlog.processors.TimeStamper(fmt="iso"), + structlog.processors.StackInfoRenderer(), + structlog.processors.format_exc_info, + structlog.processors.JSONRenderer(serializer=json.dumps), + ] + ) + logger = structlog.get_logger(__name__) +except ImportError: + import logging + + logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) + logger = logging.getLogger(__name__) + logger.setLevel("DEBUG") + + +def log(msg, **kwargs): + try: + logger.info(msg, **kwargs) + except: + logger.info(f"{msg}: {kwargs}") + + +@contextmanager +def log_duration(job: str, **kwargs): + """ + Small context handler that logs the duration of the with block. + + Parameters + ---------- + job: str + Description of what is being run, will be shown under the "job" key in log + kwargs: dict + Any kwargs will be shown in the log as "key":"value" + """ + start_time = datetime.datetime.now() + log("Started", job=job, **kwargs) + yield + log( + "Finished", + job=job, + runtime=str(datetime.datetime.now() - start_time), + **kwargs, + ) + + +parser = argparse.ArgumentParser(description="Flowminder Synthetic CDR Migrator\n") +parser.add_argument( + "--n-days", + type=int, + default=os.getenv("N_DAYS", 7), + help="Number of days of data to migrate.", +) + +if __name__ == "__main__": + args = parser.parse_args() + with log_duration("Migrating synthetic data..", **vars(args)): + num_days = args.n_days + engine = sqlalchemy.create_engine( + f"postgresql://{os.getenv('POSTGRES_USER')}@/{os.getenv('POSTGRES_DB')}", + echo=False, + pool_size=min(cpu_count(), int(os.getenv("MAX_CPUS", cpu_count()))), + pool_timeout=None, + ) + log( + "Connected.", + num_connections=min(cpu_count(), int(os.getenv("MAX_CPUS", cpu_count()))), + ) + + def do_exec(args): + sql, msg = args + with log_duration(msg): + with engine.begin() as trans: + res = trans.execute(sql) + try: + log(f"SQL result", job=msg, result=res.fetchall()) + except ResourceClosedError: + pass # Nothing to do here + except Exception as exc: + log("Hit an issue.", exc=exc) + raise exc + + start_time = datetime.datetime.now() + + for date in ( + datetime.date(2016, 1, 1) + datetime.timedelta(days=i) + for i in range(num_days) + ): + with log_duration("Migrating day.", day=date): + partition_period = f"FROM ({date.strftime('%Y%m%d')}) TO ({(date + datetime.timedelta(days=1)).strftime('%Y%m%d')})" + with log_duration("Creating partitions.", day=date): + with engine.begin() as trans: + trans.execute( + f"CREATE TABLE interactions.events_supertable_{date.strftime('%Y%m%d')} PARTITION OF interactions.event_supertable FOR VALUES {partition_period};" + ) + trans.execute( + f"CREATE TABLE interactions.subscriber_sightings_{date.strftime('%Y%m%d')} PARTITION OF interactions.subscriber_sightings FOR VALUES {partition_period};" + ) + for event_type in ("calls", "sms", "mds", "topup"): + trans.execute( + f"CREATE TABLE interactions.{event_type}_{date.strftime('%Y%m%d')} PARTITION OF interactions.{event_type} FOR VALUES {partition_period};" + ) + with log_duration("Migrate subscribers."): + with engine.begin() as trans: + trans.execute( + """ + INSERT INTO interactions.subscriber (msisdn, imei, imsi, tac) + SELECT msisdn, imei, imsi, tac FROM events.calls group by msisdn, imei, imsi, tac + UNION + SELECT msisdn, imei, imsi, tac FROM events.sms group by msisdn, imei, imsi, tac + UNION + SELECT msisdn, imei, imsi, tac FROM events.mds group by msisdn, imei, imsi, tac + UNION + SELECT msisdn, imei, imsi, tac FROM events.topups group by msisdn, imei, imsi, tac; + """ + ) + trans.execute( + """ + INSERT INTO interactions.locations (site_id, cell_id, position) + SELECT sites.site_id as site_id, cells.cell_id AS cell_id, cells.geom_point as position FROM + infrastructure.cells LEFT JOIN + infrastructure.sites ON + cells.site_id=sites.id AND cells.version=sites.version; + """ + ) + trans.execute( + """ + CREATE VIEW cell_id_mapping AS ( + SELECT * FROM + interactions.locations + LEFT JOIN ( + SELECT cell_id, id as mno_cell_id, daterange(date_of_first_service, date_of_last_service, '[]') as valid_period FROM + infrastructure.cells) c + USING (cell_id) + ); + """ + ) + + with log_duration("Migrate geography"): + with engine.begin() as trans: + trans.execute( + """ + INSERT INTO geography.geoms (short_name, long_name, geo_kind_id, spatial_resolution, geom) + SELECT admin3pcod as short_name, admin3name as long_name, 1 as geo_kind_id, 3 as spatial_resolution, geom + FROM geography.admin3; + + INSERT INTO geography.geoms (short_name, long_name, geo_kind_id, spatial_resolution, geom) + SELECT admin2pcod as short_name, admin2name as long_name, 1 as geo_kind_id, 2 as spatial_resolution, geom + FROM geography.admin2; + + INSERT INTO geography.geoms (short_name, long_name, geo_kind_id, spatial_resolution, geom) + SELECT admin1pcod as short_name, admin1name as long_name, 1 as geo_kind_id, 1 as spatial_resolution, geom + FROM geography.admin1; + + INSERT INTO geography.geoms (short_name, long_name, geo_kind_id, spatial_resolution, geom) + SELECT admin0pcod as short_name, admin0name as long_name, 1 as geo_kind_id, 0 as spatial_resolution, geom + FROM geography.admin0;""" + ) + with engine.begin() as trans: + try: + trans.execute( + """ + INSERT INTO geography.geoms (short_name, long_name, geo_kind_id, spatial_resolution, geom) + SELECT district_c as short_name, district_n as long_name, 1 as geo_kind_id, 2 as spatial_resolution, geom + FROM public.gambia_admin2;""" + ) + except: + pass # No gambia table + with engine.begin() as trans: + # Populate the geobridge + trans.execute( + """INSERT INTO geography.geo_bridge (location_id, gid, valid_from, valid_to, linkage_method_id) + SELECT locations.location_id, geoms.gid, '-Infinity'::date as valid_from, 'Infinity'::date as valid_to, 1 as linkage_method_id from interactions.locations LEFT JOIN geography.geoms ON ST_Intersects(position, geom); + """ + ) + + events = [ + ( + """ + WITH event_data AS (SELECT + caller_ident.subscriber_id, + caller_loc.location_id, + time_dim_id, + date_dim_id, + callee_ident.subscriber_id as called_subscriber_id, + callee_loc.location_id as called_party_location_id, + calling_party_msisdn, + called_party_msisdn, + duration, + event_timestamp + FROM + (SELECT id, duration as duration, datetime as event_timestamp, location_id as caller_location_id, + msisdn as calling_party_msisdn, tac as caller_tac FROM events.calls + WHERE outgoing) callers + LEFT JOIN (SELECT id, location_id as callee_location_id, + msisdn as called_party_msisdn, tac as callee_tac FROM events.calls + WHERE not outgoing) called + USING (id) + LEFT JOIN + interactions.subscriber AS caller_ident + ON caller_ident.msisdn=calling_party_msisdn AND caller_ident.tac=caller_tac + LEFT JOIN + interactions.subscriber AS callee_ident + ON callee_ident.msisdn=called_party_msisdn AND callee_ident.tac=callee_tac + LEFT JOIN + cell_id_mapping AS caller_loc + ON caller_location_id=caller_loc.mno_cell_id AND caller_loc.valid_period @> event_timestamp::date + LEFT JOIN + cell_id_mapping AS callee_loc + ON callee_location_id=callee_loc.mno_cell_id AND callee_loc.valid_period @> event_timestamp::date + LEFT JOIN + d_date ON event_timestamp::date = date_actual + LEFT JOIN + d_time ON + EXTRACT(HOUR from event_timestamp) = hour_of_day), + call_data AS + + (INSERT INTO interactions.event_supertable (subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, event_type_id) + SELECT subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, (SELECT event_type_id FROM interactions.d_event_type WHERE name='calls') + FROM event_data + RETURNING *) + + INSERT INTO interactions.calls (event_id, date_dim_id, called_subscriber_id, called_party_location_id, calling_party_msisdn, called_party_msisdn, duration) + SELECT event_id, date_dim_id, called_subscriber_id, called_party_location_id, calling_party_msisdn, called_party_msisdn, duration FROM call_data NATURAL JOIN event_data; + """, + "Call events", + ), + ( + """ + WITH event_data AS (SELECT caller_ident.subscriber_id, + caller_loc.location_id, + time_dim_id as time_dim_id, + date_dim_id as date_dim_id, + callee_ident.subscriber_id as called_subscriber_id, + callee_loc.location_id as called_party_location_id, + calling_party_msisdn, + called_party_msisdn, + event_timestamp + FROM + (SELECT id, datetime as event_timestamp, location_id as caller_location_id, + msisdn as calling_party_msisdn, tac as caller_tac FROM events.sms + WHERE outgoing) callers + LEFT JOIN (SELECT id, location_id as callee_location_id, + msisdn as called_party_msisdn, tac as callee_tac FROM events.sms + WHERE not outgoing) called + USING (id) + LEFT JOIN + interactions.subscriber AS caller_ident + ON caller_ident.msisdn=calling_party_msisdn AND caller_ident.tac=caller_tac + LEFT JOIN + interactions.subscriber AS callee_ident + ON callee_ident.msisdn=called_party_msisdn AND callee_ident.tac=callee_tac + LEFT JOIN + cell_id_mapping AS caller_loc + ON caller_location_id=caller_loc.mno_cell_id AND caller_loc.valid_period @> event_timestamp::date + LEFT JOIN + cell_id_mapping AS callee_loc + ON callee_location_id=callee_loc.mno_cell_id AND callee_loc.valid_period @> event_timestamp::date + LEFT JOIN + d_date ON event_timestamp::date = date_actual + LEFT JOIN + d_time ON + EXTRACT(HOUR from event_timestamp) = hour_of_day), + sms_data AS + (INSERT INTO interactions.event_supertable (subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, event_type_id) + SELECT subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, (SELECT event_type_id FROM interactions.d_event_type WHERE name='sms') + FROM event_data + RETURNING *) + + INSERT INTO interactions.sms (event_id, date_dim_id, called_subscriber_id, called_party_location_id, calling_party_msisdn, called_party_msisdn) + SELECT event_id, date_dim_id, called_subscriber_id, called_party_location_id, calling_party_msisdn, called_party_msisdn FROM sms_data NATURAL JOIN event_data; + """, + "SMS events", + ), + ( + """ + WITH event_data AS (SELECT caller_ident.subscriber_id, + caller_loc.location_id, + time_dim_id, + date_dim_id, + volume_total as data_volume_total, + volume_upload as data_volume_up, + volume_download as data_volume_down, + duration, + event_timestamp + FROM + (SELECT datetime as event_timestamp, location_id as caller_location_id, + msisdn as calling_party_msisdn, tac as caller_tac, volume_total, volume_upload, volume_download, + duration + FROM events.mds) mds + LEFT JOIN + interactions.subscriber AS caller_ident + ON caller_ident.msisdn=calling_party_msisdn AND caller_ident.tac=caller_tac + LEFT JOIN + cell_id_mapping AS caller_loc + ON caller_location_id=caller_loc.mno_cell_id AND caller_loc.valid_period @> event_timestamp::date + LEFT JOIN + d_date ON event_timestamp::date = date_actual + LEFT JOIN + d_time ON + EXTRACT(HOUR from event_timestamp) = hour_of_day), + mds_data AS + (INSERT INTO interactions.event_supertable (subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, event_type_id) + SELECT subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, (SELECT event_type_id FROM interactions.d_event_type WHERE name='mds') + FROM event_data + RETURNING *) + + INSERT INTO interactions.mds (event_id, date_dim_id, data_volume_total, data_volume_up, + data_volume_down, + duration) + SELECT event_id, date_dim_id, data_volume_total, data_volume_up, + data_volume_down, + duration FROM mds_data NATURAL JOIN event_data; + """, + "MDS events", + ), + ( + """ + WITH event_data AS (SELECT caller_ident.subscriber_id, + caller_loc.location_id, + time_dim_id, + date_dim_id, + recharge_amount, + airtime_fee, + tax_and_fee, + pre_event_balance, + post_event_balance, + calling_party_msisdn, + caller_tac, + event_timestamp + FROM + (SELECT datetime as event_timestamp, location_id as caller_location_id, + msisdn as calling_party_msisdn, tac as caller_tac, recharge_amount, + airtime_fee, tax_and_fee, pre_event_balance, post_event_balance + FROM events.topups) topup + LEFT JOIN + interactions.subscriber AS caller_ident + ON caller_ident.msisdn=calling_party_msisdn AND caller_ident.tac=caller_tac + LEFT JOIN + cell_id_mapping AS caller_loc + ON caller_location_id=caller_loc.mno_cell_id AND caller_loc.valid_period @> event_timestamp::date + LEFT JOIN + d_date ON event_timestamp::date = date_actual + LEFT JOIN + d_time ON + EXTRACT(HOUR from event_timestamp) = hour_of_day), + topup_data AS + (INSERT INTO interactions.event_supertable (subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, event_type_id) + SELECT subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, (SELECT event_type_id FROM interactions.d_event_type WHERE name='topup') + FROM event_data + RETURNING *) + + INSERT INTO interactions.topup (event_id, date_dim_id, recharge_amount, airtime_fee, tax_and_fee, pre_event_balance, post_event_balance) + SELECT event_id, date_dim_id, recharge_amount, airtime_fee, tax_and_fee, pre_event_balance, post_event_balance FROM topup_data NATURAL JOIN event_data; + """, + "Topup events", + ), + ] + with log_duration("Migrate events."): + with ThreadPoolExecutor( + min(cpu_count(), int(os.getenv("MAX_CPUS", cpu_count()))) + ) as tp: + list(tp.map(do_exec, events)) + with log_duration("Migrate sightings."): + with engine.begin() as trans: + trans.execute( + """ + INSERT INTO interactions.subscriber_sightings (event_id, subscriber_id, location_id, time_dim_id, date_dim_id, sighting_timestamp) + SELECT event_id, subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp FROM interactions.event_supertable; + + INSERT INTO interactions.subscriber_sightings (event_id, subscriber_id, location_id, time_dim_id, date_dim_id, sighting_timestamp) + SELECT event_id, called_subscriber_id as subscriber_id, called_party_location_id as location_id, time_dim_id, date_dim_id, event_timestamp + FROM interactions.event_supertable NATURAL JOIN interactions.calls; + + INSERT INTO interactions.subscriber_sightings (event_id, subscriber_id, location_id, time_dim_id, date_dim_id, sighting_timestamp) + SELECT event_id, called_subscriber_id as subscriber_id, called_party_location_id as location_id, time_dim_id, date_dim_id, event_timestamp + FROM interactions.event_supertable NATURAL JOIN interactions.sms; + """ + ) + with log_duration("Drop events tables"): + with engine.begin() as trans: + trans.execute( + """ + DROP SCHEMA events CASCADE; + """ + ) diff --git a/flowdb_synthetic_data.Dockerfile b/flowdb_synthetic_data.Dockerfile index 73e7ee0d1b..44652f0c5c 100644 --- a/flowdb_synthetic_data.Dockerfile +++ b/flowdb_synthetic_data.Dockerfile @@ -38,6 +38,8 @@ RUN mkdir -p /docker-entrypoint-initdb.d/sql/syntheticdata/ && \ COPY --chown=postgres flowdb/testdata/bin/9900_ingest_synthetic_data.sh /docker-entrypoint-initdb.d/ COPY --chown=postgres flowdb/testdata/bin/9800_population_density.sql.gz /docker-entrypoint-initdb.d/ +COPY --chown=postgres flowdb/testdata/bin/9910_migrate_test_data.sh /docker-entrypoint-initdb.d/ +COPY --chown=postgres flowdb/testdata/bin/migrate_synth_data.py /docker-entrypoint-initdb.d/ COPY --chown=postgres flowdb/testdata/bin/9910_run_synthetic_dfs_data_generation_script.sh /docker-entrypoint-initdb.d/ COPY --chown=postgres flowdb/testdata/test_data/py/* /docker-entrypoint-initdb.d/py/testdata/ diff --git a/flowdb_synthetic_data.Dockerfile.dockerignore b/flowdb_synthetic_data.Dockerfile.dockerignore index 0ed8d196c7..c77be87c4d 100644 --- a/flowdb_synthetic_data.Dockerfile.dockerignore +++ b/flowdb_synthetic_data.Dockerfile.dockerignore @@ -12,6 +12,8 @@ !flowdb/testdata/bin/9900_ingest_synthetic_data.sh !flowdb/testdata/bin/9800_population_density.sql.gz !flowdb/testdata/bin/generate_synthetic_data*.py +!flowdb/testdata/bin/9910_migrate_test_data.sh +!flowdb/testdata/bin/migrate_synth_data.py !flowdb/testdata/test_data/sql/admin*.sql !flowdb/testdata/synthetic_data/data/NPL_admbnda_adm3_Districts_simplified.geojson !flowdb/testdata/synthetic_data/Pipfile* diff --git a/flowdb_testdata.Dockerfile.dockerignore b/flowdb_testdata.Dockerfile.dockerignore index fa6b71a5cc..1463f7ac31 100644 --- a/flowdb_testdata.Dockerfile.dockerignore +++ b/flowdb_testdata.Dockerfile.dockerignore @@ -12,7 +12,8 @@ # !flowdb/testdata/bin/9900_ingest_test_data.sh !flowdb/testdata/bin/9910_run_synthetic_dfs_data_generation_script.sh -!flowdb/testdata/bin/9910_migrate_test_data.sql +!flowdb/testdata/bin/9910_migrate_test_data.sh +!flowdb/testdata/bin/migrate_synth_data.py !flowdb/testdata/test_data/Pipfile !flowdb/testdata/test_data/Pipfile.lock !flowdb/testdata/test_data/sql/