|
| 1 | +from google_analytics_plugin.hooks.google_analytics_hook import GoogleAnalyticsHook |
| 2 | + |
| 3 | +from airflow.hooks.S3_hook import S3Hook |
| 4 | +from airflow.models import BaseOperator |
| 5 | + |
| 6 | +import hashlib |
| 7 | +import json |
| 8 | +import os |
| 9 | +from datetime import datetime |
| 10 | + |
class GoogleAnalyticsReportingToS3Operator(BaseOperator):
    """Pull a Google Analytics Reporting API report and upload it to S3 as JSONL.

    One JSON record is written per metric set per report row; each record
    carries the row's dimensions, the metric values, the view id, and the
    (templated) ``since`` timestamp.

    :param google_analytics_conn_id: Airflow connection id for Google Analytics.
    :param view_id: GA view (profile) id to report on.
    :param since: start datetime string, ``%Y-%m-%d %H:%M:%S`` (templated).
    :param until: end datetime string, ``%Y-%m-%d %H:%M:%S`` (templated).
    :param sampling_level: GA sampling level passed through to the report request.
    :param dimensions: list of GA dimension specs for the report.
    :param metrics: list of GA metric specs for the report.
    :param page_size: page size for the reporting API request.
    :param include_empty_rows: whether GA should return rows with all-zero metrics.
    :param s3_conn_id: Airflow connection id for S3.
    :param s3_bucket: destination S3 bucket.
    :param s3_key: destination S3 key (templated); also used to name the temp file.
    """

    template_fields = ('s3_key', 'since', 'until')

    def __init__(self,
                 google_analytics_conn_id,
                 view_id,
                 since,
                 until,
                 sampling_level,
                 dimensions,
                 metrics,
                 page_size,
                 include_empty_rows,
                 s3_conn_id,
                 s3_bucket,
                 s3_key,
                 *args,
                 **kwargs):
        super().__init__(*args, **kwargs)

        self.google_analytics_conn_id = google_analytics_conn_id
        self.view_id = view_id
        self.since = since
        self.until = until
        self.sampling_level = sampling_level
        self.dimensions = dimensions
        self.metrics = metrics
        self.page_size = page_size
        self.include_empty_rows = include_empty_rows
        self.s3_conn_id = s3_conn_id
        self.s3_bucket = s3_bucket
        self.s3_key = s3_key

        # Maps GA metric types to SQL column types for downstream loaders.
        # Unknown types fall back to varchar(255) at lookup time.
        self.metricMap = {
            'METRIC_TYPE_UNSPECIFIED': 'varchar(255)',
            'CURRENCY': 'decimal(20,5)',
            'INTEGER': 'int(11)',
            'FLOAT': 'decimal(20,5)',
            'PERCENT': 'decimal(20,5)',
            'TIME': 'time'
        }

    def execute(self, context):
        ga_conn = GoogleAnalyticsHook(self.google_analytics_conn_id)
        s3_conn = S3Hook(self.s3_conn_id)

        # This has to be here because template_fields are not yet parsed in
        # the __init__ function: `since`/`until` are rendered datetimes that
        # must be collapsed to dates for the reporting API.
        since_formatted = datetime.strptime(self.since, '%Y-%m-%d %H:%M:%S').strftime('%Y-%m-%d')
        until_formatted = datetime.strptime(self.until, '%Y-%m-%d %H:%M:%S').strftime('%Y-%m-%d')

        report = ga_conn.get_analytics_report(self.view_id,
                                              since_formatted,
                                              until_formatted,
                                              self.sampling_level,
                                              self.dimensions,
                                              self.metrics,
                                              self.page_size,
                                              self.include_empty_rows)

        column_header = report.get('columnHeader', {})
        # Right now all dimensions are hardcoded to varchar(255); a map will be
        # needed if any non-varchar dimensions are used in the future.
        # Unfortunately the API does not send back types for Dimensions like it
        # does for Metrics (yet..)
        dimension_headers = [
            {'name': header.replace('ga:', ''), 'type': 'varchar(255)'}
            for header in column_header.get('dimensions', [])
        ]
        metric_headers = [
            {'name': entry.get('name').replace('ga:', ''),
             'type': self.metricMap.get(entry.get('type'), 'varchar(255)')}
            for entry in column_header.get('metricHeader', {}).get('metricHeaderEntries', [])
        ]

        # NOTE(review): an s3_key containing '/' produces a nested /tmp path
        # that open() cannot create — assumes flat keys; confirm with callers.
        file_name = '/tmp/{key}.jsonl'.format(key=self.s3_key)
        try:
            with open(file_name, 'w') as ga_file:
                rows = report.get('data', {}).get('rows', [])

                for row in rows:
                    root_data_obj = {}
                    dimensions = row.get('dimensions', [])
                    metrics = row.get('metrics', [])

                    for index, dimension in enumerate(dimensions):
                        header = dimension_headers[index].get('name').lower()
                        root_data_obj[header] = dimension

                    for metric in metrics:
                        data = {}
                        data.update(root_data_obj)

                        for index, value in enumerate(metric.get('values', [])):
                            header = metric_headers[index].get('name').lower()
                            data[header] = value

                        data['viewid'] = self.view_id
                        data['timestamp'] = self.since

                        # One record per line. The original guarded the final
                        # newline with `row_counter == len(rows)`, which is never
                        # true for enumerate indices, so every record already got
                        # a trailing newline; the dead conditional is dropped
                        # (a trailing newline is valid JSONL anyway).
                        ga_file.write(json.dumps(data) + '\n')

            # replace=True: overwrite an existing object at this key.
            s3_conn.load_file(file_name, self.s3_key, self.s3_bucket, True)
        finally:
            # Always clean up the temp file, even if the upload fails,
            # so /tmp does not accumulate report dumps across retries.
            if os.path.exists(file_name):
                os.remove(file_name)
0 commit comments