Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/linter.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.8"]
python-version: ["3.9"]

steps:
- name: Checkout repository
uses: actions/checkout@v3
uses: actions/checkout@v4
if: ${{ !env.ACT }} # skip during local actions testing
with:
fetch-depth: 0

- name: Setup Python
uses: actions/setup-python@v4
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
cache: pip
Expand Down Expand Up @@ -51,4 +51,4 @@ jobs:
run: pylint -E yascheduler

- name: pyupgrade
run: pyupgrade --py38-plus --keep-percent-format
run: pyupgrade --py39-plus --keep-percent-format
6 changes: 3 additions & 3 deletions .github/workflows/pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ jobs:

steps:
- name: Checkout repository
uses: actions/checkout@v3
uses: actions/checkout@v4
if: ${{ !env.ACT }} # skip during local actions testing

- name: Setup Python
uses: actions/setup-python@v4
uses: actions/setup-python@v5
with:
python-version: 3.11
python-version: 3.13
cache: pip
cache-dependency-path: pyproject.toml

Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ jobs:

steps:
- name: Checkout repository
uses: actions/checkout@v3
uses: actions/checkout@v4
if: ${{ !env.ACT }} # skip during local actions testing
with:
fetch-depth: 0

- name: Setup Python
uses: actions/setup-python@v4
uses: actions/setup-python@v5
with:
python-version: 3.11
python-version: 3.13
cache: pip
cache-dependency-path: pyproject.toml

Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ jobs:

steps:
- name: Checkout repository
uses: actions/checkout@v3
uses: actions/checkout@v4
if: ${{ !env.ACT }} # skip during local actions testing
with:
fetch-depth: 0

- name: Setup Python
uses: actions/setup-python@v4
uses: actions/setup-python@v5
with:
python-version: 3.11
python-version: 3.13
cache: pip
cache-dependency-path: pyproject.toml

Expand Down
4 changes: 3 additions & 1 deletion examples/submit_any_engine_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
parser = argparse.ArgumentParser()
parser.add_argument("-f", dest="file", action="store", type=str, required=True)
parser.add_argument("-e", dest="engine", action="store", type=str, required=True)
parser.add_argument("-l", dest="localrepo", action="store", type=bool, required=False, default=False)
parser.add_argument(
"-l", dest="localrepo", action="store", type=bool, required=False, default=False
)
args = parser.parse_args()

input_data = {}
Expand Down
2 changes: 1 addition & 1 deletion examples/submit_pcrystal_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
FOLDER = work_folder
print("**To save calc in an input folder**")

f34_name = os.path.basename(target).split('.')[0] + '.f34' # e.g. archive with *.f34
f34_name = os.path.basename(target).split(".")[0] + ".f34" # e.g. archive with *.f34

if os.path.exists(os.path.join(work_folder, "fort.34")):
assert "EXTERNAL" in SETUP_INPUT
Expand Down
4 changes: 3 additions & 1 deletion examples/submit_topas_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@


