Skip to content

Commit ac7d7df

Browse files
romain-intel and tuulos authored
Addressed an issue with class members (#804)
* Addressed an issue with class members

  Class members that were not parameters caused an error (regression in 2.4.0). This patch fixes that regression. It also makes explicit that modifying class variables is not safe; an error is now raised when a step tries to do so.

* Avoid persisting class variables multiple times.

  This treats class variables more like parameters (constants) and persists them once in the renamed persist_constants (used to be persist_parameters).

* add constants test
* Make test pass (hopefully)
* add CLASS_VARS to tests

Co-authored-by: Ville Tuulos <[email protected]>
1 parent d8646b9 commit ac7d7df

File tree

11 files changed

+132
-37
lines changed

11 files changed

+132
-37
lines changed

docs/lifecycle.dot

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ digraph Metaflow {
5353
command_run [label="{command|run}", fillcolor=tan]
5454
new_run_id [label="{metadata|new_run_id}", fillcolor=lightgoldenrod1]
5555
runtime_init [label="{decorator|runtime_init}", fillcolor=lightblue2]
56-
local_params [label="{runtime|persist_parameters}", fillcolor=lightpink2]
56+
local_params [label="{runtime|persist_constants}", fillcolor=lightpink2]
5757
start_run_heartbeat [label="{metadata|start_run_heartbeat}", fillcolor=lightgoldenrod1]
5858
schedule_local_task [shape="circle", label="Schedule\nTask", width=1, fillcolor=grey78]
5959
runtime_finished [label="{decorator|runtime_finished}", fillcolor=lightblue2]
@@ -99,7 +99,7 @@ digraph Metaflow {
9999
stepfunctions_run [label="{AWS Step Functions|start_execution}", fillcolor=lightpink2]
100100
stepfunctions_bootstrap_batch [shape="circle", label="Bootstrap\nAWS Batch", width=1, fillcolor=grey78]
101101
stepfunctions_init [label="{command|init}" fillcolor=tan]
102-
stepfunctions_params [label="{runtime|persist_parameters}", fillcolor=lightpink2]
102+
stepfunctions_params [label="{runtime|persist_constants}", fillcolor=lightpink2]
103103
stepfunctions_task [shape="circle", label="Execute\nTask", width=1, fillcolor=grey78]
104104
}
105105

metaflow/cli.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -592,8 +592,8 @@ def init(obj, run_id=None, task_id=None, tags=None, **kwargs):
592592
obj.monitor,
593593
run_id=run_id,
594594
)
595-
parameters.set_parameters(obj.flow, kwargs)
596-
runtime.persist_parameters(task_id=task_id)
595+
obj.flow._set_constants(kwargs)
596+
runtime.persist_constants(task_id=task_id)
597597

598598

599599
def common_run_options(func):
@@ -711,7 +711,7 @@ def resume(
711711
max_num_splits=max_num_splits,
712712
max_log_size=max_log_size * 1024 * 1024,
713713
)
714-
runtime.persist_parameters()
714+
runtime.persist_constants()
715715
runtime.execute()
716716

717717
write_run_id(run_id_file, runtime.run_id)
@@ -766,8 +766,8 @@ def run(
766766
write_latest_run_id(obj, runtime.run_id)
767767
write_run_id(run_id_file, runtime.run_id)
768768

769-
parameters.set_parameters(obj.flow, kwargs)
770-
runtime.persist_parameters()
769+
obj.flow._set_constants(kwargs)
770+
runtime.persist_constants()
771771
runtime.execute()
772772

773773

metaflow/datastore/task_datastore.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -643,7 +643,7 @@ def persist(self, flow):
643643
for var in dir(flow):
644644
if var.startswith("__") or var in flow._EPHEMERAL:
645645
continue
646-
# Skip over properties of the class (Parameters)
646+
# Skip over properties of the class (Parameters or class variables)
647647
if hasattr(flow.__class__, var) and isinstance(
648648
getattr(flow.__class__, var), property
649649
):

metaflow/flowspec.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import sys
44
import inspect
55
import traceback
6+
from types import FunctionType, MethodType
67

78
from . import cmd_with_io
89
from .parameters import Parameter
@@ -106,6 +107,44 @@ def script_name(self):
106107
fname = fname[:-1]
107108
return os.path.basename(fname)
108109

110+
def _set_constants(self, kwargs):
111+
# Persist values for parameters and other constants (class level variables)
112+
# only once. This method is called before persist_constants is called to
113+
# persist all values set using setattr
114+
seen = set()
115+
for var, param in self._get_parameters():
116+
norm = param.name.lower()
117+
if norm in seen:
118+
raise MetaflowException(
119+
"Parameter *%s* is specified twice. "
120+
"Note that parameter names are "
121+
"case-insensitive." % param.name
122+
)
123+
seen.add(norm)
124+
seen.clear()
125+
self._success = True
126+
127+
for var, param in self._get_parameters():
128+
seen.add(var)
129+
val = kwargs[param.name.replace("-", "_").lower()]
130+
# Support for delayed evaluation of parameters. This is used for
131+
# includefile in particular
132+
if callable(val):
133+
val = val()
134+
val = val.split(param.separator) if val and param.separator else val
135+
setattr(self, var, val)
136+
137+
# Do the same for class variables which will be forced constant as modifications
138+
# to them don't propagate well since we create a new process for each step and
139+
# re-read the flow file
140+
for var in dir(self.__class__):
141+
if var[0] == "_" or var in self._NON_PARAMETERS or var in seen:
142+
continue
143+
val = getattr(self.__class__, var)
144+
if isinstance(val, (MethodType, FunctionType, property, type)):
145+
continue
146+
setattr(self, var, val)
147+
109148
def _get_parameters(self):
110149
for var in dir(self):
111150
if var[0] == "_" or var in self._NON_PARAMETERS:

metaflow/includefile.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ class FilePathClass(click.ParamType):
232232
# + If the value is already such a string, nothing happens and it returns that same value
233233
# + If the value is a LocalFile, it will persist the local file and return the path
234234
# of the persisted file
235-
# - The artifact will be persisted prior to any run (for non-scheduled runs through persist_parameters)
235+
# - The artifact will be persisted prior to any run (for non-scheduled runs through persist_constants)
236236
# + This will therefore persist a simple string
237237
# - When the parameter is loaded again, the load_parameter in the IncludeFile class will get called
238238
# which will download and return the bytes of the persisted file.

metaflow/parameters.py

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -250,26 +250,3 @@ def wrapper(cmd):
250250
return cmd
251251

252252
return wrapper
253-
254-
255-
def set_parameters(flow, kwargs):
256-
seen = set()
257-
for var, param in flow._get_parameters():
258-
norm = param.name.lower()
259-
if norm in seen:
260-
raise MetaflowException(
261-
"Parameter *%s* is specified twice. "
262-
"Note that parameter names are "
263-
"case-insensitive." % param.name
264-
)
265-
seen.add(norm)
266-
267-
flow._success = True
268-
for var, param in flow._get_parameters():
269-
val = kwargs[param.name.replace("-", "_").lower()]
270-
# Support for delayed evaluation of parameters. This is used for
271-
# includefile in particular
272-
if callable(val):
273-
val = val()
274-
val = val.split(param.separator) if val and param.separator else val
275-
setattr(flow, var, val)

metaflow/runtime.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ def _new_task(self, step, input_paths=None, **kwargs):
176176
def run_id(self):
177177
return self._run_id
178178

179-
def persist_parameters(self, task_id=None):
179+
def persist_constants(self, task_id=None):
180180
task = self._new_task("_parameters", task_id=task_id)
181181
if not task.is_cloned:
182182
task.persist(self._flow)
@@ -614,7 +614,7 @@ def __init__(
614614
if task_id is None:
615615
task_id = str(metadata.new_task_id(run_id, step))
616616
else:
617-
# task_id is preset only by persist_parameters() or control tasks.
617+
# task_id is preset only by persist_constants() or control tasks.
618618
if ubf_context == UBF_CONTROL:
619619
metadata.register_task_id(
620620
run_id, step, task_id, 0, sys_tags=[CONTROL_TASK_TAG]

metaflow/task.py

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import os
44
import time
55

6+
from types import MethodType, FunctionType
7+
68
from .metaflow_config import MAX_ATTEMPTS
79
from .metadata import MetaDatum
810
from .datastore import Inputs, TaskDataStoreSet
@@ -53,14 +55,18 @@ def _exec_step_function(self, step_function, input_obj=None):
5355
step_function(input_obj)
5456

5557
def _init_parameters(self, parameter_ds, passdown=True):
58+
def set_cls_var(_, __):
59+
raise AttributeError("Flow level attributes are not modifiable")
60+
61+
cls = self.flow.__class__
5662
# overwrite Parameters in the flow object
5763
vars = []
5864
for var, param in self.flow._get_parameters():
5965
# make the parameter a read-only property
6066
# note x=x binds the current value of x to the closure
6167
def property_setter(
6268
_,
63-
cls=self.flow.__class__,
69+
cls=cls,
6470
param=param,
6571
var=var,
6672
parameter_ds=parameter_ds,
@@ -69,11 +75,26 @@ def property_setter(
6975
setattr(cls, var, property(fget=lambda _, val=v: val))
7076
return v
7177

72-
setattr(self.flow.__class__, var, property(fget=property_setter))
78+
setattr(cls, var, property(fget=property_setter))
7379
vars.append(var)
80+
81+
param_only_vars = list(vars)
82+
# make class-level values read-only to be more consistent across steps in a flow
83+
# they are also only persisted once and so we similarly pass them down if
84+
# required
85+
for var in dir(cls):
86+
if var[0] == "_" or var in cls._NON_PARAMETERS or var in vars:
87+
continue
88+
val = getattr(cls, var)
89+
# Exclude methods, properties and other classes
90+
if isinstance(val, (MethodType, FunctionType, property, type)):
91+
continue
92+
setattr(cls, var, property(fget=lambda _, val=val: val, fset=set_cls_var))
93+
vars.append(var)
94+
7495
if passdown:
7596
self.flow._datastore.passdown_partial(parameter_ds, vars)
76-
return vars
97+
return param_only_vars
7798

7899
def _init_data(self, run_id, join_type, input_paths):
79100
# We prefer to use the parallelized version to initialize datastores

test/core/metaflow_test/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ class MetaflowTest(object):
9090
PRIORITY = 999999999
9191
PARAMETERS = {}
9292
INCLUDE_FILES = {}
93+
CLASS_VARS = {}
9394
HEADER = ""
9495

9596
def check_results(self, flow, checker):

test/core/metaflow_test/formatter.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,9 @@ def _flow_lines(self):
8888
yield 0, self.test.HEADER
8989
yield 0, "class %s(FlowSpec):" % self.flow_name
9090

91+
for var, val in self.test.CLASS_VARS.items():
92+
yield 1, '%s = %s' % (var, val)
93+
9194
for var, parameter in self.test.PARAMETERS.items():
9295
kwargs = ["%s=%s" % (k, v) for k, v in parameter.items()]
9396
yield 1, '%s = Parameter("%s", %s)' % (var, var, ",".join(kwargs))

0 commit comments

Comments
 (0)