Skip to content

Commit 2a69ad5

Browse files
author
zhang_lu
authored
Merge pull request #179 from djwu563/release/v0.2.2
Add support for setting a domain for workers
2 parents 7ea8a84 + 8c02680 commit 2a69ad5

File tree

6 files changed

+24
-17
lines changed

6 files changed

+24
-17
lines changed

omagent-core/src/omagent_core/clients/devices/cli/client.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,18 +42,19 @@ def __init__(
4242
self._config_path = config_path
4343
self._workers = workers
4444
self._input_prompt = input_prompt
45+
self._task_to_domain = {}
4546

4647
def start_interactor(self):
4748
workflow_instance_id = None
4849
try:
4950
absolute_path = Path(self._config_path).resolve()
5051
worker_config = build_from_file(self._config_path)
5152
self._task_handler_interactor = TaskHandler(
52-
worker_config=worker_config, workers=self._workers
53+
worker_config=worker_config, workers=self._workers, task_to_domain=self._task_to_domain
5354
)
5455
self._task_handler_interactor.start_processes()
5556
workflow_instance_id = self._interactor.start_workflow_with_input(
56-
workflow_input={}
57+
workflow_input={}, task_to_domain=self._task_to_domain
5758
)
5859

5960
stream_name = f"{workflow_instance_id}_output"
@@ -180,11 +181,11 @@ def start_processor(self):
180181
try:
181182
worker_config = build_from_file(self._config_path)
182183
self._task_handler_processor = TaskHandler(
183-
worker_config=worker_config, workers=self._workers
184+
worker_config=worker_config, workers=self._workers, task_to_domain=self._task_to_domain
184185
)
185186
self._task_handler_processor.start_processes()
186187
workflow_instance_id = self._processor.start_workflow_with_input(
187-
workflow_input={}
188+
workflow_input={}, task_to_domain=self._task_to_domain
188189
)
189190
user_input = input(
190191
f"{Fore.GREEN}Please input a folder path of images:(WaitPress Enter to finish the entire input.):\n>>>{Style.RESET_ALL}"

omagent-core/src/omagent_core/clients/devices/programmatic/client.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,21 +22,22 @@ def __init__(
2222
self._config_path = config_path
2323
self._workers = workers
2424
self._task_handler_processor = None
25+
self._task_to_domain = {}
2526

2627
def start_processor(self):
2728
worker_config = build_from_file(self._config_path)
2829
self._task_handler_processor = TaskHandler(
2930
worker_config=worker_config, workers=self._workers
3031
)
3132
self._task_handler_processor.start_processes()
32-
self._processor.start_workflow_with_input(workflow_input={})
33+
self._processor.start_workflow_with_input(workflow_input={}, task_to_domain=self._task_to_domain)
3334

3435
def start_processor_with_input(self, workflow_input: dict):
3536
try:
3637
if self._task_handler_processor is None:
3738
worker_config = build_from_file(self._config_path)
3839
self._task_handler_processor = TaskHandler(
39-
worker_config=worker_config, workers=self._workers
40+
worker_config=worker_config, workers=self._workers, task_to_domain=self._task_to_domain
4041
)
4142
self._task_handler_processor.start_processes()
4243
return self._process_workflow(self._processor, workflow_input)
@@ -46,8 +47,9 @@ def start_processor_with_input(self, workflow_input: dict):
4647
def start_batch_processor(self, workflow_input_list: list[dict], max_tasks: int = 10):
4748
results = [None] * len(workflow_input_list)
4849
worker_config = build_from_file(self._config_path)
49-
self._task_handler_processor = TaskHandler(worker_config=worker_config, workers=self._workers)
50-
self._task_handler_processor.start_processes()
50+
if self._task_handler_processor is None:
51+
self._task_handler_processor = TaskHandler(worker_config=worker_config, workers=self._workers, task_to_domain=self._task_to_domain)
52+
self._task_handler_processor.start_processes()
5153

5254
result_queue = multiprocessing.Queue()
5355
active_processes = []
@@ -87,7 +89,7 @@ def _process_workflow(self, workflow: ConductorWorkflow, workflow_input: dict):
8789
workflow_instance_id = None
8890
try:
8991
workflow_instance_id = workflow.start_workflow_with_input(
90-
workflow_input=workflow_input
92+
workflow_input=workflow_input, task_to_domain=self._task_to_domain
9193
)
9294
while True:
9395
status = workflow.get_workflow(workflow_id=workflow_instance_id).status

omagent-core/src/omagent_core/clients/devices/webpage/client.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ def __init__(
3636
self._config_path = config_path
3737
self._workers = workers
3838
self._workflow_instance_id = None
39+
self._worker_config = build_from_file(self._config_path)
40+
self._task_to_domain = {}
3941
self._incomplete_message = ""
4042
self._custom_css = """
4143
#OmAgent {
@@ -70,9 +72,8 @@ def __init__(
7072
"""
7173

7274
def start_interactor(self):
73-
worker_config = build_from_file(self._config_path)
7475
self._task_handler_interactor = TaskHandler(
75-
worker_config=worker_config, workers=self._workers
76+
worker_config=self._worker_config, workers=self._workers, task_to_domain=self._task_to_domain
7677
)
7778
self._task_handler_interactor.start_processes()
7879
try:
@@ -113,9 +114,8 @@ def stop_interactor(self):
113114
self._task_handler_interactor.stop_processes()
114115

115116
def start_processor(self):
116-
worker_config = build_from_file(self._config_path)
117117
self._task_handler_processor = TaskHandler(
118-
worker_config=worker_config, workers=self._workers
118+
worker_config=self._worker_config, workers=self._workers, task_to_domain=self._task_to_domain
119119
)
120120
self._task_handler_processor.start_processes()
121121

@@ -161,7 +161,7 @@ def stop_processor(self):
161161
def add_message(self, history, message):
162162
if self._workflow_instance_id is None:
163163
self._workflow_instance_id = self._interactor.start_workflow_with_input(
164-
workflow_input={}
164+
workflow_input={}, task_to_domain=self._task_to_domain
165165
)
166166
contents = []
167167
for x in message["files"]:
@@ -184,7 +184,7 @@ def add_message(self, history, message):
184184
def add_processor_message(self, history, message):
185185
if self._workflow_instance_id is None:
186186
self._workflow_instance_id = self._processor.start_workflow_with_input(
187-
workflow_input={}
187+
workflow_input={}, task_to_domain=self._task_to_domain
188188
)
189189
image_items = []
190190
for idx, x in enumerate(message["files"]):

omagent-core/src/omagent_core/engine/automator/task_handler.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ def __init__(
5454
workers: List[BaseWorker] = [],
5555
metrics_settings: MetricsSettings = None,
5656
import_modules: List[str] = None,
57+
task_to_domain: dict[str, str] = None,
5758
):
5859
"""Initialize a new TaskHandler instance.
5960
@@ -95,7 +96,8 @@ def __init__(
9596
)
9697
workers.extend([worker_cls(**config) for _ in range(concurrency)])
9798
for worker in workers:
98-
worker.task_definition_name = worker.id or worker.name
99+
if task_to_domain is not None:
100+
task_to_domain[worker.task_definition_name] = worker.domain
99101
self.__create_task_runner_processes(
100102
workers, container.conductor_config, metrics_settings
101103
)

omagent-core/src/omagent_core/engine/automator/task_runner.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,8 @@ def __poll_task(self) -> Task:
9696
else:
9797
task = None
9898
else:
99+
if domain is not None:
100+
params["domain"] = domain
99101
task = self.task_client.poll(tasktype=task_definition_name, **params)
100102

101103
finish_time = time.time()

omagent-core/src/omagent_core/engine/worker/base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ class BaseWorker(BotBase, ABC):
5757
_task_type: Optional[str] = None
5858

5959
def model_post_init(self, __context: Any) -> None:
60-
self.task_definition_name = self.name
60+
self.task_definition_name = self.id or self.name
6161
self.next_task_index = 0
6262
self._task_definition_name_cache = None
6363

0 commit comments

Comments
 (0)