diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5b496aa --- /dev/null +++ b/.gitignore @@ -0,0 +1,10 @@ +venv/ +.idea/ +__pycache__/ +RunOnCluster.egg-info/ +CellProfiler-plugins/ +*/__pycache__/ +__init__.py +setup.py +loadimagesfromomero.py +*.pyc diff --git a/CPRynner/CPRynner.py b/CPRynner/CPRynner.py index 3990dad..ffd4eff 100644 --- a/CPRynner/CPRynner.py +++ b/CPRynner/CPRynner.py @@ -3,20 +3,21 @@ clusterview and runnoncluster plugins. """ -from future import * +import os +import tempfile +import wx # Libsubmit creates a .script file in the working directory. # To avoid clutter, we run in a temp directory -import tempfile, os -workdir = tempfile.mkdtemp() -os.chdir(workdir) - -import wx -from rynner.rynner import Rynner +from cellprofiler_core.preferences import get_default_output_directory from libsubmit import SSHChannel -from libsubmit.providers.slurm.slurm import SlurmProvider -from libsubmit.launchers.launchers import SimpleLauncher from libsubmit.channels.errors import SSHException +from libsubmit.launchers.launchers import SimpleLauncher +from libsubmit.providers.slurm.slurm import SlurmProvider +from rynner.rynner import Rynner + +workdir = tempfile.mkdtemp() +os.chdir(workdir) class clusterSettingDialog(wx.Dialog): @@ -24,80 +25,147 @@ class clusterSettingDialog(wx.Dialog): A dialog window for setting cluster parameters """ - def __init__(self, cluster_address, tasks_per_node, work_dir, setup_script ): + def __init__( + self, cluster_address, tasks_per_node, work_dir, setup_script, run_command + ): """Constructor""" - super(clusterSettingDialog, self).__init__(None, title="Login", size = (420,480)) + super().__init__(None, title="Login", size=(420, 540)) self.panel = wx.Panel(self) # cluster_address field cluster_address_sizer = wx.BoxSizer(wx.HORIZONTAL) - cluster_address_label = wx.StaticText(self.panel, label="Cluster Address:", size=(100, -1)) - cluster_address_label.SetToolTip(wx.ToolTip( - "The URL of the cluster. If logged in, changing this will cause you to logout." - )) - cluster_address_sizer.Add(cluster_address_label, 0, wx.ALL|wx.CENTER, 5, ) - self.cluster_address = wx.TextCtrl(self.panel, value = cluster_address, size=(300, -1)) + cluster_address_label = wx.StaticText( + self.panel, label="Cluster Address:", size=(100, -1) + ) + cluster_address_label.SetToolTip( + wx.ToolTip( + "The URL of the cluster. If logged in, changing this will cause you to logout." + ) + ) + cluster_address_sizer.Add( + cluster_address_label, + 0, + wx.ALL | wx.CENTER, + 5, + ) + self.cluster_address = wx.TextCtrl( + self.panel, style=wx.TE_PROCESS_ENTER, value=cluster_address, size=(300, -1) + ) cluster_address_sizer.Add(self.cluster_address, 0, wx.ALL, 5) # tasks_per_node field tasks_per_node_sizer = wx.BoxSizer(wx.HORIZONTAL) - tasks_per_node_label = wx.StaticText(self.panel, label="Tasks Per Node:", size=(300, -1)) - tasks_per_node_label.SetToolTip(wx.ToolTip( - "The number of individual processes that can be run on a single node. Usually the number of cpu core per node." - )) - tasks_per_node_sizer.Add(tasks_per_node_label, 0, wx.ALL|wx.CENTER, 5) - self.tasks_per_node = wx.SpinCtrl(self.panel, value = str(tasks_per_node), size=(100, -1)) + tasks_per_node_label = wx.StaticText( + self.panel, label="Tasks Per Node:", size=(300, -1) + ) + tasks_per_node_label.SetToolTip( + wx.ToolTip( + "The number of individual processes that can be run on a single node. Usually the number of cpu core per node." + ) + ) + tasks_per_node_sizer.Add(tasks_per_node_label, 0, wx.ALL | wx.CENTER, 5) + self.tasks_per_node = wx.SpinCtrl( + self.panel, value=str(tasks_per_node), size=(100, -1) + ) tasks_per_node_sizer.Add(self.tasks_per_node, 0, wx.ALL, 5) - + # max_runtime field - max_runtime = str( cluster_max_runtime() ) + max_runtime = str(cluster_max_runtime()) max_runtime_sizer = wx.BoxSizer(wx.HORIZONTAL) - max_runtime_label = wx.StaticText(self.panel, label="Runtime limit (hours):", size=(300, -1)) - max_runtime_label.SetToolTip(wx.ToolTip( - "The maximum runtime limit on the cluster. You can set a lower limit for individual jobs." - )) - max_runtime_sizer.Add(max_runtime_label, 0, wx.ALL|wx.CENTER, 5) - self.max_runtime = wx.SpinCtrl(self.panel, value = str(max_runtime), size=(100, -1)) + max_runtime_label = wx.StaticText( + self.panel, label="Runtime limit (hours):", size=(300, -1) + ) + max_runtime_label.SetToolTip( + wx.ToolTip( + "The maximum runtime limit on the cluster. You can set a lower limit for individual jobs." + ) + ) + max_runtime_sizer.Add(max_runtime_label, 0, wx.ALL | wx.CENTER, 5) + self.max_runtime = wx.SpinCtrl( + self.panel, value=str(max_runtime), size=(100, -1) + ) max_runtime_sizer.Add(self.max_runtime, 0, wx.ALL, 5) # work_dir field work_dir_sizer = wx.BoxSizer(wx.HORIZONTAL) - work_dir_label = wx.StaticText(self.panel, label="Working Directory:", size=(100, -1)) - work_dir_label.SetToolTip(wx.ToolTip( - "Path to the directory in which the job is run. The {username} tag is replaced by your username on the cluster and is useful when multiple people use the same local machine." - )) - work_dir_sizer.Add(work_dir_label, 0, wx.ALL|wx.CENTER, 5) - self.work_dir = wx.TextCtrl(self.panel, value = work_dir, size=(300, -1)) + work_dir_label = wx.StaticText( + self.panel, label="Working Directory:", size=(100, -1) + ) + work_dir_label.SetToolTip( + wx.ToolTip( + "Path to the directory in which the job is run. The {username} tag is replaced by your username on the cluster and is useful when multiple people use the same local machine." + ) + ) + work_dir_sizer.Add(work_dir_label, 0, wx.ALL | wx.CENTER, 5) + self.work_dir = wx.TextCtrl( + self.panel, style=wx.TE_PROCESS_ENTER, value=work_dir, size=(300, -1) + ) work_dir_sizer.Add(self.work_dir, 0, wx.ALL, 5) # setup_script field setup_script_label_sizer = wx.BoxSizer(wx.HORIZONTAL) - setup_script_label = wx.StaticText(self.panel, label="Setup Script:", size=(100, -1)) - setup_script_label.SetToolTip(wx.ToolTip( - "Run at the start of each job. Used for setting up the run, for example loading the CellProfiler module. Can be any general bash script, but may be limited by the cluster. For example, compute nodes often lack internet access and cannot install python packages." - )) - setup_script_label_sizer.Add(setup_script_label, 0, wx.ALL|wx.CENTER, 5) + setup_script_label = wx.StaticText( + self.panel, label="Setup Script:", size=(100, -1) + ) + setup_script_label.SetToolTip( + wx.ToolTip( + "Run at the start of each job. Used for setting up the run, for example loading the CellProfiler module. Can be any general bash script, but may be limited by the cluster. For example, compute nodes often lack internet access and cannot install python packages." + ) + ) + setup_script_label_sizer.Add(setup_script_label, 0, wx.ALL | wx.CENTER, 5) setup_script_field_sizer = wx.BoxSizer(wx.HORIZONTAL) - self.setup_script = wx.TextCtrl(self.panel, value = setup_script, size=(400, 80), style=wx.TE_MULTILINE) + self.setup_script = wx.TextCtrl( + self.panel, value=setup_script, size=(400, 80), style=wx.TE_MULTILINE + ) setup_script_field_sizer.Add(self.setup_script, 0, wx.ALL, 5) + # run_command field + run_command_label_sizer = wx.BoxSizer(wx.HORIZONTAL) + run_command_label = wx.StaticText( + self.panel, label="Run Command:", size=(100, -1) + ) + run_command_label.SetToolTip( + wx.ToolTip( + "Bash command to execute CellProfiler on the nodes specified in the job script. Command line options for pipeline, image sets and output are specified automatically later." + ) + ) + run_command_label_sizer.Add(run_command_label, 0, wx.ALL | wx.CENTER, 5) + + run_command_field_sizer = wx.BoxSizer(wx.HORIZONTAL) + self.run_command = wx.TextCtrl( + self.panel, value=run_command, size=(400, 80), style=wx.TE_MULTILINE + ) + run_command_field_sizer.Add(self.run_command, 0, wx.ALL, 5) + # The Ok and Cancel button button_sizer = wx.BoxSizer(wx.HORIZONTAL) self.ok_button = wx.Button(self.panel, wx.ID_OK, label="Ok", size=(60, 30)) - button_sizer.Add(self.ok_button, 0, wx.ALL , 5) + button_sizer.Add(self.ok_button, 0, wx.ALL, 5) + + self.cancel_btn = wx.Button( + self.panel, wx.ID_CANCEL, label="Cancel", size=(60, 30) + ) + button_sizer.Add(self.cancel_btn, 0, wx.ALL, 5) - self.cancel_btn = wx.Button(self.panel, wx.ID_CANCEL, label="Cancel", size=(60, 30)) - button_sizer.Add(self.cancel_btn, 0, wx.ALL , 5) - # Bind enter press to the Ok button - button_event = wx.PyCommandEvent(wx.EVT_BUTTON.typeId,self.ok_button.GetId()) - self.cluster_address.Bind( wx.EVT_TEXT_ENTER, lambda e: wx.PostEvent(self, button_event) ) - self.tasks_per_node.Bind( wx.EVT_TEXT_ENTER, lambda e: wx.PostEvent(self, button_event) ) - self.max_runtime.Bind( wx.EVT_TEXT_ENTER, lambda e: wx.PostEvent(self, button_event) ) - self.max_runtime.Bind( wx.EVT_TEXT_ENTER, lambda e: wx.PostEvent(self, button_event) ) - self.work_dir.Bind( wx.EVT_TEXT_ENTER, lambda e: wx.PostEvent(self, button_event) ) + button_event = wx.PyCommandEvent(wx.EVT_BUTTON.typeId, self.ok_button.GetId()) + self.cluster_address.Bind( + wx.EVT_TEXT_ENTER, lambda e: wx.PostEvent(self, button_event) + ) + self.tasks_per_node.Bind( + wx.EVT_TEXT_ENTER, lambda e: wx.PostEvent(self, button_event) + ) + self.max_runtime.Bind( + wx.EVT_TEXT_ENTER, lambda e: wx.PostEvent(self, button_event) + ) + self.max_runtime.Bind( + wx.EVT_TEXT_ENTER, lambda e: wx.PostEvent(self, button_event) + ) + self.work_dir.Bind( + wx.EVT_TEXT_ENTER, lambda e: wx.PostEvent(self, button_event) + ) # Build the layout main_sizer = wx.BoxSizer(wx.VERTICAL) @@ -107,8 +175,10 @@ def __init__(self, cluster_address, tasks_per_node, work_dir, setup_script ): main_sizer.Add(work_dir_sizer, 0, wx.ALL, 5) main_sizer.Add(setup_script_label_sizer, 0, wx.ALL, 5) main_sizer.Add(setup_script_field_sizer, 0, wx.ALL, 5) + main_sizer.Add(run_command_label_sizer, 0, wx.ALL, 5) + main_sizer.Add(run_command_field_sizer, 0, wx.ALL, 5) main_sizer.Add(button_sizer, 0, wx.ALL | wx.ALIGN_CENTER, 5) - + self.panel.SetSizer(main_sizer) @@ -116,53 +186,64 @@ class LoginDialog(wx.Dialog): """ A dialog window asking for a username and a password """ - - def __init__(self, username = ''): - """Constructor""" - super(LoginDialog, self).__init__(None, title="Login", size = (300,180)) + def __init__(self, username=""): + """Constructor""" + super().__init__(None, title="Login", size=(300, 180)) self.panel = wx.Panel(self) # username field username_sizer = wx.BoxSizer(wx.HORIZONTAL) username_label = wx.StaticText(self.panel, label="Username:") - username_sizer.Add(username_label, 0, wx.ALL|wx.CENTER, 5) - self.username = wx.TextCtrl(self.panel, value = username, size=(160, -1)) + username_sizer.Add(username_label, 0, wx.ALL | wx.CENTER, 5) + self.username = wx.TextCtrl( + self.panel, value=username, size=(160, -1), style=wx.TE_PROCESS_ENTER + ) username_sizer.Add(self.username, 0, wx.ALL, 5) - + # password field password_sizer = wx.BoxSizer(wx.HORIZONTAL) password_label = wx.StaticText(self.panel, label="Password: ") - password_sizer.Add(password_label, 0, wx.ALL|wx.CENTER, 5) - self.password = wx.TextCtrl(self.panel, size=(160, -1), style=wx.TE_PASSWORD|wx.TE_PROCESS_ENTER) + password_sizer.Add(password_label, 0, wx.ALL | wx.CENTER, 5) + self.password = wx.TextCtrl( + self.panel, size=(160, -1), style=wx.TE_PASSWORD | wx.TE_PROCESS_ENTER + ) password_sizer.Add(self.password, 0, wx.ALL, 5) - # The login and cancel button + # The login and cancel button button_sizer = wx.BoxSizer(wx.HORIZONTAL) self.ok_button = wx.Button(self.panel, wx.ID_OK, label="Login", size=(60, 30)) - button_sizer.Add(self.ok_button, 0, wx.ALL , 5) + button_sizer.Add(self.ok_button, 0, wx.ALL, 5) - self.cancel_btn = wx.Button(self.panel, wx.ID_CANCEL, label="Cancel", size=(60, 30)) - button_sizer.Add(self.cancel_btn, 0, wx.ALL , 5) + self.cancel_btn = wx.Button( + self.panel, wx.ID_CANCEL, label="Cancel", size=(60, 30) + ) + button_sizer.Add(self.cancel_btn, 0, wx.ALL, 5) - self.settings_button = wx.Button(self.panel, wx.ID_PREFERENCES, label="Settings", size=(60, 30)) - button_sizer.Add(self.settings_button, 0, wx.ALL , 5) + self.settings_button = wx.Button( + self.panel, wx.ID_PREFERENCES, label="Settings", size=(60, 30) + ) + button_sizer.Add(self.settings_button, 0, wx.ALL, 5) # Bind enter press to the Login button - button_event = wx.PyCommandEvent(wx.EVT_BUTTON.typeId,self.ok_button.GetId()) - self.username.Bind( wx.EVT_TEXT_ENTER, lambda e: wx.PostEvent(self, button_event) ) - self.password.Bind( wx.EVT_TEXT_ENTER, lambda e: wx.PostEvent(self, button_event) ) + button_event = wx.PyCommandEvent(wx.EVT_BUTTON.typeId, self.ok_button.GetId()) + self.username.Bind( + wx.EVT_TEXT_ENTER, lambda e: wx.PostEvent(self, button_event) + ) + self.password.Bind( + wx.EVT_TEXT_ENTER, lambda e: wx.PostEvent(self, button_event) + ) - # Bind - self.settings_button.Bind(wx.EVT_BUTTON, self.settings ) + # Bind + self.settings_button.Bind(wx.EVT_BUTTON, self.settings) # Build the layout main_sizer = wx.BoxSizer(wx.VERTICAL) main_sizer.Add(username_sizer, 0, wx.ALL, 5) main_sizer.Add(password_sizer, 0, wx.ALL, 5) main_sizer.Add(button_sizer, 0, wx.ALL | wx.ALIGN_CENTER, 5) - + self.panel.SetSizer(main_sizer) def settings(self, button_event): @@ -170,82 +251,99 @@ def settings(self, button_event): def _get_username_and_password(): - cnfg = wx.Config('CPRynner') + cnfg = wx.Config("CPRynner") - if cnfg.Read('cluster_address') == '': + if cnfg.Read("cluster_address") == "": wx.MessageBox( "The cluster address is not set. Please edit the cluster settings.", caption="Cluster Required", - style=wx.OK | wx.ICON_INFORMATION) + style=wx.OK | wx.ICON_INFORMATION, + ) update_cluster_parameters() - if cnfg.Exists('username'): - username = cnfg.Read('username') + if cnfg.Exists("username"): + username = cnfg.Read("username") else: - username = '' + username = "" - dialog = LoginDialog( username ) + dialog = LoginDialog(username) result = dialog.ShowModal() if result == wx.ID_OK: username = dialog.username.GetValue() password = dialog.password.GetValue() - cnfg.Write('username', username) + cnfg.Write("username", username) return [username, password] else: - return [None,None] + return [None, None] dialog.Destroy() def cluster_tasks_per_node(): - cnfg = wx.Config('CPRynner') - if cnfg.Exists('tasks_per_node'): - tasks_per_node = cnfg.Read('tasks_per_node') + cnfg = wx.Config("CPRynner") + if cnfg.Exists("tasks_per_node"): + tasks_per_node = cnfg.Read("tasks_per_node") else: - tasks_per_node = '' + tasks_per_node = "" return tasks_per_node + def cluster_setup_script(): - cnfg = wx.Config('CPRynner') - if cnfg.Exists('setup_script'): - setup_script = cnfg.Read('setup_script') + cnfg = wx.Config("CPRynner") + if cnfg.Exists("setup_script"): + setup_script = cnfg.Read("setup_script") + setup_script = setup_script.replace("\r\n", "\n") else: - setup_script = """\ -module load cellprofiler; -module load java;""" + setup_script = "module load cellprofiler; module load java;" return setup_script + +def cluster_run_command(): + cnfg = wx.Config("CPRynner") + if cnfg.Exists("run_command"): + run_command = cnfg.Read("run_command") + else: + run_command = "singularity exec $CELLPROFILER_IMG cellprofiler -c" + return run_command + + def cluster_work_dir(): - cnfg = wx.Config('CPRynner') - if cnfg.Exists('work_dir'): - work_dir = cnfg.Read('work_dir') + cnfg = wx.Config("CPRynner") + if cnfg.Exists("work_dir"): + work_dir = cnfg.Read("work_dir") else: - work_dir = '/scratch/{username}/CellProfiler/' + work_dir = "/scratch/{username}/CellProfiler/" return work_dir + def cluster_url(): - cnfg = wx.Config('CPRynner') - if cnfg.Exists('cluster_address'): - cluster_address = cnfg.Read('cluster_address') + cnfg = wx.Config("CPRynner") + if cnfg.Exists("cluster_address"): + cluster_address = cnfg.Read("cluster_address") else: - cluster_address = '' + cluster_address = "" return cluster_address + def cluster_max_runtime(): - cnfg = wx.Config('CPRynner') - if cnfg.Exists('max_runtime'): - max_runtime = cnfg.Read('max_runtime') + cnfg = wx.Config("CPRynner") + if cnfg.Exists("max_runtime"): + max_runtime = cnfg.Read("max_runtime") else: - max_runtime = '' + max_runtime = 0 return int(max_runtime) + def update_cluster_parameters(): cluster_address = cluster_url() work_dir = cluster_work_dir() tasks_per_node = cluster_tasks_per_node() setup_script = cluster_setup_script() - dialog = clusterSettingDialog( cluster_address, tasks_per_node, work_dir, setup_script ) + run_command = cluster_run_command() + dialog = clusterSettingDialog( + cluster_address, tasks_per_node, work_dir, setup_script, run_command + ) result = dialog.ShowModal() if result == wx.ID_OK: cluster_address = dialog.cluster_address.GetValue() @@ -254,71 +352,74 @@ def update_cluster_parameters(): work_dir = dialog.work_dir.GetValue() setup_script = dialog.setup_script.GetValue() - cnfg = wx.Config('CPRynner') - cnfg.Write('cluster_address', cluster_address) - cnfg.Write('tasks_per_node', str(tasks_per_node)) - cnfg.Write('max_runtime', str(max_runtime)) - cnfg.Write('work_dir', work_dir) - cnfg.Write('setup_script', setup_script) + cnfg = wx.Config("CPRynner") + cnfg.Write("cluster_address", cluster_address) + cnfg.Write("tasks_per_node", str(tasks_per_node)) + cnfg.Write("max_runtime", str(max_runtime)) + cnfg.Write("work_dir", work_dir) + cnfg.Write("setup_script", setup_script) + cnfg.Write("run_command", run_command) dialog.Destroy() def _create_rynner(): - ''' Create an instance of Rynner connected to the cluster - ''' + """Create an instance of Rynner connected to the cluster""" hostname = cluster_url() work_dir = cluster_work_dir() + local_dir = get_default_output_directory() tasks_per_node = cluster_tasks_per_node() username, password = _get_username_and_password() if username is not None: work_dir = work_dir.format(username=username) + print("Checking rynner directories on initialisation:") + print(f"Cluster working directory: {work_dir}") - tmpdir = tempfile.mkdtemp() - provider = SlurmProvider( - 'compute', + "compute", channel=SSHChannel( hostname=hostname, username=username, password=password, - script_dir=work_dir, + script_dir=work_dir, # Channel script_dir is where it pushes the script to ), - script_dir=tmpdir, + script_dir=local_dir, # Provider script_dir is where it saves the local copy nodes_per_block=1, tasks_per_node=int(tasks_per_node), - walltime="01:00:00", # Overwritten in runoncluster.py + walltime="01:00:00", # Overwritten in runoncluster.py init_blocks=1, max_blocks=1, - launcher = SimpleLauncher(), + launcher=SimpleLauncher(), ) return Rynner(provider, work_dir) else: return None + cprynner = None + + def CPRynner(): - ''' Return a shared instance of Rynner - ''' + """Return a shared instance of Rynner""" global cprynner if cprynner is None: try: cprynner = _create_rynner() except SSHException: wx.MessageBox( - 'Unable to contact the cluster. The cluster may be offline or you may have a problem with your internet connection.', - 'Info', wx.OK | wx.ICON_INFORMATION + "Unable to contact the cluster. The cluster may be offline or you may have a problem with your internet connection.", + "Info", + wx.OK | wx.ICON_INFORMATION, ) return None return cprynner + def logout(): - ''' Logout and scrap the rynner object - ''' + """Logout and scrap the rynner object""" global cprynner if cprynner is not None: CPRynner().provider.channel.close() cprynner = None - diff --git a/README.md b/README.md index 80d2b60..444d4bd 100644 --- a/README.md +++ b/README.md @@ -1,43 +1,44 @@ -# CellProfiler-RunOnCluster +# CellProfiler-RunOnCluster + [![DOI](https://zenodo.org/badge/DOI/10.5281/zenodo.3275888.svg)](https://doi.org/10.5281/zenodo.3275888) A CellProfiler module for submitting batch jobs to a cluster running slurm. ## Installation -### Windows - -A precompiled executable with default settings for the Sunbird cluster in Swansea is provided in [releases](https://github.com/sa2c/CellProfiler-RunOnCluster/releases/download/v1.0/CellProfiler.exe). These executables require that a Java Runtime Environment is installed. You can get one for example form the [Java downloads page](https://www.java.com/en/download/). -### Source Installation +Note as our plugin requires additional libraries which aren’t packaged with CellProfiler 4, you’ll need to build CellProfiler from source rather than using a pre-packaged version. ([Reference](https://cellprofiler-manual.s3.amazonaws.com/CellProfiler-4.0.6/help/other_plugins.html?highlight=plugins)). The most straightforward method is to copy the plugin sources to your Cellprofiler plugins directory. Using the default `CellProfiler\plugins` folder as your plugin directory is recommended, but you can still have the plugins in other locations. Follow the instructions for installing CellProfiler on the [Wiki](https://github.com/CellProfiler/CellProfiler/wiki). Choose your operating system on the right-side panel. Once you have installed CellProfiler, set the plugin's directory in Cellprofiler preferences, then save and restart CellProfiler. -When running on Linux or developing your own plugins, the most straight forward method is to copy the plugin sources to your Cellprofiler plugins directory. +Please download the [plugins](https://codeload.github.com/sa2c/CellProfiler-RunOnCluster/zip/refs/heads/master) to your plugins directory. In the plugins directory, install the requirements for the plugins: -Follow the instructions for installing CellProfiler on the [wiki](https://github.com/CellProfiler/CellProfiler/wiki). Choose you operating system on the right side panel. Once you have installed CellProfiler, set the plugins directory in Cellprofiler preferences. Download [the plugins](https://github.com/sa2c/CellProfiler-RunOnCluster/archive/master.zip) and copy move the files to your plugins directory. - -In the plugins directory, install the additional requirements for the plugins: ``` -python2 -m pip install -r requirements.txt +python -m pip install -r requirements.txt ``` ## Usage -### Submitting to Cluster +### Submitting jobs to the cluster Once you have tested your pipeline on your local machine, add all images to be processed into the Images plugin in the usual way. Add the `RunOnCluster` module in the `Other` category to the end of the pipeline. The module has four settings: - * Run Name: An identifier that allows you to recognize the pipeline and image batch. - * Number of images per measurement: If several image files are required for a single measurement, adjust this to the number of images required. - * Image type first: Select `Yes` if the image type appears before the measurement number in the image file name. Select `No` if the measurement number appears before the image type. - * Maximum Runtime (hours): The amount of time to reserve a node for on the cluster. The actual runtime can be lower, but not larger than this. If the run takes longer than the time given, it will be terminated before completion. Must be less than 72. - - Submit the pipeline by pressing `Analyze Images`. The plugin will copy the image files and the pipeline to the cluster and add the process to the queue. - ### Checking run status +- Run Name: An identifier that allows you to recognise the pipeline and image batch. +- Number of images per measurement: If several image files are required for a single measurement, adjust this to the number of images required. +- Image type first: Select `Yes` if the image type appears before the measurement number in the image file name. Select `No` if the measurement number appears before the image type. +- Maximum Runtime (hours): The amount of time to reserve a node on the cluster. The actual runtime can be lower, but not larger than this. If the run takes longer than the time given, it will be terminated before completion. Must be less than 72. +- Project Code: Specify your Supercomputing Wales project code, for example, scw0123. +- Partition: Select the partition you wish to run your job on. (Defaults to `compute` partition) +- Local script directory: Choose where local copies of remote scripts and batch data are saved. - Open the ClusterView module in the Data Tools menu. You will see a list of all runs submitted to the cluster. Under the run name the module will display `PENDING` for runs in queue or currently running and `COMPLETED` for runs that have stopped running. Click `Update` in the upper left corner to refresh the status of the runs. Use the `Download Results` button to download and inspect the results. - If you have already downloaded the results, the button label will change to `Download Again`. +Submit the pipeline by pressing `Analyze Images`. The plugin will copy the image files and the pipeline to the cluster and add the process to the queue. +### Checking job status +Please add the `ClusterView` plugin from the `Data Tools` category in the Module list into the pipeline, then you will be able to select a list of all runs submitted to the cluster. Under the run name, the module will display `PENDING` for runs in the queue or currently running and `COMPLETED` for runs that have stopped running. Click the `Update` button to refresh the status of the runs. Use the `Download Results` button to download and inspect the results. If you have already downloaded the results, the button label will change to `Download Again`. +## TODO +- [ ] Light refactoring of ClusterView functions to separate directory management from Rynner functions for easier testing. +- [ ] Include testing for directory and file management functions in ClusterView. +- [ ] (Optional) Consider folding the ClusterView window into RunOnCluster as a single module. +- [ ] MacOS version diff --git a/clusterview.py b/clusterview.py index edcdbe6..d17858b 100644 --- a/clusterview.py +++ b/clusterview.py @@ -1,5 +1,3 @@ -# coding=utf-8 - """ ClusterView ============= @@ -15,49 +13,55 @@ ============ ============ =============== YES YES NO ============ ============ =============== - - """ +import datetime import logging -logger = logging.getLogger(__package__) - -import numpy as np -import os, time, shutil +import os +import shutil import tempfile -import timeago, datetime -import wx +import time -import cellprofiler.module as cpm -import cellprofiler.setting as cps -import cellprofiler.preferences as cpprefs +import cellprofiler_core.module as cpm +import cellprofiler_core.preferences as cpprefs +import cellprofiler_core.setting as cps +import timeago +import wx +from cellprofiler_core.preferences import (ABSOLUTE_FOLDER_NAME, + DEFAULT_OUTPUT_FOLDER_NAME, + DEFAULT_OUTPUT_SUBFOLDER_NAME) import CPRynner.CPRynner as CPRynner +logger = logging.getLogger(__package__) + class YesToAllMessageDialog(wx.Dialog): - ''' + """ A message dialog with "yes", "no" and "yes to all" buttons, returning wx.ID_YES, wx.ID_NO and wx.ID_YESTOALL respectively - ''' + """ + def __init__(self, parent, message, title): - super(YesToAllMessageDialog, self).__init__(parent, title=title, size = (310,210) ) + super().__init__(parent, title=title, size=(310, 210)) self.panel = wx.Panel(self) - # First the message text + # First the message text text_sizer = wx.BoxSizer(wx.HORIZONTAL) - stmessage = wx.StaticText(self.panel, 11, message, size=(310,110)) + stmessage = wx.StaticText(self.panel, 11, message, size=(310, 110)) stmessage.Wrap(300) - text_sizer.Add(stmessage, 0, wx.ALL , 5) + text_sizer.Add(stmessage, 0, wx.ALL, 5) # Three buttons with the appropriate labels button_sizer = wx.BoxSizer(wx.HORIZONTAL) self.yes_btn = wx.Button(self.panel, wx.ID_YES, label="Yes", size=(60, 30)) - button_sizer.Add(self.yes_btn, 0, wx.ALL , 5) + button_sizer.Add(self.yes_btn, 0, wx.ALL, 5) self.no_btn = wx.Button(self.panel, wx.ID_NO, label="No", size=(60, 30)) - button_sizer.Add(self.no_btn, 0, wx.ALL , 5) - self.yestoall_btn = wx.Button(self.panel, wx.ID_YESTOALL, label="Yes to All", size=(90, 30)) - button_sizer.Add(self.yestoall_btn, 0, wx.ALL , 5) + button_sizer.Add(self.no_btn, 0, wx.ALL, 5) + self.yestoall_btn = wx.Button( + self.panel, wx.ID_YESTOALL, label="Yes to All", size=(90, 30) + ) + button_sizer.Add(self.yestoall_btn, 0, wx.ALL, 5) # Bind the buttons to functions self.yes_btn.Bind(wx.EVT_BUTTON, self.on_yes) @@ -70,7 +74,7 @@ def __init__(self, parent, message, title): main_sizer.Add(button_sizer, 0, wx.ALL | wx.ALIGN_CENTER, 5) self.panel.SetSizer(main_sizer) self.panel.Fit() - + def on_yes(self, event): # On 'yes' button click return wx.ID_YES self.EndModal(wx.ID_YES) @@ -88,14 +92,14 @@ def on_yes_to_all(self, event): class ClusterviewFrame(wx.Frame): - ''' + """ A frame containing information on queued and accomplished runs, update and logout buttons and a download button for each run - ''' + """ def __init__(self, parent, title): # First update runs, then create the window - super(ClusterviewFrame, self).__init__(parent, title=title, size = (400,400)) + super().__init__(parent, title=title, size=(400, 400)) self.update_time = datetime.datetime.now() self.update() self.InitUI() @@ -104,7 +108,7 @@ def __init__(self, parent, title): def InitUI(self): # The containers in the window are organised here self.panel = wx.lib.scrolledpanel.ScrolledPanel(self) - self.panel.SetBackgroundColour('#ededed') + self.panel.SetBackgroundColour("#ededed") self.vbox = wx.BoxSizer(wx.VERTICAL) self.build_view(self.vbox) @@ -123,8 +127,8 @@ def build_view(self, vbox): vbox.Add((-1, 5)) # The update button and info - btn = wx.Button(self.panel, label='Update', size=(90, 30)) - btn.Bind(wx.EVT_BUTTON, self.on_update_click ) + btn = wx.Button(self.panel, label="Update", size=(90, 30)) + btn.Bind(wx.EVT_BUTTON, self.on_update_click) update_time_text = wx.StaticText(self.panel, label="") update_time_text.SetFont(font) @@ -133,20 +137,19 @@ def build_view(self, vbox): # Add the button and text to a sizer hbox = wx.BoxSizer(wx.HORIZONTAL) - hbox.Add(btn, 0, wx.LEFT|wx.ALIGN_CENTER_VERTICAL, 8) - hbox.Add(update_time_text, 0, wx.LEFT|wx.ALIGN_CENTER_VERTICAL, 8) + hbox.Add(btn, 0, wx.LEFT | wx.ALIGN_CENTER_VERTICAL, 8) + hbox.Add(update_time_text, 0, wx.LEFT | wx.ALIGN_CENTER_VERTICAL, 8) vbox.Add(hbox, 0, wx.EXPAND, 10) # The logout and settings buttons in a separate sizer - logout_btn = wx.Button(self.panel, label='Logout', size=(90, 30)) - logout_btn.Bind(wx.EVT_BUTTON, self.on_logout_click ) - settings_btn = wx.Button(self.panel, label='Cluster Settings', size=(90, 30)) - settings_btn.Bind(wx.EVT_BUTTON, - self.on_cluster_settings_click) + logout_btn = wx.Button(self.panel, label="Logout", size=(90, 30)) + logout_btn.Bind(wx.EVT_BUTTON, self.on_logout_click) + settings_btn = wx.Button(self.panel, label="Cluster Settings", size=(90, 30)) + settings_btn.Bind(wx.EVT_BUTTON, self.on_cluster_settings_click) hbox = wx.BoxSizer(wx.HORIZONTAL) - hbox.Add((0,0), 1, wx.ALIGN_CENTER_VERTICAL) - hbox.Add(logout_btn, 0, wx.RIGHT|wx.ALIGN_CENTER_VERTICAL, 8) - hbox.Add(settings_btn, 0, wx.RIGHT|wx.ALIGN_CENTER_VERTICAL, 8) + hbox.Add((0, 0), 1, wx.ALIGN_CENTER_VERTICAL) + hbox.Add(logout_btn, 0, wx.RIGHT | wx.ALIGN_CENTER_VERTICAL, 8) + hbox.Add(settings_btn, 0, wx.RIGHT | wx.ALIGN_CENTER_VERTICAL, 8) vbox.Add(hbox, 0, wx.EXPAND, 10) # Margin and a separator @@ -156,28 +159,26 @@ def build_view(self, vbox): # Add a display for all runs in history self.run_displays = [] - for run in sorted(self.runs, key=lambda k: k['upload_time'], reverse = True): + for run in sorted(self.runs, key=lambda k: k["upload_time"], reverse=True): # Run name - st = wx.StaticText(self.panel, label=run.job_name+":") + st = wx.StaticText(self.panel, label=run.job_name + ":") st.SetFont(font) hbox1 = wx.BoxSizer(wx.HORIZONTAL) hbox1.Add(st, flag=wx.RIGHT, border=8) - vbox.Add(hbox1, flag=wx.EXPAND|wx.LEFT|wx.RIGHT|wx.TOP) + vbox.Add(hbox1, flag=wx.EXPAND | wx.LEFT | wx.RIGHT | wx.TOP) # The state of the run hbox2 = wx.BoxSizer(wx.HORIZONTAL) - time_since = str(datetime.datetime.fromtimestamp(int(run['status_time']))) - st2 = wx.StaticText( self.panel, - label=run.status+" since " + time_since - ) + time_since = str(datetime.datetime.fromtimestamp(int(run["status_time"]))) + st2 = wx.StaticText(self.panel, label=run.status + " since " + time_since) hbox2.Add(st2) vbox.Add(hbox2, flag=wx.LEFT | wx.TOP, border=10) - if run.status == 'PENDING': - starttime = run['starttime'] + if run.status == "PENDING": + starttime = run["starttime"] hbox3 = wx.BoxSizer(wx.HORIZONTAL) - st3 = wx.StaticText( self.panel, - label="Estimated start time " + starttime + st3 = wx.StaticText( + self.panel, label="Estimated start time " + starttime ) hbox3.Add(st3) vbox.Add(hbox3, flag=wx.LEFT | wx.TOP, border=10) @@ -186,26 +187,31 @@ def build_view(self, vbox): vbox.Add((-1, 5)) # The download button - if run.status == 'COMPLETED': - if hasattr(run, 'downloaded') and run.downloaded: - label = 'Download Again' + if run.status == "COMPLETED": + if hasattr(run, "downloaded") and run.downloaded: + label = "Download Again" else: - label = 'Download Results' + label = "Download Results" btn = wx.Button(self.panel, label=label, size=(130, 40)) - btn.Bind(wx.EVT_BUTTON, lambda e, r=run: self.on_download_click( e, r ) ) + btn.Bind(wx.EVT_BUTTON, lambda e, r=run: self.on_download_click(e, r)) hbox3 = wx.BoxSizer(wx.HORIZONTAL) hbox3.Add(btn) - vbox.Add(hbox3, flag=wx.ALIGN_RIGHT|wx.RIGHT, border=10) + vbox.Add(hbox3, flag=wx.ALIGN_RIGHT | wx.RIGHT, border=10) def set_timer(self, element): - ''' + """ Set a timer to update the time since last update - ''' + """ + def update_st(event): - element.SetLabel("Last updated: "+timeago.format(self.update_time, locale='en_GB')) + element.SetLabel( + "Last updated: " + timeago.format(self.update_time, locale="en_GB") + ) + def close(event): self.timer.Stop() self.Destroy() + self.timer = wx.Timer(self) self.timer.Start(1000) self.Bind(wx.EVT_TIMER, update_st, self.timer) @@ -214,14 +220,14 @@ def close(event): def on_download_click(self, event, run): self.download(run) - def on_update_click( self, event ): - ''' + def on_update_click(self, event): + """ Update runs and rebuild the layout - ''' + """ self.update() self.draw() - def on_logout_click( self, event ): + def on_logout_click(self, event): CPRynner.logout() self.runs = [] @@ -242,82 +248,85 @@ def draw(self): self.vbox.Layout() self.FitInside() - def update( self ): - ''' + def update(self): + """ Update the run list - ''' + """ rynner = CPRynner.CPRynner() if rynner is not None: - self.runs = [ r for r in rynner.get_runs() if 'upload_time' in r ] + self.runs = [r for r in rynner.get_runs() if "upload_time" in r] rynner.update(self.runs) - rynner.update_start_times(self.runs) + print(f"Number of runs found: {self.runs.count}") for run in self.runs: - run['status_time'] = rynner.read_time(run) + run["status_time"] = rynner.read_time(run) + print(f"Run {run} checked.") self.update_time = datetime.datetime.now() else: self.runs = [] - - - def download( self, run ): - ''' + def download(self, run): + """ Ask for a destination folder, download files in the results folders and move to the destination - ''' + """ target_directory = self.ask_for_output_dir() if not target_directory: return False - + # Download into a temporary directory tmpdir = tempfile.mkdtemp() self.download_to_tempdir(run, tmpdir) - + # Move the files to the selected folder, handling file names and csv files self.download_file_handling_setup() - has_been_downloaded = hasattr(run, 'downloaded') and run.downloaded + has_been_downloaded = hasattr(run, "downloaded") and run.downloaded for runfolder, localdir in run.downloads: - self.handle_result_file( - os.path.join(localdir, runfolder, 'results'), + self.handle_result_file( + os.path.join(localdir, runfolder, "results"), target_directory, - has_been_downloaded + has_been_downloaded, ) # Set a flag marking the run downloaded - run['downloaded'] = True - CPRynner.CPRynner().save_run_config( run ) + run["downloaded"] = True + CPRynner.CPRynner().save_run_config(run) self.update() self.draw() def ask_for_output_dir(self): - ''' + """ Ask for a destination for the downloaded files - ''' + """ default_target = cpprefs.get_default_output_directory() - dialog = wx.DirDialog (None, "Choose an output directory", default_target, - wx.DD_DEFAULT_STYLE | wx.DD_DIR_MUST_EXIST) + dialog = wx.DirDialog( + None, + "Choose an output directory", + default_target, + wx.DD_DEFAULT_STYLE | wx.DD_DIR_MUST_EXIST, + ) try: if dialog.ShowModal() == wx.ID_CANCEL: return False target_directory = dialog.GetPath() except Exception: - wx.LogError('Failed to open directory!') + wx.LogError("Failed to open directory!") raise finally: dialog.Destroy() return target_directory def download_to_tempdir(self, run, tmpdir): - ''' + """ Actually download the files from the cluster into tmpdir, showing a progress dialog - ''' - run.downloads = [ [d[0], tmpdir] for d in run.downloads ] + """ + run.downloads = [[d[0], tmpdir] for d in run.downloads] CPRynner.CPRynner().start_download(run) - dialog = wx.GenericProgressDialog("Downloading","Downloading files") + dialog = wx.GenericProgressDialog("Downloading", "Downloading files") maximum = dialog.GetRange() - while run['download_status'] < 1: - value = min( maximum, int(maximum*run['download_status']) ) + while run["download_status"] < 1: + value = min(maximum, int(maximum * run["download_status"])) dialog.Update(value) time.sleep(0.04) dialog.Destroy() @@ -327,30 +336,32 @@ def download_file_handling_setup(self): self.yes_to_all_clicked = False def rename_file(self, name): - ''' + """ Add a number at the end of a filename to create a unique new name - ''' + """ stripped_name, suffix = os.path.splitext(name) - n=2 - new_name = stripped_name + '_' +str(n)+suffix + n = 2 + new_name = stripped_name + "_" + str(n) + suffix while os.path.isfile(new_name): n += 1 - new_name = stripped_name + '_' +str(n)+suffix + new_name = stripped_name + "_" + str(n) + suffix return new_name - - def handle_result_file( self, filename, target_directory, has_been_downloaded ): - ''' + + def handle_result_file(self, filename, target_directory, has_been_downloaded): + """ Recursively check result files and move to the target directory. Handle conflicting file names and csv files Each run will create the same set of csv files to contain the measurement info. These need to be combined into one and the image numbers need to be fixed. We will ask how the files should be handled once for each file name and remember the answer in self.csv_dict - ''' + """ if os.path.isdir(filename): # Recursively walk directories for f in os.listdir(filename): - self.handle_result_file( os.path.join(filename, f), target_directory, has_been_downloaded) + self.handle_result_file( + os.path.join(filename, f), target_directory, has_been_downloaded + ) else: # Handle an actual file name = os.path.basename(filename) @@ -358,44 +369,53 @@ def handle_result_file( self, filename, target_directory, has_been_downloaded ): try: if not os.path.isfile(target_file): # No file name conflict, just move - shutil.move( filename, target_directory ) - if filename.endswith('.csv'): + shutil.move(filename, target_directory) + if filename.endswith(".csv"): # File is .csv, we need to remember this one has been handled already self.csv_dict[name] = name - elif name.endswith('.csv'): + elif name.endswith(".csv"): # File exists and is csv. Ask the user whether to append or to create a new file if name not in self.csv_dict: append = self.ask_csv_append(name, has_been_downloaded) if append: self.csv_dict[name] = name - self.handle_csv( filename, os.path.join(target_directory, name) ) + self.handle_csv( + filename, os.path.join(target_directory, name) + ) else: self.csv_dict[name] = self.rename_file(name) - shutil.move( filename, os.path.join(target_directory, self.csv_dict[name])) + shutil.move( + filename, + os.path.join(target_directory, self.csv_dict[name]), + ) else: - self.handle_csv( filename, os.path.join(target_directory, self.csv_dict[name])) + self.handle_csv( + filename, + os.path.join(target_directory, self.csv_dict[name]), + ) else: # File exists, use a new name new_name = self.rename_file(name) - shutil.move( filename, os.path.join(target_directory, new_name)) + shutil.move(filename, os.path.join(target_directory, new_name)) except Exception as e: print(e) wx.MessageBox( "Failed to move a file to the destination", caption="File error", - style=wx.OK | wx.ICON_INFORMATION) - raise(e) + style=wx.OK | wx.ICON_INFORMATION, + ) + raise e def ask_csv_append(self, name, has_been_downloaded): if self.yes_to_all_clicked: return True - message = 'The file '+name+' already exists. Append to the existing file?' + message = f"The file {name} already exists. " f"Append to the existing file?" if has_been_downloaded: - message += ' This file has already been downloaded and appending may result in dublication of data.' - dialog = YesToAllMessageDialog(self, message, 'Append to File') + message += " This file has already been downloaded and appending may result in duplication of data." + dialog = YesToAllMessageDialog(self, message, "Append to File") else: - dialog = YesToAllMessageDialog(self, message, 'Append to File') + dialog = YesToAllMessageDialog(self, message, "Append to File") answer = dialog.ShowModal() if answer == wx.ID_NO: @@ -403,18 +423,19 @@ def ask_csv_append(self, name, has_been_downloaded): if answer == wx.ID_YESTOALL: self.yes_to_all_clicked = True return True - - def handle_csv( self, source, destination ): - ''' Write the data rows of a csv file into an existing csv file. - Fix image numbering before writing ''' + def handle_csv(self, source, destination): + """ + Write the data rows of a csv file into an existing csv file. + Fix image numbering before writing + """ # First check if the file contains the image number - outfile = open(destination,"rb") + outfile = open(destination, "rb") header = outfile.next() has_image_num = False - for index, cell in enumerate(header.split(',')): - if cell == 'ImageNumber': + for index, cell in enumerate(header.split(",")): + if cell == "ImageNumber": image_num_cell = index has_image_num = True @@ -422,82 +443,404 @@ def handle_csv( self, source, destination ): if has_image_num: last_image_num = 1 for row in outfile: - image_num = int(row.split(',')[image_num_cell]) + image_num = int(row.split(",")[image_num_cell]) last_image_num = max(image_num, last_image_num) outfile.close() # Read the source file and write row by row to the destination - infile = open(source, 'rb') + infile = open(source, "rb") infile.next() - outfile = open(destination,"ab") + outfile = open(destination, "ab") for row in infile: # If the image number is included, correct the number if has_image_num: - cells = row.split(',') + cells = row.split(",") local_num = int(cells[image_num_cell]) - cells[image_num_cell] = str(image_num+local_num) - row = ','.join(cells) + cells[image_num_cell] = str(image_num + local_num) + row = ",".join(cells) outfile.write(row) outfile.close() infile.close() - + class clusterView(cpm.Module): module_name = "ClusterView" category = "Data Tools" variable_revision_number = 1 - @classmethod - def is_input_module(cls): - ''' This is a rather horrible hack... - Prevents CellProfiler from listing this in the add module window. - ''' - return True + def __init__(self): + super().__init__() def create_settings(self): - self.pipelineinfo = cps.HTMLText( - "", - "Use the 'Data Tools' menu to open the Cluster View", - size=(2,2) + + self.run_names = ["None"] + + doc_ = "Bring up old ClusterView frame." + self.frame_button = cps.do_something.DoSomething( + "", "Cluster View", self.run_as_data_tool, doc=doc_ + ) + + doc_ = "Select a CellProfiler job that has been run on the cluster." + self.choose_run = cps.choice.Choice( + "Select CellProfiler cluster run.", + choices=self.run_names, + value="None", + doc=doc_, + ) + + doc_ = "Update the module's internal list of CellProfiler cluster runs." + self.update_button = cps.do_something.DoSomething( + "Update job list.", "Update", self.update_module, doc=doc_ + ) + + doc_ = "Update the cluster login settings." + self.settings_button = cps.do_something.DoSomething( + "Change cluster settings.", "Settings", self.on_settings_click, doc=doc_ + ) + + doc_ = "Log out from the cluster. Log back in by updating cluster settings." + self.logout_button = cps.do_something.DoSomething( + "Logout from cluster.", "Logout", self.on_logout_click, doc=doc_ + ) + + doc_ = "Name of the selected run. Used as name for generated results folder." + self.run_folder_name = cps.text.Text( + "Run Name", self.choose_run.value, doc=doc_ + ) + + doc_ = "Choose destination folder for downloaded run files." + self.dest_folder = cps.text.Directory( + "File destination folder", + dir_choices=[ + DEFAULT_OUTPUT_FOLDER_NAME, + ABSOLUTE_FOLDER_NAME, + DEFAULT_OUTPUT_SUBFOLDER_NAME, + ], + value=DEFAULT_OUTPUT_SUBFOLDER_NAME, + doc=doc_, + ) + + doc_ = "Download results files for the selected run. Requires selected run to have status COMPLETE." + self.download_button = cps.do_something.DoSomething( + "Download results.", "Download", self.on_download_click, doc=doc_ ) def settings(self): - return [self.pipelineinfo] + result = [ + self.frame_button, + self.choose_run, + self.update_button, + self.settings_button, + self.logout_button, + self.run_folder_name, + self.dest_folder, + self.download_button, + ] + return result def post_pipeline_load(self, pipeline): - '''Fixup any measurement names that might have been ambiguously loaded + """ + Fixup any measurement names that might have been ambiguously loaded pipeline - for access to other module's measurements - ''' + """ pass def visible_settings(self): - return [self.pipelineinfo] + result = [ + self.frame_button, + self.choose_run, + self.update_button, + self.settings_button, + self.logout_button, + ] + if self.choose_run.value != "None": + self.run_folder_name.value = self.choose_run.value + result += [self.run_folder_name, self.dest_folder, self.download_button] + return result def run(self): pass def run_as_data_tool(self): - frame = ClusterviewFrame(wx.GetApp().frame, 'Cluster View') + frame = ClusterviewFrame(wx.GetApp().frame, "Cluster View") frame.Show() pass - def display(self, workspace, figure): + def update_module(self): + """ + Update the run list + """ + rynner = CPRynner.CPRynner() + if rynner is not None: + # if 'upload_time' in r ] + self.runs = [r for r in rynner.get_runs()] + rynner.update(self.runs) + for run in self.runs: + run["status_time"] = rynner.read_time(run) + self.update_time = datetime.datetime.now() + else: + self.runs = [] + self.run_names = [] + print(f"Number of runs detected: {len(self.runs)}") + for r in range(len(self.runs)): + run = self.runs[r] + print(f"{run['job_name']}") + self.run_names += [(run["job_name"] + " (" + run["qid"] + ")")] + self.run_names += ["None"] + doc_ = "Select a CellProfiler job that has been run on the cluster." + self.choose_run = cps.choice.Choice( + "Select CellProfiler cluster run.", + choices=self.run_names, + value="None", + doc=doc_, + ) pass + def on_logout_click(self): + CPRynner.logout() + self.runs = [] + self.run_names = ["None"] + + def on_settings_click(self): + cluster_address_orig = CPRynner.cluster_url() + CPRynner.update_cluster_parameters() + cluster_address_new = CPRynner.cluster_url() + + if cluster_address_orig != cluster_address_new: + CPRynner.logout() + self.runs = [] + self.update_module() + + def on_download_click(self): + # TO DO: light refactoring of function to separate directory management from Rynner and SFTP functions + run_ind = self.run_names.index(self.choose_run.value) + run = self.runs[run_ind] + if run["status"] != "COMPLETED": + status_message = ( + "Unable to download until run is completed. Current status: " + + run["status"] + ) + status_dialog = wx.MessageDialog( + None, status_message, caption="Run status", style=wx.OK + ) + dialog_result = status_dialog.ShowModal() + return None + folder_name = self.run_folder_name.value + dest_folder = self.dest_folder.get_absolute_path() + dest_dir = os.path.join(dest_folder, folder_name) + # Ask about subdirectory creation. + # Partition into new function i.e. select_download_directory? + if os.path.exists(dest_dir): + dir_message = ( + "Subdirectory named " + + folder_name + + " already exists. Download files into subdirectory folder anyway?" + ) + dir_dialog = wx.MessageDialog( + None, dir_message, caption="Folder option", style=wx.OK | wx.CANCEL + ) + dir_result = dir_dialog.ShowModal() + if dir_result == wx.ID_CANCEL: + return None + elif dir_result == wx.ID_OK: + target_dir = dest_dir + else: + message = "Create subdirectory for results files?" + dialog = wx.MessageDialog( + None, message, caption="Folder option", style=wx.YES_NO | wx.CANCEL + ) + dialog_result = dialog.ShowModal() + if dialog_result == wx.ID_CANCEL: + return None + elif dialog_result == wx.ID_NO: + target_dir = dest_folder + elif dialog_result == wx.ID_YES: + os.mkdir(dest_dir) + target_dir = dest_dir + # Begin download into destination directory. Adapted from previous ClusterView version. + # Download into a temporary directory + tmpdir = tempfile.mkdtemp() # Remove tempdir stage at some point? + self.download_to_tempdir(run, tmpdir) + # Move the files to the selected folder, handling file names and csv files + self.download_file_handling_setup() + has_been_downloaded = hasattr(run, "downloaded") and run.downloaded + for runfolder, localdir in run.downloads: + self.handle_result_file( + os.path.join(localdir, runfolder, "results"), + target_dir, + has_been_downloaded, + ) + # Set a flag marking the run downloaded + run["downloaded"] = True + CPRynner.CPRynner().save_run_config(run) + self.runs[run_ind] = run + complete_message = "Download of run " + self.choose_run.value + " complete." + complete_dialog = wx.MessageDialog( + None, complete_message, caption="Download status", style=wx.OK + ) + dialog_result = complete_dialog.ShowModal() + # Post download cleanup of temp files + tmp_run_dir = os.path.basename(localdir) + if tmp_run_dir[0:3] == "tmp": + shutil.rmtree(localdir) + print(f"Removed temporary directory {tmp_run_dir} from {tmpdir}.") + + def download_to_tempdir(self, run, tmpdir): + """ + Actually download the files from the cluster into tmpdir, + showing a progress dialog + """ + run.downloads = [[d[0], tmpdir] for d in run.downloads] + CPRynner.CPRynner().start_download(run) + dialog = wx.GenericProgressDialog("Downloading", "Downloading files") + maximum = dialog.GetRange() + while run["download_status"] < 1: + value = min(maximum, int(maximum * run["download_status"])) + dialog.Update(value) + time.sleep(0.04) + dialog.Destroy() + + def download_file_handling_setup(self): + self.csv_dict = {} + self.yes_to_all_clicked = False + + def rename_file(self, name): + """ + Add a number at the end of a filename to create a unique new name + """ + stripped_name, suffix = os.path.splitext(name) + n = 2 + new_name = stripped_name + "_" + str(n) + suffix + while os.path.isfile(new_name): + n += 1 + new_name = stripped_name + "_" + str(n) + suffix + return new_name + + def handle_result_file(self, filename, target_directory, has_been_downloaded): + """ + Recursively check result files and move to the target directory. Handle conflicting file names + and csv files + + Each run will create the same set of csv files to contain the measurement info. These need to be + combined into one and the image numbers need to be fixed. We will ask how the files should be handled + once for each file name and remember the answer in self.csv_dict + """ + if os.path.isdir(filename): + # Recursively walk directories + for f in os.listdir(filename): + self.handle_result_file( + os.path.join(filename, f), target_directory, has_been_downloaded + ) + else: + # Handle an actual file + name = os.path.basename(filename) + target_file = os.path.join(target_directory, name) + try: + if not os.path.isfile(target_file): + # No file name conflict, just move + shutil.move(filename, target_directory) + if filename.endswith(".csv"): + # File is .csv, we need to remember this one has been handled already + self.csv_dict[name] = name + elif name.endswith(".csv"): + # File exists and is csv. Ask the user whether to append or to create a new file + if name not in self.csv_dict: + append = self.ask_csv_append(name, has_been_downloaded) + if append: + self.csv_dict[name] = name + self.handle_csv( + filename, os.path.join(target_directory, name) + ) + else: + self.csv_dict[name] = self.rename_file(name) + shutil.move( + filename, + os.path.join(target_directory, self.csv_dict[name]), + ) + else: + self.handle_csv( + filename, + os.path.join(target_directory, self.csv_dict[name]), + ) + else: + # File exists, use a new name + new_name = self.rename_file(name) + shutil.move(filename, os.path.join(target_directory, new_name)) + except Exception as e: + print(e) + wx.MessageBox( + "Failed to move a file to the destination", + caption="File error", + style=wx.OK | wx.ICON_INFORMATION, + ) + raise e + + def ask_csv_append(self, name, has_been_downloaded): + if self.yes_to_all_clicked: + return True + + message = f"The file {name} already exists. " f"Append to the existing file?" + if has_been_downloaded: + message += " This file has already been downloaded and appending may result in duplication of data." + dialog = YesToAllMessageDialog(None, message, "Append to File") + else: + dialog = YesToAllMessageDialog(None, message, "Append to File") + answer = dialog.ShowModal() + + if answer == wx.ID_NO: + return False + if answer == wx.ID_YESTOALL: + self.yes_to_all_clicked = True + return True + + def handle_csv(self, source, destination): + """ + Write the data rows of a csv file into an existing csv file. + Fix image numbering before writing + """ + # First check if the file contains the image number + outfile = open(destination, "rb") + header = outfile.next() + has_image_num = False + for index, cell in enumerate(header.split(",")): + if cell == "ImageNumber": + image_num_cell = index + has_image_num = True + + # If the image number is included, find the largest value + if has_image_num: + last_image_num = 1 + for row in outfile: + image_num = int(row.split(",")[image_num_cell]) + last_image_num = max(image_num, last_image_num) + outfile.close() + + # Read the source file and write row by row to the destination + infile = open(source, "rb") + infile.next() + outfile = open(destination, "ab") + for row in infile: + # If the image number is included, correct the number + if has_image_num: + cells = row.split(",") + local_num = int(cells[image_num_cell]) + cells[image_num_cell] = str(image_num + local_num) + row = ",".join(cells) + outfile.write(row) + outfile.close() + infile.close() + def validate_module(self, pipeline): - '''Do further validation on this module's settings + """ + Do further validation on this module's settings pipeline - this module's pipeline Check to make sure the output measurements aren't duplicated by prior modules. - ''' + """ pass - def upgrade_settings(self, setting_values, variable_revision_number, - module_name, from_matlab): - return setting_values, variable_revision_number, from_matlab - def volumetric(self): return True diff --git a/requirements.txt b/requirements.txt index 63a58a4..17e9c91 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,66 +1,10 @@ -asn1crypto==0.24.0 -backports.functools-lru-cache==1.5 -bcrypt==3.1.6 -boto3==1.9.88 -botocore==1.12.88 -CellProfiler==3.0.0rc3 -centrosome==1.1.6 -certifi==2018.11.29 -cffi==1.11.5 -chardet==3.0.4 -cloudpickle==0.7.0 -configparser==3.7.1 -contextlib2==0.5.5 -cryptography==3.3.2 -cycler==0.10.0 -dask==1.1.1 -DateTime==4.3 -decorator==4.3.2 -deprecation==2.0.6 -docutils==0.14 -enum34==1.1.6 -future==0.17.1 -futures==3.2.0 -h5py==2.9.0 -idna==2.8 -inflect==2.1.0 -ipaddress==1.0.22 -javabridge==1.0.18 -jmespath==0.9.3 -joblib==0.13.1 -kiwisolver==1.0.1 -libsubmit==0.5.0 -mahotas==1.4.5 -matplotlib==2.2.3 -MySQL-python==1.2.5 -mysqlclient==1.3.9 -networkx==2.2 -numpy==1.16.1 -packaging==19.0 -paramiko==2.4.2 -pathlib==1.0.1 -Pillow==8.1.1 -prokaryote==2.4.1 -pyasn1==0.4.5 -pycparser==2.19 -PyNaCl==1.3.0 -pyparsing==2.3.1 -python-bioformats==1.5.2 -python-box==3.2.4 -python-dateutil==2.8.0 -pytz==2018.9 -PyWavelets==1.0.1 -pyzmq==15.3.0 -raven==6.10.0 -requests==2.21.0 --e git://github.com:M4rkD/Rynner.git # Rynner -s3transfer==0.1.13 -scikit-image==0.14.2 -scikit-learn==0.20.2 -scipy==1.2.0 -six==1.12.0 -timeago==1.0.9 -toolz==0.9.0 -urllib3>=1.24.2 -wxPython==3.0.2.0 -zope.interface==4.6.0 +CellProfiler==4.2.5 +cellprofiler-core==4.2.5 +dask==2022.4.0 +javabridge==1.0.19 +parsl~=1.1.0 +PyYAML~=5.4.1 +python-box==6.1.0 +libsubmit @ git+https://github.com/sa2c/libsubmit.git +Rynner @ git+https://github.com/sa2c/rynner.git@61ad5fccea5c6f57fd853fa21e7cc662d2fef655 +timeago==1.0.9 \ No newline at end of file diff --git a/runoncluster.py b/runoncluster.py index 9858071..b890d33 100644 --- a/runoncluster.py +++ b/runoncluster.py @@ -1,18 +1,14 @@ - """ RunOnCluster ================ - **RunOnCluster** submits the pipeline and images to run on an HPC cluster. - The plugin uses the Rynner library, which in turn uses libsubmit, to copy the input files and the pipeline to the cluster. The image files are divided into separate run folders for each core, which will then be processed independently. The download method in the ClusterView plugin automatically combines these back into a single result folder. - Should be placed at the end of the image processing pipeline. | @@ -24,58 +20,68 @@ ============ ============ =============== """ -import os, time, re -from future import * import logging -logger = logging.getLogger(__name__) -import numpy as np +import os +import re +import sys + +import cellprofiler_core import wx +from cellprofiler_core.constants.measurement import F_BATCH_DATA_H5 +from cellprofiler_core.measurement import Measurements +from cellprofiler_core.module import Module +from cellprofiler_core.pipeline import Pipeline +from cellprofiler_core.preferences import (ABSOLUTE_FOLDER_NAME, + DEFAULT_INPUT_FOLDER_NAME, + DEFAULT_INPUT_SUBFOLDER_NAME, + DEFAULT_OUTPUT_FOLDER_NAME, + DEFAULT_OUTPUT_SUBFOLDER_NAME, + get_default_output_directory, + get_plugin_directory) +from cellprofiler_core.setting import Binary, ValidationError +from cellprofiler_core.setting.do_something import DoSomething +from cellprofiler_core.setting.text import Directory, Integer, Text +from cellprofiler_core.workspace import Workspace + +from CPRynner.CPRynner import (CPRynner, cluster_max_runtime, + cluster_run_command, cluster_setup_script, + cluster_tasks_per_node, + update_cluster_parameters) -import cellprofiler -import cellprofiler.module as cpm -import cellprofiler.measurement as cpmeas -import cellprofiler.pipeline as cpp -import cellprofiler.setting as cps -import cellprofiler.preferences as cpprefs -import cellprofiler.workspace as cpw +logger = logging.getLogger(__name__) +sys.path.append(get_plugin_directory()) -from cellprofiler.measurement import F_BATCH_DATA_H5 -from CPRynner.CPRynner import CPRynner -from CPRynner.CPRynner import update_cluster_parameters -from CPRynner.CPRynner import cluster_tasks_per_node -from CPRynner.CPRynner import cluster_setup_script -from CPRynner.CPRynner import cluster_max_runtime +class RunOnCluster(Module): + module_name = "RunOnCluster" + category = "Other" + variable_revision_number = 9 + def __init__(self): + super().__init__() -class RunOnCluster(cpm.Module): - # - # How it works: - # - # - module_name = "RunOnCluster" - category = 'Other' - variable_revision_number = 8 + def update_settings(self, setting: list): + pass - def is_create_batch_module(self): + @staticmethod + def is_create_batch_module(): return True - def upload( self, run, dialog = None ): + @staticmethod + def upload(run, dialog=None): rynner = CPRynner() - if dialog == None: - dialog = wx.GenericProgressDialog("Uploading","Uploading files") - destroy_dialog = True - else: - destroy_dialog = False + if dialog is None: + dialog = wx.GenericProgressDialog("Uploading", "Uploading files") + destroy_dialog = True if dialog is None else False if rynner is not None: rynner.start_upload(run) maximum = dialog.GetRange() - while run['upload_status'] < 1: - value = min( maximum, int(maximum*run['upload_status']) ) + while run["upload_status"] < 1: + value = min(maximum, int(maximum * run["upload_status"])) dialog.Update(value) - dialog.Update(maximum-1) + dialog.Update(maximum - 1) if destroy_dialog: dialog.Destroy() @@ -83,54 +89,85 @@ def volumetric(self): return True def create_settings(self): - '''Create the module settings and name the module''' - self.runname = cps.Text( - "Run Name", - "Run_name", - doc = "Enter a recognizable identifier for the run (spaces will be replaced by undescores)", + """Create the module settings and name the module""" + + doc_ = ( + f"Enter a recognisable identifier for the run " + f"(spaces will be replaced by undescores)" ) - self.n_images_per_measurement = cellprofiler.setting.Integer( - "Number of images per measurement", - 1, - minval=1, - doc = "The number of image files in each measurement that must be present for the pipeline to run correctly. This is usually the number of image types in the NamesAndTypes module." + self.runname = Text("Run Name", "Run_name", doc=doc_) + + doc_ = ( + f"The number of image files in each measurement that must be " + f"present for the pipeline to run correctly. This is usually " + f"the number of image types in the NamesAndTypes module." ) - self.type_first = cellprofiler.setting.Binary( - text="Image type first", - value=True, - doc= "Wether the images are ordered by image type first. If not, ordering by measurement first is assumed." + self.n_images_per_measurement = Integer( + "Number of images per measurement", 1, minval=1, doc=doc_ ) - self.is_archive = cellprofiler.setting.Binary( - text="Is image archive", - value=False, - doc= "Set to Yes if the the images are included as a single image archive, such as an Ism file." + + doc_ = ( + f"Wether the images are ordered by image type first. " + f"If not, ordering by measurement first is assumed." + ) + self.type_first = Binary(text="Image type first", value=True, doc=doc_) + + doc_ = ( + f"Set to Yes if the the images are included as a single image " + f"archive, such as an Ism file." ) - self.measurements_in_archive = cellprofiler.setting.Integer( - "Number of measurements in the archive", - 1, - minval=1, - doc = "The number of measurements in the archive file." + self.is_archive = Binary(text="Is image archive", value=False, doc=doc_) + + doc_ = "The number of measurements in the archive file." + self.measurements_in_archive = Integer( + "Number of measurements in the archive", 1, minval=1, doc=doc_ + ) + + doc_ = ( + f"The maximum time for reserving a node on the cluster. Should" + f" be higher than the actual runtime, or the run may not " + f"complete. Runs with lower values will pass the queue " + f"more quickly." ) - self.max_walltime = cellprofiler.setting.Integer( - "Maximum Runtime (hours)", - 24, - doc = "The maximum time for reserving a node on the cluster. Should be higher than the actual runtime, or the run may not compelte. Runs with lower values will pass the queue more quickly." + self.max_walltime = Integer("Maximum Runtime (hours)", 24, doc=doc_) + + doc_ = ( + f"Enter a project code of an Supercomputing Wales project you " + f"wish to run under. This can be left empty if you have only " + f"one project." ) - self.account = cps.Text( - "Project Code", - "", - doc = "Enter a project code of an Supercomputing Wales project you wish to run under. This can be left empty if you have only one project.", + self.account = Text("Project Code", "", doc=doc_) + + doc_ = ( + f"Select the partition you wish to run your job on. This may " + f"be useful if you have a private partition you wish to utilise. " + f"Defaults to 'compute' partition." ) + self.partition = Text("Partition", "compute", doc=doc_) - self.cluster_settings_button = cps.DoSomething("", - "Cluster Settings", - update_cluster_parameters, - doc = "Change cluster and edit cluster settings." + doc_ = ( + f"Choose where local copies of remote scripts and batch data are" f" saved." + ) + self.script_directory = Directory( + "Local script directory", + dir_choices=[ + DEFAULT_OUTPUT_FOLDER_NAME, + DEFAULT_INPUT_FOLDER_NAME, + ABSOLUTE_FOLDER_NAME, + DEFAULT_OUTPUT_SUBFOLDER_NAME, + DEFAULT_INPUT_SUBFOLDER_NAME, + ], + value=DEFAULT_OUTPUT_SUBFOLDER_NAME, + doc=doc_, ) - self.batch_mode = cps.Binary("Hidden: in batch mode", False) - self.revision = cps.Integer("Hidden: revision number", 0) + doc_ = "Change cluster and edit cluster settings." + self.cluster_settings_button = DoSomething( + "", "Cluster Settings", update_cluster_parameters, doc=doc_ + ) + self.batch_mode = Binary("Hidden: in batch mode", False) + self.revision = Integer("Hidden: revision number", 0) def settings(self): result = [ @@ -141,6 +178,8 @@ def settings(self): self.measurements_in_archive, self.max_walltime, self.account, + self.partition, + self.script_directory, self.batch_mode, self.revision, ] @@ -154,7 +193,7 @@ def visible_settings(self): self.runname, self.is_archive, ] - + if self.is_archive.value: result += [self.measurements_in_archive] else: @@ -166,6 +205,8 @@ def visible_settings(self): result += [ self.max_walltime, self.account, + self.partition, + self.script_directory, self.cluster_settings_button, ] return result @@ -179,227 +220,288 @@ def help_settings(self): self.measurements_in_archive, self.max_walltime, self.account, + self.partition, + self.script_directory, ] return help_settings - def group_images( self, list, n_measurements, measurements_per_run, groups_first = True ): - ''' Divides a list of images into numbered groups and returns a list enumerated by the group numbers ''' + @staticmethod + def group_images(list_, n_measurements, measurements_per_run, groups_first=True): + """Divides a list of images into numbered groups + and returns a list enumerated by the group numbers""" + if groups_first: - images_per_run = len(list)/n_measurements * measurements_per_run - return [(int(i/images_per_run), name) for i, name in enumerate(list)] - else : - return [(int((i%n_measurements)/measurements_per_run), name) for i, name in enumerate(list)] + images_per_run = len(list_) / n_measurements * measurements_per_run + return [(int(i / images_per_run), name) for i, name in enumerate(list_)] + else: + return [ + (int((i % n_measurements) / measurements_per_run), name) + for i, name in enumerate(list_) + ] - def prepare_run(self, workspace): - '''Invoke the image_set_list pickling mechanism and save the pipeline''' + def assign_image_groups(self, file_list): + max_tasks = int(cluster_tasks_per_node()) + n_images = len(file_list) + if not self.is_archive.value: + n_measurements = int(n_images / self.n_images_per_measurement.value) + measurements_per_run = int(n_measurements / max_tasks) + 1 + grouped_images = self.group_images( + file_list, n_measurements, measurements_per_run, self.type_first.value + ) + image_group_list = list(zip(*grouped_images)) + n_image_groups = max(image_group_list[0]) + 1 + # Add image files to uploads + uploads = [[name, f"run{g}/images"] for g, name in grouped_images] + else: + if n_images > 1: + wx.MessageBox( + "Include only one image archive per run.", + caption="Image error", + style=wx.OK | wx.ICON_INFORMATION, + ) + return (None, None, None) + uploads = [[file_list[0], "images"]] + n_measurements = self.measurements_in_archive.value + n_image_groups = max_tasks + return (uploads, grouped_images, n_image_groups) + + @staticmethod + def sanitise_scripts(script): + # Split by semicolons then check for entries that are just spaces + split_script = script.split(";") + for i in range(len(split_script)): + ss = split_script[i] + ss = ss.lstrip(" ").rstrip(" ") + if len(ss) > 0 and i > 0: + ss = " " + ss + split_script[i] = ss + split_script = [ss for ss in split_script if len(ss) > 0] + end_script = ";".join(split_script).rstrip(" ").lstrip(" ") + if end_script[-1] != ";": + end_script = end_script + ";" + end_script = end_script.replace("\r\n", "\n") + end_script = end_script.lstrip(";") # Remove leading semicolons + return end_script + + def create_run_scripts( + self, workspace, rynner, uploads, n_image_groups, grouped_images + ): + max_tasks = int(cluster_tasks_per_node()) + run_command = cluster_run_command() + for g in range(n_image_groups): + runscript_name = f"cellprofiler_run{g}" + local_script_path = os.path.join(rynner.provider.script_dir, runscript_name) + if not self.is_archive.value: + n_measurements = int( + len([i for i in grouped_images if i[0] == g]) + / self.n_images_per_measurement.value + ) + script = ( + f"{run_command} -p Batch_data.h5 -o " + f"results -i images -f 1 -l {n_measurements}" + f" 2>>../cellprofiler_output" + ) + script = self.sanitise_scripts(script) + else: + n_images_per_group = int(n_measurements / max_tasks) + n_additional_images = int(n_measurements % max_tasks) + if g < n_additional_images: + first = int((n_images_per_group + 1) * g) + last = int((n_images_per_group + 1) * (g + 1)) + else: + first = int(n_images_per_group * g + n_additional_images) + last = int(n_images_per_group * (g + 1) + n_additional_images) + script = ( + f"{run_command} -p Batch_data.h5 -o " + f"results -i images -f {first} -l {last} 2>>" + f"../cellprofiler_output;" + ) + script = self.sanitise_scripts(script) + with open(local_script_path, "w") as file: + file.write(script) + uploads += [[local_script_path, f"run{g}"]] + # save the pipeline on a per-node basis in directories labelled by job and subjob + batch_subdir = os.path.join(self.runname.value.replace(" ", "_"), f"run{g}") + batch_dir = os.path.join(rynner.provider.script_dir, batch_subdir) + if not os.path.exists(batch_dir): + os.makedirs(batch_dir) + path = self.save_remote_pipeline( + workspace, os.path.join(batch_dir, F_BATCH_DATA_H5) + ) + # Add the pipeline + uploads += [[path, f"run{g}"]] + return uploads + + def create_job_script(self, n_image_groups): + setup_script = cluster_setup_script() + script = ( + f"{setup_script} printf %s\\\\n " + f"{{0..{n_image_groups - 1}}} | xargs -P 40 -n 1 -IX " + f'bash -c "cd runX ; ./cellprofiler_runX; ";' + ) + script = self.sanitise_scripts(script) + return script + def prepare_run(self, workspace): + """Invoke the image_set_list pickling mechanism and save the pipeline""" pipeline = workspace.pipeline - if pipeline.test_mode: return True if self.batch_mode.value: return True else: rynner = CPRynner() + # Change default script directory to one set in script_directory setting + rynner.provider.script_dir = self.script_directory.get_absolute_path() if rynner is not None: - # Get parameters - max_tasks = int(cluster_tasks_per_node()) - setup_script = cluster_setup_script() - # Set walltime - rynner.provider.walltime = str(self.max_walltime.value)+":00:00" - - # save the pipeline - path = self.save_pipeline(workspace) + rynner.provider.walltime = str(self.max_walltime.value) + ":00:00" # Create the run data structure file_list = pipeline.file_list - file_list = [name.replace('file:///','') for name in file_list] - file_list = [name.replace('file:','') for name in file_list] - file_list = [name.replace('%20',' ') for name in file_list] + file_list = [name.replace("file:///", "") for name in file_list] + file_list = [name.replace("file:", "") for name in file_list] + file_list = [name.replace("%20", " ") for name in file_list] if len(file_list) == 0: wx.MessageBox( - "No images found. Did you remember to add them to the Images module?", - caption="No images", - style=wx.OK | wx.ICON_INFORMATION) + ( + f"No images found. Did you remember to add them to the" + f" Images module?" + ), + caption="No images", + style=wx.OK | wx.ICON_INFORMATION, + ) return False - # Divide measurements to runs according to the number of cores on a node - n_images = len(file_list) - - if not self.is_archive.value: - n_measurements = int(n_images/self.n_images_per_measurement.value) - measurements_per_run = int(n_measurements/max_tasks) + 1 - - grouped_images = self.group_images( file_list, n_measurements, measurements_per_run, self.type_first.value) - n_image_groups = max(zip(*grouped_images)[0]) + 1 - - # Add image files to uploads - uploads = [[name, 'run{}/images'.format(g)] for g,name in grouped_images] - - else: - if n_images > 1: - wx.MessageBox( - "Include only one image archive per run.", - caption="Image error", - style=wx.OK | wx.ICON_INFORMATION) - return False - - uploads = [[file_list[0], 'images']] - - n_measurements = self.measurements_in_archive.value - n_image_groups = max_tasks - - - # Also add the pipeline - uploads += [[path,'.']] - - # The runs are downloaded in their separate folders. They can be processed later - output_dir = cpprefs.get_default_output_directory() - downloads = [['run{}'.format(g),output_dir] for g in range(n_image_groups)] - - # Create run scripts and add to uploads - for g in range(n_image_groups): - runscript_name = 'cellprofiler_run{}'.format(g) - local_script_path = os.path.join(rynner.provider.script_dir, runscript_name) - - if not self.is_archive.value: - n_measurements = len([ i for i in grouped_images if i[0]==g ]) / self.n_images_per_measurement.value - script = "cellprofiler -c -p ../Batch_data.h5 -o results -i images -f 1 -l {} 2>>../cellprofiler_output; rm -r images".format(n_measurements) - - else: - n_images_per_group = int(n_measurements/max_tasks) - n_additional_images = int(n_measurements%max_tasks) - - if g < n_additional_images: - first = (n_images_per_group+1)*g - last = (n_images_per_group+1)*(g+1) - else: - first = n_images_per_group*g + n_additional_images - last = n_images_per_group*(g+1) + n_additional_images - - script = "mkdir images; cp ../images/* images; cellprofiler -c -p ../Batch_data.h5 -o results -i images -f {} -l {} 2>>../cellprofiler_output; rm -r images".format(first, last) + # Divide measurements to runs + # according to the number of cores on a node in assign_image_groups + uploads, grouped_images, n_image_groups = self.assign_image_groups( + file_list + ) + if ( + (uploads is None) + or (grouped_images is None) + or (n_image_groups is None) + ): + return False - with open(local_script_path, "w") as file: - file.write(script) + # The runs are downloaded in their separate folders. + # They can be processed later + output_dir = get_default_output_directory() + downloads = [[f"run{g}", output_dir] for g in range(n_image_groups)] - uploads += [[local_script_path,"run{}".format(g)]] + # Create run scripts and add to uploads with create_run_scripts + uploads = self.create_run_scripts( + workspace, rynner, uploads, n_image_groups, grouped_images + ) + # Define the job to run in create_job_script + script = self.create_job_script(n_image_groups) - # Define the job to run - script = '{}; printf %s\\\\n {{0..{}}} | xargs -P 40 -n 1 -IX bash -c "cd runX ; ./cellprofiler_runX; ";'.format( - setup_script, n_image_groups-1 - ) - script = script.replace('\r\n','\n') - script = script.replace(';;', ';') - print(script) - run = rynner.create_run( - jobname = self.runname.value.replace(' ','_'), - script = script, - uploads = uploads, - downloads = downloads, + run = rynner.create_run( + jobname=self.runname.value.replace(" ", "_"), + script=script, + uploads=uploads, + downloads=downloads, ) - run['account'] = self.account.value + # Add account and partition information to run. + run["account"] = self.account.value + run["partition"] = self.partition.value # Copy the pipeline and images accross - dialog = wx.GenericProgressDialog("Uploading","Uploading files",style=wx.PD_APP_MODAL) + dialog = wx.GenericProgressDialog( + "Uploading", "Uploading files", style=wx.PD_APP_MODAL + ) + try: - self.upload(run, dialog) + self.upload(run, dialog) # Submit the run - dialog.Update( dialog.GetRange()-1, "Submitting" ) - success = CPRynner().submit(run) + dialog.Update(dialog.GetRange() - 1, "Submitting") + success = rynner.submit(run) dialog.Destroy() - + if success: wx.MessageBox( - "RunOnCluster submitted the run to the cluster", - caption="RunOnCluster: Batch job submitted", - style=wx.OK | wx.ICON_INFORMATION) + "RunOnCluster submitted the run to the cluster", + caption="RunOnCluster: Batch job submitted", + style=wx.OK | wx.ICON_INFORMATION, + ) else: wx.MessageBox( - "RunOnCluster failed to submit the run", - caption="RunOnCluster: Failure", - style=wx.OK | wx.ICON_INFORMATION) + "RunOnCluster failed to submit the run", + caption="RunOnCluster: Failure", + style=wx.OK | wx.ICON_INFORMATION, + ) except Exception as e: dialog.Destroy() raise e - return False def run(self, workspace): # The submission happens in prepare run. pass - def validate_module(self, pipeline): - '''Make sure the module settings are valid''' - # This must be the last module in the pipeline - if id(self) != id(pipeline.modules()[-1]): - raise cps.ValidationError("The RunOnCluster module must be " - "the last in the pipeline.", - self.runname) - - max_runtime = int(cluster_max_runtime()) - if self.max_walltime.value >= max_runtime: - raise cps.ValidationError( - "The maximum runtime must be less than "+str(max_runtime)+" hours.", - self.max_walltime) - def validate_module_warnings(self, pipeline): - '''Warn user re: Test mode ''' + """Warn user re: Test mode""" if pipeline.test_mode: - raise cps.ValidationError("RunOnCluster will not produce output in Test Mode", - self.runname) - - def alter_path(self, path, **varargs): - if path == cpprefs.get_default_output_directory(): - path = 'results' + raise ValidationError( + "RunOnCluster will not produce output in Test Mode", self.runname + ) + + @staticmethod + def alter_path(path): + if path == get_default_output_directory(): + path = "results" else: - path = os.path.join('results', os.path.basename(path)) - path = path.replace('\\', '/') + path = os.path.join("results", os.path.basename(path)) + path = path.replace("\\", "/") return path def save_pipeline(self, workspace, outf=None): - '''Save the pipeline in Batch_data.h5 - + """Save the pipeline in Batch_data.h5 Save the pickled image_set_list state in a setting and put this module in batch mode. - if outf is not None, it is used as a file object destination. - ''' + """ if outf is None: - path = cpprefs.get_default_output_directory() + path = get_default_output_directory() h5_path = os.path.join(path, F_BATCH_DATA_H5) else: h5_path = outf image_set_list = workspace.image_set_list pipeline = workspace.pipeline - m = cpmeas.Measurements(copy=workspace.measurements, - filename=h5_path) - + m = Measurements(copy=workspace.measurements, filename=h5_path) + try: - assert isinstance(pipeline, cpp.Pipeline) - assert isinstance(m, cpmeas.Measurements) + assert isinstance(pipeline, Pipeline) + assert isinstance(m, Measurements) orig_pipeline = pipeline pipeline = pipeline.copy() # this use of workspace.frame is okay, since we're called from # prepare_run which happens in the main wx thread. - target_workspace = cpw.Workspace(pipeline, None, None, None, - m, image_set_list, - workspace.frame) - # Assuming all results go to the same place, output folder can be set - # in the script - pipeline.prepare_to_create_batch(target_workspace, self.alter_path) + target_workspace = Workspace( + pipeline, None, None, None, m, image_set_list, workspace.frame + ) + # Assuming all results go to the same place, + # output folder can be set in the script + self_copy = pipeline.module(self.module_num) - self_copy.revision.value = int(re.sub(r"\.|rc\d{1}", "", cellprofiler.__version__)) + self_copy.revision.value = int( + re.sub(r"\.|rc\d{1}", "", cellprofiler_core.__version__) + ) + self_copy.batch_mode.value = True + # Pipeline is readied for saving at this point + pipeline.prepare_to_create_batch(target_workspace, self.alter_path) + pipeline.write_pipeline_measurement(m) orig_pipeline.write_pipeline_measurement(m, user_pipeline=True) @@ -407,20 +509,52 @@ def save_pipeline(self, workspace, outf=None): finally: m.close() - def upgrade_settings(self, setting_values, variable_revision_number, - module_name, from_matlab): - # The first version of this module was created for CellProfiler - # version 8. + def save_remote_pipeline(self, workspace, outf=None): + + if outf is None: + path = get_default_output_directory() + h5_path = os.path.join(path, F_BATCH_DATA_H5) + else: + h5_path = outf + + image_set_list = workspace.image_set_list + pipeline = workspace.pipeline + m = Measurements(copy=workspace.measurements, filename=h5_path) + + try: + assert isinstance(pipeline, Pipeline) + assert isinstance(m, Measurements) + + orig_pipeline = pipeline + pipeline = pipeline.copy() + # this use of workspace.frame is okay, since we're called from + # prepare_run which happens in the main wx thread. + target_workspace = Workspace( + pipeline, None, None, None, m, image_set_list, workspace.frame + ) + # Assuming all results go to the same place, + # output folder can be set in the script + + self_copy = pipeline.module(self.module_num) + self_copy.revision.value = int( + re.sub(r"\.|rc\d{1}", "", cellprofiler_core.__version__) + ) + + # Trim RunOnCluster and ClusterView modules from submitted pipeline + for module in reversed(pipeline.modules()): + if ( + module.module_name == "RunOnCluster" + or module.module_name == "ClusterView" + ): + pipeline.remove_module(module.module_num) + + self_copy.batch_mode.value = True - if from_matlab and variable_revision_number == 8: - # There is no matlab implementation - raise NotImplementedError("Attempting to import RunOnCluster from Matlab.") - - if (not from_matlab) and variable_revision_number == 8: - pass + pipeline.prepare_to_create_batch(target_workspace, self.alter_path) - if variable_revision_number < 8: - # There are no older implementations - raise NotImplementedError("Importing unkown version of RunOnCluster.") + pipeline.write_pipeline_measurement(m) + orig_pipeline.write_pipeline_measurement(m, user_pipeline=True) - return setting_values, variable_revision_number, from_matlab + return h5_path + finally: + m.close() diff --git a/test_runoncluster.py b/test_runoncluster.py deleted file mode 100644 index 2853825..0000000 --- a/test_runoncluster.py +++ /dev/null @@ -1,31 +0,0 @@ -from runoncluster import RunOnCluster - -def test_instantiate(): - rc = RunOnCluster() - assert rc is not None - -def test_create_settings(): - rc = RunOnCluster() - rc.create_settings() - -def test_settings(): - rc = RunOnCluster() - rc.settings() - -def test_visible_settings(): - rc = RunOnCluster() - rc.visible_settings() - -def test_help_settings(): - rc = RunOnCluster() - rc.help_settings() - -def test_group_images(): - rc = RunOnCluster() - imagelist = [1,2,3,4,5,6,7,8] - output = rc.group_images(imagelist, 4, 2, groups_first = False) - assert output == [(0,1),(0,2),(1,3),(1,4),(0,5),(0,6),(1,7),(1,8)] - - output = rc.group_images(imagelist, 4, 2, groups_first = True) - assert output == [(0,1),(0,2),(0,3),(0,4),(1,5),(1,6),(1,7),(1,8)] - diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_clusterview.py b/tests/test_clusterview.py new file mode 100644 index 0000000..3b6ff92 --- /dev/null +++ b/tests/test_clusterview.py @@ -0,0 +1,37 @@ +import pytest + +from ..clusterview import ClusterView + + +@pytest.fixture(scope="session") +def cv(): + clv = ClusterView() + return clv + + +def test_instantiate(cv): + assert cv is not None + + +# For testing settings types +@pytest.mark.parametrize( + "setting, obj_type", + [ + ("frane_button", "DoSomething"), + ("choose_run", "Choice"), + ("update_button", "DoSomething"), + ("settings_button", "DoSomething"), + ("logout_button", "DoSomething"), + ("run_folder_name", "Text"), + ("dest_folder", "Directory"), + ("download_button", "DoSomething"), + ], +) +def test_create_settings(cv, setting, obj_type): + # Test all settings are expected type. + cv.settings() + cv_props = vars(cv) + assert type(cv_props.get(setting)).__name__ == obj_type + + +# Other tests pending on light refactoring of on_download_click diff --git a/tests/test_runoncluster.py b/tests/test_runoncluster.py new file mode 100644 index 0000000..276c3dd --- /dev/null +++ b/tests/test_runoncluster.py @@ -0,0 +1,173 @@ +import pytest + +from ..runoncluster import RunOnCluster + + +@pytest.fixture(scope="session") +def rc(): + roc = RunOnCluster() + return roc + + +def test_instantiate(rc): + assert rc is not None + + +# For testing settings types +@pytest.mark.parametrize( + "setting, obj_type", + [ + ("runname", "Text"), + ("is_archive", "Binary"), + ("n_images_per_measurement", "Integer"), + ("type_first", "Binary"), + ("measurements_in_archive", "Integer"), + ("max_walltime", "Integer"), + ("account", "Text"), + ("partition", "Text"), + ("script_directory", "Directory"), + ("batch_mode", "Binary"), + ("revision", "Integer"), + ], +) +def test_create_settings(rc, setting, obj_type): + # Test all settings are expected type. + rc.create_settings() + rc_props = vars(rc) + assert type(rc_props.get(setting)).__name__ == obj_type + + +# For testing image grouping +@pytest.mark.parametrize( + "image_group, n_measurements, measurements_per_run, groups_first, grouped_images", + [ + ( + ["a", "b", "c", "d", "e", "f", "g", "h"], + 1, + 1, + True, + [ + (0, "a"), + (0, "b"), + (0, "c"), + (0, "d"), + (0, "e"), + (0, "f"), + (0, "g"), + (0, "h"), + ], + ), + ( + ["a", "b", "c", "d", "e", "f", "g", "h"], + 2, + 1, + True, + [ + (0, "a"), + (0, "b"), + (0, "c"), + (0, "d"), + (1, "e"), + (1, "f"), + (1, "g"), + (1, "h"), + ], + ), + ( + ["a", "b", "c", "d", "e", "f", "g", "h"], + 4, + 1, + True, + [ + (0, "a"), + (0, "b"), + (1, "c"), + (1, "d"), + (2, "e"), + (2, "f"), + (3, "g"), + (3, "h"), + ], + ), + ( + ["a", "b", "c", "d", "e", "f", "g", "h"], + 4, + 1, + False, + [ + (0, "a"), + (1, "b"), + (2, "c"), + (3, "d"), + (0, "e"), + (1, "f"), + (2, "g"), + (3, "h"), + ], + ), + ( + ["a", "b", "c", "d", "e", "f", "g", "h"], + 4, + 2, + True, + [ + (0, "a"), + (0, "b"), + (0, "c"), + (0, "d"), + (1, "e"), + (1, "f"), + (1, "g"), + (1, "h"), + ], + ), + ( + ["a", "b", "c", "d", "e", "f", "g", "h"], + 4, + 2, + False, + [ + (0, "a"), + (0, "b"), + (1, "c"), + (1, "d"), + (0, "e"), + (0, "f"), + (1, "g"), + (1, "h"), + ], + ), + (["a", "b", "c", "d"], 4, 1, True, [(0, "a"), (1, "b"), (2, "c"), (3, "d")]), + (["a", "b", "c", "d"], 8, 1, True, [(0, "a"), (2, "b"), (4, "c"), (6, "d")]), + (["a", "b", "c", "d"], 1, 8, True, [(0, "a"), (0, "b"), (0, "c"), (0, "d")]), + (["a"], 1, 1, True, [(0, "a")]), + (["a"], 2, 1, True, [(0, "a")]), + ([], 1, 1, True, []), + ], +) +def test_group_images( + rc, image_group, n_measurements, measurements_per_run, groups_first, grouped_images +): + output = rc.group_images( + image_group, n_measurements, measurements_per_run, groups_first + ) + assert output == grouped_images + + +# For testing script sanitisation +@pytest.mark.parametrize( + "scripts,cleaned_scripts", + [ + ("some; bash; script;", "some; bash; script;"), + (";some ;bash ;script", "some; bash; script;"), + ("some bash script", "some bash script;"), + ("some -options ;; and -commands ;", "some -options; and -commands;"), + (";some bash; script \r\n;", "some bash; script \n;"), + ( + ";some; nightmarish -bash \r\n script ;; \n", + "some; nightmarish -bash \n script; \n;", + ), + ], +) +def test_sanitise_scripts(rc, scripts, cleaned_scripts): + assert rc.sanitise_scripts(scripts) == cleaned_scripts