Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions metaflow/metaflow_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -533,11 +533,6 @@
# lexicographic ordering of attempts. This won't work if MAX_ATTEMPTS > 99.
MAX_ATTEMPTS = 6

# Feature flag (experimental features that are *explicitly* unsupported)

# Process configs even when using the click_api for Runner/Deployer
CLICK_API_PROCESS_CONFIG = from_conf("CLICK_API_PROCESS_CONFIG", False)


# PINNED_CONDA_LIBS are the libraries that metaflow depends on for execution
# and are needed within a conda environment
Expand Down
120 changes: 58 additions & 62 deletions metaflow/runner/click_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
from metaflow.exception import MetaflowException
from metaflow.flowspec import _FlowState
from metaflow.includefile import FilePathClass
from metaflow.metaflow_config import CLICK_API_PROCESS_CONFIG
from metaflow.parameters import JSONTypeClass, flow_context
from metaflow.user_configs.config_options import (
ConfigValue,
Expand Down Expand Up @@ -446,83 +445,80 @@ def _compute_flow_parameters(self):
self._cached_computed_parameters = []

config_options = None
if CLICK_API_PROCESS_CONFIG:
with flow_context(self._flow_cls) as _:
# We are going to resolve the configs first and then get the parameters.
# Note that configs may update/add parameters so the order is important
# Since part of the processing of configs happens by click, we need to
# "fake" it.

# Extract any config options as well as datastore and quiet options
method_params = self._chain[0][self._API_NAME]
opts = method_params["options"]
defaults = method_params["defaults"]

ds = opts.get("datastore", defaults["datastore"])
quiet = opts.get("quiet", defaults["quiet"])
is_default = False
config_file = opts.get("config")
if config_file is None:
is_default = True
config_file = defaults.get("config")

if config_file:
config_file = dict(
map(
lambda x: (
x[0],
ConvertPath.convert_value(x[1], is_default),
),
config_file,
)
with flow_context(self._flow_cls) as _:
# We are going to resolve the configs first and then get the parameters.
# Note that configs may update/add parameters so the order is important
# Since part of the processing of configs happens by click, we need to
# "fake" it.

# Extract any config options as well as datastore and quiet options
method_params = self._chain[0][self._API_NAME]
opts = method_params["options"]
defaults = method_params["defaults"]

ds = opts.get("datastore", defaults["datastore"])
quiet = opts.get("quiet", defaults["quiet"])
is_default = False
config_file = opts.get("config")
if config_file is None:
is_default = True
config_file = defaults.get("config")

if config_file:
config_file = dict(
map(
lambda x: (
x[0],
ConvertPath.convert_value(x[1], is_default),
),
config_file,
)
)

is_default = False
config_value = opts.get("config-value")
if config_value is None:
is_default = True
config_value = defaults.get("config_value")

if config_value:
config_value = dict(
map(
lambda x: (
x[0],
ConvertDictOrStr.convert_value(x[1], is_default),
),
config_value,
)
is_default = False
config_value = opts.get("config-value")
if config_value is None:
is_default = True
config_value = defaults.get("config_value")

if config_value:
config_value = dict(
map(
lambda x: (
x[0],
ConvertDictOrStr.convert_value(x[1], is_default),
),
config_value,
)
)

if (config_file is None) ^ (config_value is None):
# If we have one, we should have the other
raise MetaflowException(
"Options were not properly set -- this is an internal error."
)
if (config_file is None) ^ (config_value is None):
# If we have one, we should have the other
raise MetaflowException(
"Options were not properly set -- this is an internal error."
)

if config_file:
# Process both configurations; the second one will return all the merged
# configuration options properly processed.
self._config_input.process_configs(
self._flow_cls.__name__, "config", config_file, quiet, ds
)
config_options = self._config_input.process_configs(
self._flow_cls.__name__, "config_value", config_value, quiet, ds
)
if config_file:
# Process both configurations; the second one will return all the merged
# configuration options properly processed.
self._config_input.process_configs(
self._flow_cls.__name__, "config", config_file, quiet, ds
)
config_options = self._config_input.process_configs(
self._flow_cls.__name__, "config_value", config_value, quiet, ds
)

# At this point, we are like in start() in cli.py -- we obtained the
# properly processed config_options which we can now use to process
# the config decorators (including StepMutator/FlowMutator)
# Note that if CLICK_API_PROCESS_CONFIG is False, we still do this because
# it will init all parameters (config_options will be None)
# We ignore any errors if we don't check the configs in the click API.

# Init all values in the flow mutators and then process them
for decorator in self._flow_cls._flow_state.get(_FlowState.FLOW_MUTATORS, []):
decorator.external_init()

new_cls = self._flow_cls._process_config_decorators(
config_options, process_configs=CLICK_API_PROCESS_CONFIG
config_options, process_configs=True
)
if new_cls:
self._flow_cls = new_cls
Expand Down
16 changes: 4 additions & 12 deletions metaflow/runner/deployer_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

from typing import Any, ClassVar, Dict, Optional, TYPE_CHECKING, Type, List

from metaflow.metaflow_config import CLICK_API_PROCESS_CONFIG

from .subprocess_manager import SubprocessManager
from .utils import get_lower_level_group, handle_timeout, temporary_fifo, with_dir

Expand Down Expand Up @@ -150,16 +148,10 @@ def _create(
) -> "metaflow.runner.deployer.DeployedFlow":
with temporary_fifo() as (attribute_file_path, attribute_file_fd):
# every subclass needs to have `self.deployer_kwargs`
# TODO: Get rid of CLICK_API_PROCESS_CONFIG in the near future
if CLICK_API_PROCESS_CONFIG:
# We need to run this in the cwd because configs depend on files
# that may be located in paths relative to the directory the user
# wants to run in
with with_dir(self.cwd):
command = get_lower_level_group(
self.api, self.top_level_kwargs, self.TYPE, self.deployer_kwargs
).create(deployer_attribute_file=attribute_file_path, **kwargs)
else:
# We need to run this in the cwd because configs depend on files
# that may be located in paths relative to the directory the user
# wants to run in
with with_dir(self.cwd):
command = get_lower_level_group(
self.api, self.top_level_kwargs, self.TYPE, self.deployer_kwargs
).create(deployer_attribute_file=attribute_file_path, **kwargs)
Expand Down
30 changes: 4 additions & 26 deletions metaflow/runner/metaflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

from metaflow import Run

from metaflow.metaflow_config import CLICK_API_PROCESS_CONFIG

from metaflow.plugins import get_runner_cli

from .utils import (
Expand Down Expand Up @@ -377,12 +375,7 @@ def run(self, **kwargs) -> ExecutingRun:
ExecutingRun containing the results of the run.
"""
with temporary_fifo() as (attribute_file_path, attribute_file_fd):
if CLICK_API_PROCESS_CONFIG:
with with_dir(self.cwd):
command = self.api(**self.top_level_kwargs).run(
runner_attribute_file=attribute_file_path, **kwargs
)
else:
with with_dir(self.cwd):
command = self.api(**self.top_level_kwargs).run(
runner_attribute_file=attribute_file_path, **kwargs
)
Expand Down Expand Up @@ -414,12 +407,7 @@ def resume(self, **kwargs) -> ExecutingRun:
ExecutingRun containing the results of the resumed run.
"""
with temporary_fifo() as (attribute_file_path, attribute_file_fd):
if CLICK_API_PROCESS_CONFIG:
with with_dir(self.cwd):
command = self.api(**self.top_level_kwargs).resume(
runner_attribute_file=attribute_file_path, **kwargs
)
else:
with with_dir(self.cwd):
command = self.api(**self.top_level_kwargs).resume(
runner_attribute_file=attribute_file_path, **kwargs
)
Expand Down Expand Up @@ -453,12 +441,7 @@ async def async_run(self, **kwargs) -> ExecutingRun:
ExecutingRun representing the run that was started.
"""
with temporary_fifo() as (attribute_file_path, attribute_file_fd):
if CLICK_API_PROCESS_CONFIG:
with with_dir(self.cwd):
command = self.api(**self.top_level_kwargs).run(
runner_attribute_file=attribute_file_path, **kwargs
)
else:
with with_dir(self.cwd):
command = self.api(**self.top_level_kwargs).run(
runner_attribute_file=attribute_file_path, **kwargs
)
Expand Down Expand Up @@ -491,12 +474,7 @@ async def async_resume(self, **kwargs) -> ExecutingRun:
ExecutingRun representing the resumed run that was started.
"""
with temporary_fifo() as (attribute_file_path, attribute_file_fd):
if CLICK_API_PROCESS_CONFIG:
with with_dir(self.cwd):
command = self.api(**self.top_level_kwargs).resume(
runner_attribute_file=attribute_file_path, **kwargs
)
else:
with with_dir(self.cwd):
command = self.api(**self.top_level_kwargs).resume(
runner_attribute_file=attribute_file_path, **kwargs
)
Expand Down
8 changes: 4 additions & 4 deletions metaflow/user_configs/config_parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -569,10 +569,10 @@ def resolve_delayed_evaluator(
if ignore_errors:
# Assumption is that default value of None is always allowed.
# This code path is *only* used when evaluating Parameters AND they
# use configs in their attributes AND the runner/deployer is being used
# AND CLICK_API_PROCESS_CONFIG is False. In those cases, all attributes in
# Parameter can be set to None except for required and show_default
# and even in those cases, a wrong value will have very limited consequence.
# use configs in their attributes AND the runner/deployer is being used.
# In those cases, all attributes in Parameter can be set to None except
# for required and show_default and even in those cases, a wrong value
# will have very limited consequence.
return None
raise e

Expand Down
2 changes: 1 addition & 1 deletion test/core/tests/basic_config_parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class BasicConfigTest(MetaflowTest):
"silly_config": {
"required": True,
"parser": "silly_parser",
"default": "'silly.txt'",
"default": "'basic_config_silly.txt'",
},
"config2": {},
# Test using a function to get the value
Expand Down
Loading