Skip to content

Commit 95da251

Browse files
ferrasFerras Hamad
andauthored
allow tagging of metadata and add attempt status to metadata table (#371)
* allow tagging of metadata and add attempt status to metadata table * update name, value, and type of metadata Co-authored-by: Ferras Hamad <[email protected]>
1 parent 640f11d commit 95da251

File tree

8 files changed

+28
-10
lines changed

8 files changed

+28
-10
lines changed

metaflow/datastore/local.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ def done(self):
229229
raise
230230
self.metadata.register_metadata(
231231
self.run_id, self.step_name, self.task_id,
232-
[MetaDatum(field='attempt-done', value=str(self.attempt), type='attempt-done')])
232+
[MetaDatum(field='attempt-done', value=str(self.attempt), type='attempt-done', tags=[])])
233233

234234
self._is_done_set = True
235235

metaflow/datastore/s3.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ def done(self):
254254

255255
self.metadata.register_metadata(
256256
self.run_id, self.step_name, self.task_id,
257-
[MetaDatum(field='attempt-done', value=str(self.attempt), type='attempt-done')])
257+
[MetaDatum(field='attempt-done', value=str(self.attempt), type='attempt-done', tags=[])])
258258

259259
self._is_done_set = True
260260

metaflow/metadata/metadata.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
'name ds_type url type sha')
1313

1414
MetaDatum = namedtuple('MetaDatum',
15-
'field value type')
15+
'field value type tags')
1616

1717

1818
class MetadataProviderMeta(type):
@@ -430,6 +430,7 @@ def _metadata_to_json(self, run_id, step_name, task_id, metadata):
430430
'field_name': datum.field,
431431
'type': datum.type,
432432
'value': datum.value,
433+
'tags': datum.tags,
433434
'user_name': user,
434435
'ts_epoch': int(round(time.time() * 1000))} for datum in metadata]
435436

@@ -457,7 +458,8 @@ def _register_code_package_metadata(self, run_id, step_name, task_id):
457458
metadata.append(MetaDatum(
458459
field='code-package',
459460
value=json.dumps({'ds_type': code_ds, 'sha': code_sha, 'location': code_url}),
460-
type='code-package'))
461+
type='code-package',
462+
tags=[]))
461463
if metadata:
462464
self.register_metadata(run_id, step_name, task_id, metadata)
463465

metaflow/plugins/aws/batch/batch_decorator.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ def task_pre_step(self,
182182
meta['aws-batch-job-attempt'] = os.environ['AWS_BATCH_JOB_ATTEMPT']
183183
meta['aws-batch-ce-name'] = os.environ['AWS_BATCH_CE_NAME']
184184
meta['aws-batch-jq-name'] = os.environ['AWS_BATCH_JQ_NAME']
185-
entries = [MetaDatum(field=k, value=v, type=k) for k, v in meta.items()]
185+
entries = [MetaDatum(field=k, value=v, type=k, tags=[]) for k, v in meta.items()]
186186
# Register book-keeping metadata for debugging.
187187
metadata.register_metadata(run_id, step_name, task_id, entries)
188188

metaflow/plugins/aws/step_functions/step_functions_decorator.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ def task_pre_step(self,
2424
meta['aws-step-functions-execution'] = os.environ['METAFLOW_RUN_ID']
2525
meta['aws-step-functions-state-machine'] =\
2626
os.environ['SFN_STATE_MACHINE']
27-
entries = [MetaDatum(field=k, value=v, type=k) for k, v in meta.items()]
27+
entries = [MetaDatum(field=k, value=v, type=k, tags=[]) for k, v in meta.items()]
2828
# Register book-keeping metadata for debugging.
2929
metadata.register_metadata(run_id, step_name, task_id, entries)
3030

metaflow/plugins/conda/conda_step_decorator.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,8 @@ def task_pre_step(
224224
meta.register_metadata(run_id, step_name, task_id,
225225
[MetaDatum(field='conda_env_id',
226226
value=self._env_id(),
227-
type='conda_env_id')])
227+
type='conda_env_id',
228+
tags=[])])
228229

229230
def runtime_step_cli(self, cli_args, retry_count, max_user_code_retries):
230231
if self.is_enabled() and 'batch' not in cli_args.commands:

metaflow/runtime.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -642,7 +642,8 @@ def save_logs(self, logtype, logs):
642642
'ds_type': self._ds.TYPE,
643643
'location': location,
644644
'attempt': self.retries}),
645-
type='log_path')]
645+
type='log_path',
646+
tags=[])]
646647
self.metadata.register_metadata(self.run_id,
647648
self.step,
648649
self.task_id,

metaflow/task.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -257,10 +257,12 @@ def run_step(self,
257257
task_id,
258258
[MetaDatum(field='attempt',
259259
value=str(retry_count),
260-
type='attempt'),
260+
type='attempt',
261+
tags=[]),
261262
MetaDatum(field='origin-run-id',
262263
value=str(origin_run_id),
263-
type='origin-run-id')])
264+
type='origin-run-id',
265+
tags=[])])
264266

265267
step_func = getattr(self.flow, step_name)
266268
node = self.flow._graph[step_name]
@@ -447,6 +449,18 @@ def run_step(self,
447449
}
448450
logger.log(msg)
449451

452+
attempt_ok = str(bool(int(self.flow._task_ok)))
453+
self.metadata.register_metadata(run_id,
454+
step_name,
455+
task_id,
456+
[MetaDatum(field='attempt_ok',
457+
value=attempt_ok,
458+
type='internal_attempt_status',
459+
tags=["attempt_id:{0}".
460+
format(str(retry_count))
461+
])
462+
])
463+
450464
output.save_metadata('task_end', {})
451465
output.persist(self.flow)
452466

0 commit comments

Comments
 (0)