Skip to content

Commit 7c8371a

Browse files
mvdbeek and kysrpex
authored and committed
Move get_pulsar_app_config() and _ensure_manager_config() from CoexecutionLaunchMixin to BaseRemoteConfiguredJobClient
1 parent f217ea2 commit 7c8371a

File tree

2 files changed

+93
-57
lines changed

2 files changed

+93
-57
lines changed

pulsar/client/client.py

Lines changed: 92 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,90 @@ def _build_setup_message(self, command_line, dependencies_description, env, remo
374374
launch_params["setup_params"] = setup_params
375375
return launch_params
376376

377+
# Fill in defaults on the Pulsar app configuration used for a launch and
# return it.
#
# Parameters:
#   pulsar_app_config: dict of app settings, or a falsy value (replaced by a
#       fresh dict). A non-empty dict is modified in place and returned.
#   container: only tested against None; together with
#       dependencies_description it decides whether default dependency
#       resolution is added.
#   wait_after_submission: truthiness selects the default "monitor" value.
#   manager_name: key used by self._ensure_manager_config and compared with
#       "_default_" for the warning below.
#   manager_type: passed through to self._ensure_manager_config.
#   dependencies_description: only tested against None.
#
# Returns: the (possibly newly created) pulsar_app_config dict.
def get_pulsar_app_config(
378+
self,
379+
pulsar_app_config,
380+
container,
381+
wait_after_submission,
382+
manager_name,
383+
manager_type,
384+
dependencies_description,
385+
):
386+
387+
pulsar_app_config = pulsar_app_config or {}
388+
manager_config = self._ensure_manager_config(
389+
pulsar_app_config,
390+
manager_name,
391+
manager_type,
392+
)
393+
394+
# Default the staging directory only when neither the manager section nor
# the top-level config already names one.
if (
395+
"staging_directory" not in manager_config and "staging_directory" not in pulsar_app_config
396+
):
397+
pulsar_app_config["staging_directory"] = CONTAINER_STAGING_DIRECTORY
398+
399+
# Copy the client's AMQP key prefix into the app config when it is set.
if self.amqp_key_prefix:
400+
pulsar_app_config["amqp_key_prefix"] = self.amqp_key_prefix
401+
402+
# Default monitor style: BACKGROUND when wait_after_submission is truthy,
# otherwise NONE. An existing "monitor" entry is left untouched.
if "monitor" not in manager_config:
403+
manager_config["monitor"] = (
404+
MonitorStyle.BACKGROUND.value
405+
if wait_after_submission
406+
else MonitorStyle.NONE.value
407+
)
408+
# NOTE(review): indentation is lost in this rendering, so it is not visible
# here whether this check is a sibling of the "monitor" check above or nested
# inside it (which also decides which `if` the `elif` below belongs to).
# Confirm against the real file before relying on either reading.
if "persistence_directory" not in pulsar_app_config:
409+
pulsar_app_config["persistence_directory"] = os.path.join(
410+
CONTAINER_STAGING_DIRECTORY, "persisted_data"
411+
)
412+
elif "manager" in pulsar_app_config and manager_name != "_default_":
413+
log.warning(
414+
"'manager' set in app config but client has non-default manager '%s', this will cause communication"
415+
" failures, remove `manager` from app or client config to fix",
416+
manager_name,
417+
)
418+
419+
# Default dependency resolution is added only when no container is given,
# a dependencies description is present, and the config has no
# "dependency_resolution" entry of its own.
using_dependencies = container is None and dependencies_description is not None
420+
if using_dependencies and "dependency_resolution" not in pulsar_app_config:
421+
# Setup default dependency resolution for container above...
422+
dependency_resolution = {
423+
"cache": False,
424+
"use": True,
425+
"default_base_path": "/pulsar_dependencies",
426+
"cache_dir": "/pulsar_dependencies/_cache",
427+
"resolvers": [
428+
{ # TODO: add CVMFS resolution...
429+
"type": "conda",
430+
"auto_init": True,
431+
"auto_install": True,
432+
"prefix": "/pulsar_dependencies/conda",
433+
},
434+
{
435+
"type": "conda",
436+
"auto_init": True,
437+
"auto_install": True,
438+
"prefix": "/pulsar_dependencies/conda",
439+
"versionless": True,
440+
},
441+
],
442+
}
443+
pulsar_app_config["dependency_resolution"] = dependency_resolution
444+
return pulsar_app_config
445+
446+
def _ensure_manager_config(self, pulsar_app_config, manager_name, manager_type):
    """Return the manager section of ``pulsar_app_config``, creating it if absent.

    Lookup order:

    1. An existing ``"manager"`` entry is used as-is.
    2. Otherwise, if ``"managers"`` exists, the entry for ``manager_name``
       is used (an empty dict is inserted when it is missing).
    3. Otherwise a new empty ``"manager"`` entry is created.

    The chosen section gets ``"type"`` set to ``manager_type`` unless it
    already has a ``"type"`` key. ``pulsar_app_config`` is modified in place.

    :param pulsar_app_config: dict holding the app configuration.
    :param manager_name: key to use inside ``"managers"``.
    :param manager_type: default value for the section's ``"type"`` key.
    :returns: the manager section dict (the same object stored in the config).
    """
    if "manager" in pulsar_app_config:
        manager_config = pulsar_app_config["manager"]
    elif "managers" in pulsar_app_config:
        # setdefault inserts the empty section only when the name is missing.
        manager_config = pulsar_app_config["managers"].setdefault(manager_name, {})
    else:
        manager_config = pulsar_app_config.setdefault("manager", {})
    # Never override a type the configuration already specifies.
    manager_config.setdefault("type", manager_type)
    return manager_config
460+
377461

378462
class MessagingClientManagerProtocol(ClientManagerProtocol):
379463
status_cache: Dict[str, Dict[str, Any]]
@@ -492,7 +576,7 @@ def launch(
492576
container_info=None,
493577
token_endpoint=None,
494578
pulsar_app_config=None,
495-
staging_manifest=None
579+
staging_manifest=None,
496580
) -> Optional[ExternalId]:
497581
"""
498582
"""
@@ -514,48 +598,15 @@ def launch(
514598

515599
manager_name = self.client_manager.manager_name
516600
manager_type = "coexecution" if container is not None else "unqueued"
517-
pulsar_app_config = pulsar_app_config or {}
518-
manager_config = self._ensure_manager_config(
519-
pulsar_app_config, manager_name, manager_type,
601+
pulsar_app_config = self.get_pulsar_app_config(
602+
pulsar_app_config=pulsar_app_config,
603+
container=container,
604+
wait_after_submission=wait_after_submission,
605+
manager_name=manager_name,
606+
manager_type=manager_type,
607+
dependencies_description=dependencies_description,
520608
)
521609

522-
if "staging_directory" not in manager_config and "staging_directory" not in pulsar_app_config:
523-
pulsar_app_config["staging_directory"] = CONTAINER_STAGING_DIRECTORY
524-
525-
if self.amqp_key_prefix:
526-
pulsar_app_config["amqp_key_prefix"] = self.amqp_key_prefix
527-
528-
if "monitor" not in manager_config:
529-
manager_config["monitor"] = MonitorStyle.BACKGROUND.value if wait_after_submission else MonitorStyle.NONE.value
530-
if "persistence_directory" not in pulsar_app_config:
531-
pulsar_app_config["persistence_directory"] = os.path.join(CONTAINER_STAGING_DIRECTORY, "persisted_data")
532-
elif "manager" in pulsar_app_config and manager_name != '_default_':
533-
log.warning(
534-
"'manager' set in app config but client has non-default manager '%s', this will cause communication"
535-
" failures, remove `manager` from app or client config to fix", manager_name)
536-
537-
using_dependencies = container is None and dependencies_description is not None
538-
if using_dependencies and "dependency_resolution" not in pulsar_app_config:
539-
# Setup default dependency resolution for container above...
540-
dependency_resolution = {
541-
"cache": False,
542-
"use": True,
543-
"default_base_path": "/pulsar_dependencies",
544-
"cache_dir": "/pulsar_dependencies/_cache",
545-
"resolvers": [{ # TODO: add CVMFS resolution...
546-
"type": "conda",
547-
"auto_init": True,
548-
"auto_install": True,
549-
"prefix": '/pulsar_dependencies/conda',
550-
}, {
551-
"type": "conda",
552-
"auto_init": True,
553-
"auto_install": True,
554-
"prefix": '/pulsar_dependencies/conda',
555-
"versionless": True,
556-
}]
557-
}
558-
pulsar_app_config["dependency_resolution"] = dependency_resolution
559610
base64_message = to_base64_json(launch_params)
560611
base64_app_conf = to_base64_json(pulsar_app_config)
561612
pulsar_container_image = self.pulsar_container_image
@@ -607,21 +658,6 @@ def _pulsar_script_args(self, manager_name, base64_job, base64_app_conf, wait_ar
607658
manager_args.extend(["--base64", base64_job, "--app_conf_base64", base64_app_conf])
608659
return manager_args
609660

610-
def _ensure_manager_config(self, pulsar_app_config, manager_name, manager_type):
611-
if "manager" in pulsar_app_config:
612-
manager_config = pulsar_app_config["manager"]
613-
elif "managers" in pulsar_app_config:
614-
managers_config = pulsar_app_config["managers"]
615-
if manager_name not in managers_config:
616-
managers_config[manager_name] = {}
617-
manager_config = managers_config[manager_name]
618-
else:
619-
manager_config = {}
620-
pulsar_app_config["manager"] = manager_config
621-
if "type" not in manager_config:
622-
manager_config["type"] = manager_type
623-
return manager_config
624-
625661
def _launch_containers(
626662
self,
627663
pulsar_submit_container: CoexecutionContainerCommand,

pulsar/client/util.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ def _copy_and_close(object, output):
6060
@wraps(_b64encode)
6161
def b64encode(val, **kwargs):
6262
try:
63-
return _b64encode(val, **kwargs)
63+
return _b64encode(val, **kwargs).decode("utf-8")
6464
except TypeError:
6565
return _b64encode(val.encode('UTF-8'), **kwargs).decode('UTF-8')
6666

0 commit comments

Comments
 (0)