yac = Yascheduler()
result = yac.queue_submit_task(LABEL, {"calc.inp": PATTERN_REQUEST, "structure.inc": ""}, "topas")
result = yac.queue_submit_task(
LABEL, {"calc.inp": PATTERN_REQUEST, "structure.inc": ""}, "topas"
)
print(LABEL)
print(result)
15 changes: 7 additions & 8 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ classifiers = [
"Topic :: Software Development :: Libraries :: Python Modules",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Framework :: AiiDA"
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Framework :: AiiDA",
]
requires-python = ">=3.8"
requires-python = ">=3.9"
dependencies = [
"aiohttp~=3.8",
"asyncssh~=2.11",
Expand All @@ -46,7 +46,6 @@ dependencies = [
"python-daemon~=2.3",
"typing-extensions >= 4.2.0; python_version < '3.11'",
"upcloud_api~=2.0",
"importlib_metadata; python_version < '3.8'",
]

[project.optional-dependencies]
Expand Down Expand Up @@ -96,7 +95,7 @@ remove-duplicate-keys = true
remove-unused-variables = true

[tool.black]
target-version = ['py38', 'py39', 'py310', 'py311']
target-version = ['py39', 'py310', 'py311', 'py312', 'py313']


[tool.commitizen]
Expand All @@ -111,7 +110,7 @@ changelog_incremental = true

[tool.isort]
profile = "black"
py_version = 38
py_version = 39

[tool.pylint.MASTER]
load-plugins=[
Expand All @@ -120,7 +119,7 @@ load-plugins=[

[tool.pylint.main]
jobs = 0
py-version = "3.8"
py-version = "3.9"
recursive = true
suggestion-mode = true

Expand Down
9 changes: 2 additions & 7 deletions yascheduler/__init__.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
import sys
import importlib.metadata

from .client import Yascheduler
from .variables import CONFIG_FILE, LOG_FILE, PID_FILE

if sys.version_info < (3, 8):
import importlib_metadata
else:
import importlib.metadata as importlib_metadata

__version__ = importlib_metadata.version("yascheduler")
__version__ = importlib.metadata.version("yascheduler")
__all__ = [
"CONFIG_FILE",
"LOG_FILE",
Expand Down
113 changes: 84 additions & 29 deletions yascheduler/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,61 @@

import asyncio
import logging
import sys
from concurrent.futures import ThreadPoolExecutor
from functools import wraps
from pathlib import PurePath
from typing import Any, Mapping, Optional, Sequence, Union
from typing import (
Any,
Callable,
Coroutine,
Mapping,
Optional,
Sequence,
TypeVar,
Union,
)

from attrs import asdict

from .config import Config
from .db import DB, TaskModel, TaskStatus
from .db import DB, TaskStatus
from .scheduler import Scheduler
from .variables import CONFIG_FILE

if sys.version_info < (3, 10):
from typing_extensions import ParamSpec
else:
from typing import ParamSpec

ReturnT_co = TypeVar("ReturnT_co", covariant=True)
ParamT = ParamSpec("ParamT")


def to_sync(
func: Callable[ParamT, Coroutine[Any, Any, ReturnT_co]],
) -> Callable[ParamT, ReturnT_co]:
"""
Wraps async function and run it sync in thread.
"""

@wraps(func)
def outer(*args: ParamT.args, **kwargs: ParamT.kwargs):
"""
Execute the async method synchronously in sync and async runtime.
"""
coro = func(*args, **kwargs)
try:
asyncio.get_running_loop() # Triggers RuntimeError if no running event loop

# Create a separate thread so we can block before returning
with ThreadPoolExecutor(1) as pool:
return pool.submit(lambda: asyncio.run(coro)).result()
except RuntimeError:
return asyncio.run(coro)

return outer


class Yascheduler:
"""Yascheduler client"""
Expand All @@ -31,30 +76,36 @@ def __init__(
self.config = Config.from_config_parser(config_path)
self._logger = logger

def queue_submit_task(
async def queue_submit_task_async(
self,
label: str,
metadata: Mapping[str, Any],
engine_name: str,
webhook_onsubmit=False,
) -> int:
"""Submit new task"""

async def async_fn() -> TaskModel:
yac = await Scheduler.create(config=self.config, log=self._logger)
task = await yac.create_new_task(
label=label,
metadata=metadata,
engine_name=engine_name,
webhook_onsubmit=webhook_onsubmit,
)
await yac.stop()
return task

task = asyncio.run(async_fn())
yac = await Scheduler.create(config=self.config, log=self._logger)
task = await yac.create_new_task(
label=label,
metadata=metadata,
engine_name=engine_name,
webhook_onsubmit=webhook_onsubmit,
)
await yac.stop()
return task.task_id

def queue_get_tasks(
def queue_submit_task(
self,
label: str,
metadata: Mapping[str, Any],
engine_name: str,
webhook_onsubmit=False,
) -> int:
"""Submit new task"""
fn = to_sync(self.queue_submit_task_async)
return fn(label, metadata, engine_name, webhook_onsubmit)

async def queue_get_tasks_async(
self,
jobs: Optional[Sequence[int]] = None,
status: Optional[Sequence[int]] = None,
Expand All @@ -64,24 +115,28 @@ def queue_get_tasks(
raise ValueError("jobs can be selected only by status or by task ids")
# raise ValueError if unknown task status
status = [TaskStatus(x) for x in status] if status else None

async def fn_get_by_statuses(statuses: Sequence[TaskStatus]):
db = await DB.create(self.config.db)
return await db.get_tasks_by_status(statuses)

async def fn_get_by_ids(ids: Sequence[int]):
db = await DB.create(self.config.db)
return await db.get_tasks_by_jobs(ids)

db = await DB.create(self.config.db)
if status:
tasks = asyncio.run(fn_get_by_statuses(status))
tasks = await db.get_tasks_by_status(status)
elif jobs:
tasks = asyncio.run(fn_get_by_ids(jobs))
tasks = await db.get_tasks_by_jobs(jobs)
else:
return []

return [asdict(t) for t in tasks]

def queue_get_tasks(
self,
jobs: Optional[Sequence[int]] = None,
status: Optional[Sequence[int]] = None,
) -> Sequence[Mapping[str, Any]]:
"""Get tasks by ids or statuses"""
return to_sync(self.queue_get_tasks_async)(jobs, status)

async def queue_get_task_async(self, task_id: int) -> Optional[Mapping[str, Any]]:
"""Get task by id"""
for task_dict in await self.queue_get_tasks_async(jobs=[task_id]):
return task_dict

def queue_get_task(self, task_id: int) -> Optional[Mapping[str, Any]]:
"""Get task by id"""
for task_dict in self.queue_get_tasks(jobs=[task_id]):
Expand Down
2 changes: 2 additions & 0 deletions yascheduler/clouds/cloud_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class CloudSetupNodeError(Exception):
@define(frozen=True)
class CloudConfig(PCloudConfig):
"Cloud config init"

bootcmd: Sequence[Union[str, Sequence[str]]] = field(factory=tuple)
package_upgrade: bool = field(default=False)
packages: Sequence[str] = field(factory=list)
Expand All @@ -44,6 +45,7 @@ def render_base64(self) -> str:
@define(frozen=True)
class CloudAPI(PCloudAPI[TConfigCloud_contra]):
"Cloud API protocol"

adapter: PCloudAdapter[TConfigCloud_contra] = field()
config: TConfigCloud_contra = field()
local_config: ConfigLocal = field()
Expand Down
5 changes: 5 additions & 0 deletions yascheduler/clouds/protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

class PCloudConfig(Protocol):
"Cloud config init protocol"

bootcmd: Sequence[Union[str, Sequence[str]]]
package_upgrade: bool
packages: Sequence[str]
Expand Down Expand Up @@ -65,6 +66,7 @@ async def __call__(

class PCloudAdapter(Protocol[TConfigCloud_contra]):
"Cloud adapter protocol"

name: str
supported_platform_checks: Sequence[SupportedPlatformChecker]
create_node: CreateNodeCallable[TConfigCloud_contra]
Expand Down Expand Up @@ -100,6 +102,7 @@ def get_op_semaphore(self) -> asyncio.Semaphore:

class PCloudAPI(Protocol[TConfigCloud_contra]):
"Cloud API protocol"

name: str
config: TConfigCloud_contra
local_config: ConfigLocal
Expand Down Expand Up @@ -155,13 +158,15 @@ async def delete_node(self, host: str):
@define(frozen=True)
class CloudCapacity:
"Cloud capacity object"

name: str
max: int
current: int


class PCloudAPIManager(Protocol):
"Cloud API manager protocol"

apis: Mapping[str, PCloudAPI]
db: DB
log: logging.Logger
Expand Down
Loading