Update submitter for pre/post tasks#8
Conversation
…s after adding pre/post tasks on Meshroom
There was a problem hiding this comment.
Pull request overview
This PR updates the Tractor submitter integration to work with Meshroom’s new pre/post-task “OrderedTasks” submission model, shifting graph “cooking” to Meshroom and simplifying the submitter to straightforward task creation + dependency wiring.
Changes:
- Renames the Tractor stdout-wrapper script from
tractorSubtaskWrapperto the more generaltractorExpanderand updates related references/env vars. - Refactors Tractor submission to build tasks from Meshroom
OrderedTasksand introduces task-type handling (expanding/chunk/preprocess/postprocess/placeholder). - Removes the old
tractorJobCreation.pyimplementation and adaptsTaskInfo/chunk queuing to the new task-type approach.
Reviewed changes
Copilot reviewed 6 out of 7 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
test/createJob.py |
Updates test job creation to call the renamed tractorExpander. |
script/tractorExpander.py |
Renamed/generalized wrapper that reserves stdout for Tractor expansion commands and forwards normal output to stderr. |
meshroom/tractorSubmitter/tractorSubmitter.py |
Major refactor: constructs Tractor tasks from Meshroom OrderedTasks, adds task-type mapping, and introduces new in-file Job/Task wrappers. |
meshroom/tractorSubmitter/api/tractorJobCreation.py |
Removed legacy job/task graph cooking implementation (now handled upstream by Meshroom). |
meshroom/tractorSubmitter/api/subtaskCreator.py |
Updates expander integration + rewrites chunk subtask emission; introduces a currently broken getChunks helper. |
meshroom/tractorSubmitter/api/base.py |
Reworks TaskInfo to use reqPackages + task-type flags (expanding/pre/post/chunk/placeholder) and updates command cooking accordingly. |
Comments suppressed due to low confidence (1)
meshroom/tractorSubmitter/tractorSubmitter.py:121
serialsubtasksis computed fromrootTasks, but whenserialsubtasksis True, no root task is created androotTasksis otherwise unused. If you intend a consistent graph shape, it may be simpler/safer to always create a single job root task and attach only true roots beneath it.
rootTasks = self.getRootTasks()
serialsubtasks = len(rootTasks) == 1
if not serialsubtasks:
rootTaskName = self.jobInfo.name + " (root)"
rootTask = tractorJob.newTask(title=rootTaskName, argv=None, serialsubtasks=serialsubtasks)
for task in rootTasks:
rootTask.addChild(task.tractorTask)
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| def getChunks(chunkParams): | ||
| it = None | ||
| ignoreIterations = chunkParams.get("ignoreIterations", []) | ||
| if chunkParams: | ||
| start, end = chunkParams.get("start", -1), chunkParams.get("end", -2) | ||
| size = 1 | ||
| frameRange = list(range(start, end+1, 1)) | ||
| if frameRange: | ||
| it = [ | ||
| Chunk(i, ) | ||
| ] | ||
| slices = [frameRange[i : i+1] for i in range(0, len(frameRange))] | ||
| it = [Chunk(i, item[0], item[-1]) for i, item in enumerate(slices) | ||
| if i not in ignoreIterations] | ||
| return it |
| self.taskTependencies = {} # task: [tasks that the task depends on] | ||
|
|
||
| def addTask(self, task: Task): | ||
| self.tasks.append(task) | ||
| self.taskTependencies[task] = [] | ||
|
|
||
| def addTaskDependency(self, parentTask: Task, childTask: Task): | ||
| parentTask.tractorTask.addChild(childTask.tractorTask) | ||
| self.taskTependencies[parentTask].append(childTask) | ||
|
|
||
| def getRootTasks(self): | ||
| """ Get all tasks that are not children of other tasks """ | ||
| tasksWithoutDeps = set(self.tasks) | ||
| for _, childTasks in self.taskTependencies.items(): |
| # Create the job task (no command, at the graph root) | ||
| rootTasks = self.getRootTasks() | ||
| serialsubtasks = len(rootTasks) == 1 | ||
| if not serialsubtasks: | ||
| rootTaskName = self.jobInfo.name + " (root)" | ||
| rootTask = tractorJob.newTask(title=rootTaskName, argv=None, serialsubtasks=serialsubtasks) | ||
| for task in rootTasks: | ||
| rootTask.addChild(task.tractorTask) | ||
| # Cook tasks | ||
| taskToTractorTask = {} | ||
| for task in self.tasks: | ||
| tractorTask = task.tractorTask | ||
| tractorJob.addChild(tractorTask) | ||
| taskToTractorTask[task] = tractorTask |
| elif orderedTask.taskType == OrderedTaskType.PLACEHOLDER: | ||
| taskType = (None, None) |
| def createJob(self, orderedTasks: OrderedTasks, filepath, submitLabel="{projectName}") -> TractorJob: | ||
| # Create job | ||
| projectName = os.path.splitext(os.path.basename(filepath))[0] | ||
| name = submitLabel.format(projectName=projectName) | ||
| comment = filepath | ||
| maxNodeSize = max([node.size for node in nodes]) | ||
| mainTags = { |
| res = job.submit(share=self.share, dryRun=self.dryRun) | ||
| if self.dryRun: | ||
| return True |
cbentejac
left a comment
There was a problem hiding this comment.
Minor comments, mostly for code readability. Functionally speaking, this looks good to me.
| # if taskInfos.expandingTask: | ||
| # cmd.expand = taskInfos.expandingFile |
There was a problem hiding this comment.
| # if taskInfos.expandingTask: | |
| # cmd.expand = taskInfos.expandingFile |
|
|
||
| @staticmethod | ||
| def createDummyTask(tractorJob: tractorAuthor.Job): | ||
| """ tractor API will raise a RequiredValueError if no task are |
There was a problem hiding this comment.
| """ tractor API will raise a RequiredValueError if no task are | |
| """ Tractor API will raise a RequiredValueError if no task is |
| tractorJob.priority = PRIORITY_DICT.get(priority, PRIORITY_DICT["normal"]) | ||
|
|
||
| if dryRun: | ||
| logging.info("TractorSubmitter: Job in TCL format :") |
There was a problem hiding this comment.
| logging.info("TractorSubmitter: Job in TCL format :") | |
| logging.info("TractorSubmitter: Job in TCL format:") |
| self.placeholderTask = (taskType_=="placeholder") | ||
| self.expandingTask = (taskType_=="expanding") | ||
| self.preprocessTask = (taskType_=="preprocess") | ||
| self.postprocessTask = (taskType_=="postprocess") | ||
| self.chunkTask = (taskType_=="chunk") |
There was a problem hiding this comment.
Nitpicking comment, for readability purposes:
| self.placeholderTask = (taskType_=="placeholder") | |
| self.expandingTask = (taskType_=="expanding") | |
| self.preprocessTask = (taskType_=="preprocess") | |
| self.postprocessTask = (taskType_=="postprocess") | |
| self.chunkTask = (taskType_=="chunk") | |
| self.placeholderTask = (taskType_ == "placeholder") | |
| self.expandingTask = (taskType_ == "expanding") | |
| self.preprocessTask = (taskType_ == "preprocess") | |
| self.postprocessTask = (taskType_ == "postprocess") | |
| self.chunkTask = (taskType_ == "chunk") |
| service=None, licenses=None, tags=None, | ||
| expandingTask=False, chunkParams=None): | ||
| environment=None, reqPackages=None, service=None, | ||
| licenses=None, taskType=None, tags=None): |
There was a problem hiding this comment.
It'd be useful to mention somewhere that taskType is either None or a tuple, either here or at line 224 (which very quickly made me question my understanding of what was happening here 👼 ).
| currentDir = os.path.dirname(os.path.realpath(__file__)) | ||
| binDir = os.path.dirname(os.path.dirname(os.path.dirname(currentDir))) | ||
|
|
||
| CreatedTask = namedtuple("task", ["task", "chunkParams"]) |
There was a problem hiding this comment.
| CreatedTask = namedtuple("task", ["task", "chunkParams"]) |
This is not used anywhere else in the code, it can be removed (and so can the from collections import namedtuple statement).
Description
This PR aims to update the submitter so that it works with the pre/post task feature in Meshroom.
It requires this PR in order to work.
Make sure to follow these steps :
submitmethod fromBaseSubmitter#4Content of the PR
Changes :
OrderedTasksobject.Additions :
tractorSubtaskWrapperscript has been renamed totractorExpanderto make it more general-purpose.Removal :
sizeconcept in the chunks. That was also a feature that we didn't use so there was no reason to keep it.