Skip to content

Commit 40d6503

Browse files
fix schema
1 parent c1190c6 commit 40d6503

File tree

1 file changed

+28
-153
lines changed

1 file changed

+28
-153
lines changed

schemas/google_analytics_schemas.py

Lines changed: 28 additions & 153 deletions
Original file line numberDiff line numberDiff line change
@@ -1,153 +1,28 @@
1-
import json
2-
from datetime import datetime
3-
from tempfile import NamedTemporaryFile
4-
5-
from airflow.hooks.S3_hook import S3Hook
6-
from airflow.models import BaseOperator
7-
8-
from GoogleAnalyticsPlugin.hooks.google_analytics_hook import GoogleAnalyticsHook
9-
10-
11-
class GoogleAnalyticsReportingToS3Operator(BaseOperator):
    """
    Google Analytics Reporting To S3 Operator

    Fetches a Google Analytics Reporting API v4 report and uploads the
    rows to S3 as newline-delimited JSON.

    :param google_analytics_conn_id: The Google Analytics connection id.
    :type google_analytics_conn_id: string
    :param view_id: The view id for associated report.
    :type view_id: string/array
    :param since: The date up from which to pull GA data.
                  This can either be a string in the format
                  of '%Y-%m-%d %H:%M:%S' or '%Y-%m-%d'
                  but in either case it will be
                  passed to GA as '%Y-%m-%d'.
    :type since: string
    :param until: The date up to which to pull GA data.
                  This can either be a string in the format
                  of '%Y-%m-%d %H:%M:%S' or '%Y-%m-%d'
                  but in either case it will be
                  passed to GA as '%Y-%m-%d'.
    :type until: string
    :param dimensions: The GA dimensions to request in the report.
    :type dimensions: array
    :param metrics: The GA metrics to request in the report.
    :type metrics: array
    :param s3_conn_id: The s3 connection id.
    :type s3_conn_id: string
    :param s3_bucket: The S3 bucket to be used to store
                      the Google Analytics data.
    :type s3_bucket: string
    :param s3_key: The S3 key to be used to store
                   the Google Analytics data.
    :type s3_key: string
    :param page_size: Rows per API page; GA caps this at 10000.
    :type page_size: int
    :param include_empty_rows: Whether GA should return zero-valued rows.
    :type include_empty_rows: bool
    :param sampling_level: The GA sampling level to request, if any.
    :type sampling_level: string
    """

    template_fields = ('s3_key',
                       'since',
                       'until')

    def __init__(self,
                 google_analytics_conn_id,
                 view_id,
                 since,
                 until,
                 dimensions,
                 metrics,
                 s3_conn_id,
                 s3_bucket,
                 s3_key,
                 page_size=1000,
                 include_empty_rows=True,
                 sampling_level=None,
                 *args,
                 **kwargs):
        super().__init__(*args, **kwargs)

        self.google_analytics_conn_id = google_analytics_conn_id
        self.view_id = view_id
        self.since = since
        self.until = until
        self.sampling_level = sampling_level
        self.dimensions = dimensions
        self.metrics = metrics
        self.page_size = page_size
        self.include_empty_rows = include_empty_rows
        self.s3_conn_id = s3_conn_id
        self.s3_bucket = s3_bucket
        self.s3_key = s3_key

        # Maps GA metric types to SQL column types for downstream schemas.
        self.metricMap = {
            'METRIC_TYPE_UNSPECIFIED': 'varchar(255)',
            'CURRENCY': 'decimal(20,5)',
            'INTEGER': 'int(11)',
            'FLOAT': 'decimal(20,5)',
            'PERCENT': 'decimal(20,5)',
            'TIME': 'time'
        }

        # The GA Reporting API rejects pageSize values above 10000.
        if self.page_size > 10000:
            raise Exception('Please specify a page size equal to or lower than 10000.')

        if not isinstance(self.include_empty_rows, bool):
            raise Exception('Please specify "include_empty_rows" as a boolean.')

    def _format_date(self, value):
        """Normalize a date value to the '%Y-%m-%d' string GA expects.

        A '%Y-%m-%d %H:%M:%S' string has its time portion dropped; any
        other value is passed through as a string (assumed to already be
        in an accepted format such as '%Y-%m-%d').
        """
        # BUGFIX: was a bare `except:`, which also swallowed
        # KeyboardInterrupt/SystemExit; only parse failures are expected here.
        try:
            return datetime.strptime(value, '%Y-%m-%d %H:%M:%S').strftime('%Y-%m-%d')
        except (ValueError, TypeError):
            return str(value)

    def execute(self, context):
        """Pull the GA report and upload it to S3 as JSON lines."""
        ga_conn = GoogleAnalyticsHook(self.google_analytics_conn_id)
        s3_conn = S3Hook(self.s3_conn_id)

        since_formatted = self._format_date(self.since)
        until_formatted = self._format_date(self.until)

        report = ga_conn.get_analytics_report(self.view_id,
                                              since_formatted,
                                              until_formatted,
                                              self.sampling_level,
                                              self.dimensions,
                                              self.metrics,
                                              self.page_size,
                                              self.include_empty_rows)

        columnHeader = report.get('columnHeader', {})
        # Right now all dimensions are hardcoded to varchar(255), will need a
        # map if any non-varchar dimensions are used in the future.
        # Unfortunately the API does not send back types for Dimensions like
        # it does for Metrics (yet..)
        dimensionHeaders = [
            {'name': header.replace('ga:', ''), 'type': 'varchar(255)'}
            for header
            in columnHeader.get('dimensions', [])
        ]
        metricHeaders = [
            {'name': entry.get('name').replace('ga:', ''),
             'type': self.metricMap.get(entry.get('type'), 'varchar(255)')}
            for entry
            in columnHeader.get('metricHeader', {}).get('metricHeaderEntries', [])
        ]

        with NamedTemporaryFile("w") as ga_file:
            rows = report.get('data', {}).get('rows', [])

            for row_counter, row in enumerate(rows):
                root_data_obj = {}
                dimensions = row.get('dimensions', [])
                metrics = row.get('metrics', [])

                for index, dimension in enumerate(dimensions):
                    header = dimensionHeaders[index].get('name').lower()
                    root_data_obj[header] = dimension

                for metric in metrics:
                    data = {}
                    data.update(root_data_obj)

                    for index, value in enumerate(metric.get('values', [])):
                        header = metricHeaders[index].get('name').lower()
                        data[header] = value

                    data['viewid'] = self.view_id
                    data['timestamp'] = self.since

                    # BUGFIX: enumerate is zero-based, so the final row index
                    # is len(rows) - 1; the old `== len(rows)` check never
                    # matched, so the intended "no trailing newline on the
                    # last line" behavior never happened.
                    ga_file.write(json.dumps(data) +
                                  ('' if row_counter == len(rows) - 1 else '\n'))

            # BUGFIX: flush buffered writes before S3 reads the file by
            # name, otherwise the tail of the data may not be on disk yet.
            ga_file.flush()

            s3_conn.load_file(ga_file.name,
                              self.s3_key,
                              self.s3_bucket,
                              True)
1+
# Column schema for the Google Analytics report landed in the warehouse:
# one {'name': ..., 'type': ...} dict per column, in report order.
google_analytics_reporting_schema = [
    {'name': column_name, 'type': column_type}
    for column_name, column_type in (
        ('viewid', 'int'),
        ('datehourminute', 'timestamp'),
        ('keyword', 'varchar(255)'),
        ('referralPath', 'varchar(255)'),
        ('campaign', 'varchar(255)'),
        ('sourceMedium', 'varchar(255)'),
        ('dimension3', 'varchar(255)'),
        ('dimension10', 'varchar(255)'),
        ('pageViews', 'int4'),
        ('bounces', 'int4'),
        ('users', 'int4'),
        ('newUsers', 'int4'),
        ('goal1starts', 'int4'),
        ('goal1completions', 'int4'),
    )
]

0 commit comments

Comments
 (0)