Skip to content

Commit 1e1a3c7

Browse files
mvdbeekkysrpex
authored andcommitted
Move get_pulsar_app_config() and _ensure_manager_config() from CoexecutionLaunchMixin to BaseRemoteConfiguredJobClient
1 parent 1ff90c9 commit 1e1a3c7

File tree

2 files changed

+94
-57
lines changed

2 files changed

+94
-57
lines changed

pulsar/client/client.py

Lines changed: 93 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,91 @@ def _build_setup_message(self, command_line, dependencies_description, env, remo
374374
launch_params["setup_params"] = setup_params
375375
return launch_params
376376

377+
def get_pulsar_app_config(
378+
self,
379+
pulsar_app_config,
380+
container,
381+
wait_after_submission,
382+
manager_name,
383+
manager_type,
384+
dependencies_description,
385+
):
386+
387+
pulsar_app_config = pulsar_app_config or {}
388+
manager_config = self._ensure_manager_config(
389+
pulsar_app_config,
390+
manager_name,
391+
manager_type,
392+
)
393+
394+
if (
395+
"staging_directory" not in manager_config
396+
and "staging_directory" not in pulsar_app_config
397+
):
398+
pulsar_app_config["staging_directory"] = CONTAINER_STAGING_DIRECTORY
399+
400+
if self.amqp_key_prefix:
401+
pulsar_app_config["amqp_key_prefix"] = self.amqp_key_prefix
402+
403+
if "monitor" not in manager_config:
404+
manager_config["monitor"] = (
405+
MonitorStyle.BACKGROUND.value
406+
if wait_after_submission
407+
else MonitorStyle.NONE.value
408+
)
409+
if "persistence_directory" not in pulsar_app_config:
410+
pulsar_app_config["persistence_directory"] = os.path.join(
411+
CONTAINER_STAGING_DIRECTORY, "persisted_data"
412+
)
413+
elif "manager" in pulsar_app_config and manager_name != "_default_":
414+
log.warning(
415+
"'manager' set in app config but client has non-default manager '%s', this will cause communication"
416+
" failures, remove `manager` from app or client config to fix",
417+
manager_name,
418+
)
419+
420+
using_dependencies = container is None and dependencies_description is not None
421+
if using_dependencies and "dependency_resolution" not in pulsar_app_config:
422+
# Setup default dependency resolution for container above...
423+
dependency_resolution = {
424+
"cache": False,
425+
"use": True,
426+
"default_base_path": "/pulsar_dependencies",
427+
"cache_dir": "/pulsar_dependencies/_cache",
428+
"resolvers": [
429+
{ # TODO: add CVMFS resolution...
430+
"type": "conda",
431+
"auto_init": True,
432+
"auto_install": True,
433+
"prefix": "/pulsar_dependencies/conda",
434+
},
435+
{
436+
"type": "conda",
437+
"auto_init": True,
438+
"auto_install": True,
439+
"prefix": "/pulsar_dependencies/conda",
440+
"versionless": True,
441+
},
442+
],
443+
}
444+
pulsar_app_config["dependency_resolution"] = dependency_resolution
445+
return pulsar_app_config
446+
447+
def _ensure_manager_config(self, pulsar_app_config, manager_name, manager_type):
448+
if "manager" in pulsar_app_config:
449+
manager_config = pulsar_app_config["manager"]
450+
elif "managers" in pulsar_app_config:
451+
managers_config = pulsar_app_config["managers"]
452+
if manager_name not in managers_config:
453+
managers_config[manager_name] = {}
454+
manager_config = managers_config[manager_name]
455+
else:
456+
manager_config = {}
457+
pulsar_app_config["manager"] = manager_config
458+
if "type" not in manager_config:
459+
manager_config["type"] = manager_type
460+
return manager_config
461+
377462

378463
class MessagingClientManagerProtocol(ClientManagerProtocol):
379464
status_cache: Dict[str, Dict[str, Any]]
@@ -492,7 +577,7 @@ def launch(
492577
container_info=None,
493578
token_endpoint=None,
494579
pulsar_app_config=None,
495-
staging_manifest=None
580+
staging_manifest=None,
496581
) -> Optional[ExternalId]:
497582
"""
498583
"""
@@ -514,48 +599,15 @@ def launch(
514599

515600
manager_name = self.client_manager.manager_name
516601
manager_type = "coexecution" if container is not None else "unqueued"
517-
pulsar_app_config = pulsar_app_config or {}
518-
manager_config = self._ensure_manager_config(
519-
pulsar_app_config, manager_name, manager_type,
602+
pulsar_app_config = self.get_pulsar_app_config(
603+
pulsar_app_config=pulsar_app_config,
604+
container=container,
605+
wait_after_submission=wait_after_submission,
606+
manager_name=manager_name,
607+
manager_type=manager_type,
608+
dependencies_description=dependencies_description,
520609
)
521610

522-
if "staging_directory" not in manager_config and "staging_directory" not in pulsar_app_config:
523-
pulsar_app_config["staging_directory"] = CONTAINER_STAGING_DIRECTORY
524-
525-
if self.amqp_key_prefix:
526-
pulsar_app_config["amqp_key_prefix"] = self.amqp_key_prefix
527-
528-
if "monitor" not in manager_config:
529-
manager_config["monitor"] = MonitorStyle.BACKGROUND.value if wait_after_submission else MonitorStyle.NONE.value
530-
if "persistence_directory" not in pulsar_app_config:
531-
pulsar_app_config["persistence_directory"] = os.path.join(CONTAINER_STAGING_DIRECTORY, "persisted_data")
532-
elif "manager" in pulsar_app_config and manager_name != '_default_':
533-
log.warning(
534-
"'manager' set in app config but client has non-default manager '%s', this will cause communication"
535-
" failures, remove `manager` from app or client config to fix", manager_name)
536-
537-
using_dependencies = container is None and dependencies_description is not None
538-
if using_dependencies and "dependency_resolution" not in pulsar_app_config:
539-
# Setup default dependency resolution for container above...
540-
dependency_resolution = {
541-
"cache": False,
542-
"use": True,
543-
"default_base_path": "/pulsar_dependencies",
544-
"cache_dir": "/pulsar_dependencies/_cache",
545-
"resolvers": [{ # TODO: add CVMFS resolution...
546-
"type": "conda",
547-
"auto_init": True,
548-
"auto_install": True,
549-
"prefix": '/pulsar_dependencies/conda',
550-
}, {
551-
"type": "conda",
552-
"auto_init": True,
553-
"auto_install": True,
554-
"prefix": '/pulsar_dependencies/conda',
555-
"versionless": True,
556-
}]
557-
}
558-
pulsar_app_config["dependency_resolution"] = dependency_resolution
559611
base64_message = to_base64_json(launch_params)
560612
base64_app_conf = to_base64_json(pulsar_app_config)
561613
pulsar_container_image = self.pulsar_container_image
@@ -607,21 +659,6 @@ def _pulsar_script_args(self, manager_name, base64_job, base64_app_conf, wait_ar
607659
manager_args.extend(["--base64", base64_job, "--app_conf_base64", base64_app_conf])
608660
return manager_args
609661

610-
def _ensure_manager_config(self, pulsar_app_config, manager_name, manager_type):
611-
if "manager" in pulsar_app_config:
612-
manager_config = pulsar_app_config["manager"]
613-
elif "managers" in pulsar_app_config:
614-
managers_config = pulsar_app_config["managers"]
615-
if manager_name not in managers_config:
616-
managers_config[manager_name] = {}
617-
manager_config = managers_config[manager_name]
618-
else:
619-
manager_config = {}
620-
pulsar_app_config["manager"] = manager_config
621-
if "type" not in manager_config:
622-
manager_config["type"] = manager_type
623-
return manager_config
624-
625662
def _launch_containers(
626663
self,
627664
pulsar_submit_container: CoexecutionContainerCommand,

pulsar/client/util.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ def _copy_and_close(object, output):
6060
@wraps(_b64encode)
6161
def b64encode(val, **kwargs):
6262
try:
63-
return _b64encode(val, **kwargs)
63+
return _b64encode(val, **kwargs).decode("utf-8")
6464
except TypeError:
6565
return _b64encode(val.encode('UTF-8'), **kwargs).decode('UTF-8')
6666

0 commit comments

Comments
 (0)