diff --git a/.github/workflows/backend-test.yml b/.github/workflows/backend-test.yml index b6fd9066cf7af..029f4c114e76d 100644 --- a/.github/workflows/backend-test.yml +++ b/.github/workflows/backend-test.yml @@ -84,6 +84,7 @@ jobs: RUST_LOG_STYLE: never CARGO_NET_GIT_FETCH_WITH_CLI: true WMDEBUG_FORCE_V0_WORKSPACE_DEPENDENCIES: 1 + WMDEBUG_FORCE_RUNNABLE_SETTINGS_V0: 1 run: | deno --version && bun -v && go version && python3 --version cd windmill-duckdb-ffi-internal && ./build_dev.sh && cd .. diff --git a/backend/.sqlx/query-05b69dcef0f4f649513e186e73089979c49b4b8113ee832ea7539b56a0415f32.json b/backend/.sqlx/query-05b69dcef0f4f649513e186e73089979c49b4b8113ee832ea7539b56a0415f32.json deleted file mode 100644 index f221e8da2fc53..0000000000000 --- a/backend/.sqlx/query-05b69dcef0f4f649513e186e73089979c49b4b8113ee832ea7539b56a0415f32.json +++ /dev/null @@ -1,156 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "select hash, tag, concurrency_key, concurrent_limit, concurrency_time_window_s, debounce_key, debounce_delay_s, cache_ttl, cache_ignore_s3_path, language as \"language: ScriptLang\", dedicated_worker, priority, delete_after_use, timeout, has_preprocessor, on_behalf_of_email, created_by, path from script where hash = $1 AND workspace_id = $2", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "hash", - "type_info": "Int8" - }, - { - "ordinal": 1, - "name": "tag", - "type_info": "Varchar" - }, - { - "ordinal": 2, - "name": "concurrency_key", - "type_info": "Varchar" - }, - { - "ordinal": 3, - "name": "concurrent_limit", - "type_info": "Int4" - }, - { - "ordinal": 4, - "name": "concurrency_time_window_s", - "type_info": "Int4" - }, - { - "ordinal": 5, - "name": "debounce_key", - "type_info": "Varchar" - }, - { - "ordinal": 6, - "name": "debounce_delay_s", - "type_info": "Int4" - }, - { - "ordinal": 7, - "name": "cache_ttl", - "type_info": "Int4" - }, - { - "ordinal": 8, - "name": "cache_ignore_s3_path", - "type_info": "Bool" - }, - { - "ordinal": 9, - "name": "language: ScriptLang", - "type_info": { - "Custom": { - "name": "script_lang", - "kind": { - "Enum": [ - "python3", - "deno", - "go", - "bash", - "postgresql", - "nativets", - "bun", - "mysql", - "bigquery", - "snowflake", - "graphql", - "powershell", - "mssql", - "php", - "bunnative", - "rust", - "ansible", - "csharp", - "oracledb", - "nu", - "java", - "duckdb", - "ruby" - ] - } - } - } - }, - { - "ordinal": 10, - "name": "dedicated_worker", - "type_info": "Bool" - }, - { - "ordinal": 11, - "name": "priority", - "type_info": "Int2" - }, - { - "ordinal": 12, - "name": "delete_after_use", - "type_info": "Bool" - }, - { - "ordinal": 13, - "name": "timeout", - "type_info": "Int4" - }, - { - "ordinal": 14, - "name": "has_preprocessor", - "type_info": "Bool" - }, - { - "ordinal": 15, - "name": "on_behalf_of_email", - "type_info": "Text" - }, - { - "ordinal": 16, - "name": "created_by", - "type_info": "Varchar" - }, - { - "ordinal": 17, - "name": "path", - "type_info": "Varchar" - } - ], - "parameters": { - "Left": [ - "Int8", - "Text" - ] - }, - "nullable": [ - false, - true, - true, - true, - true, - true, - true, - true, - true, - false, - true, - true, - true, - true, - true, - true, - false, - false - ] - }, - "hash": "05b69dcef0f4f649513e186e73089979c49b4b8113ee832ea7539b56a0415f32" -} diff --git a/backend/.sqlx/query-b179a3f876ca659bed892d464bf51a733cc86a3204fcd9edccda63fddc97dced.json b/backend/.sqlx/query-14276a040cb4db88d71fccdc3579e8c0bb132b70668301b535872d1632753e30.json similarity index 55% rename from 
backend/.sqlx/query-b179a3f876ca659bed892d464bf51a733cc86a3204fcd9edccda63fddc97dced.json rename to backend/.sqlx/query-14276a040cb4db88d71fccdc3579e8c0bb132b70668301b535872d1632753e30.json index 44d146f2088dd..c607060231f64 100644 --- a/backend/.sqlx/query-b179a3f876ca659bed892d464bf51a733cc86a3204fcd9edccda63fddc97dced.json +++ b/backend/.sqlx/query-14276a040cb4db88d71fccdc3579e8c0bb132b70668301b535872d1632753e30.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "WITH inserted_job AS (\n INSERT INTO v2_job (id, workspace_id, raw_code, raw_lock, raw_flow, tag, parent_job,\n created_by, permissioned_as, runnable_id, runnable_path, args, kind, trigger,\n script_lang, same_worker, pre_run_error, permissioned_as_email, visible_to_owner,\n flow_innermost_root_job, root_job, concurrent_limit, concurrency_time_window_s, timeout, flow_step_id,\n cache_ttl, priority, trigger_kind, script_entrypoint_override, preprocessed)\n VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18,\n $19, $20, $38, $21, $22, $23, $24, $25, $26, $39::job_trigger_kind,\n ($12::JSONB)->>'_ENTRYPOINT_OVERRIDE', $27)\n ),\n inserted_runtime AS (\n INSERT INTO v2_job_runtime (id, ping) VALUES ($1, null)\n ),\n inserted_job_perms AS (\n INSERT INTO job_perms (job_id, email, username, is_admin, is_operator, folders, groups, workspace_id, end_user_email) \n values ($1, $32, $33, $34, $35, $36, $37, $2, $41) \n ON CONFLICT (job_id) DO UPDATE SET email = EXCLUDED.email, username = EXCLUDED.username, is_admin = EXCLUDED.is_admin, is_operator = EXCLUDED.is_operator, folders = EXCLUDED.folders, groups = EXCLUDED.groups, workspace_id = EXCLUDED.workspace_id, end_user_email = EXCLUDED.end_user_email\n )\n INSERT INTO v2_job_queue\n (workspace_id, id, running, scheduled_for, started_at, tag, priority, cache_ignore_s3_path)\n VALUES ($2, $1, $28, COALESCE($29, now()), CASE WHEN $27 OR $40 THEN now() END, $30, $31, $42)", + "query": "WITH inserted_job AS (\n INSERT INTO v2_job (\n id, -- 1\n workspace_id, -- 2\n raw_code, -- 3\n raw_lock, -- 4\n raw_flow, -- 5\n tag, -- 6\n parent_job, -- 7\n created_by, -- 8\n permissioned_as, -- 9\n runnable_id, -- 10\n runnable_path, -- 11\n args, -- 12\n kind, -- 13\n trigger, -- 14\n script_lang, -- 15\n same_worker, -- 16\n pre_run_error, -- 17 \n permissioned_as_email, -- 18\n visible_to_owner, -- 19\n flow_innermost_root_job, -- 20\n root_job, -- 38\n concurrent_limit, -- 21\n concurrency_time_window_s, -- 22\n timeout, -- 23\n flow_step_id, -- 24\n cache_ttl, -- 25\n priority, -- 26\n trigger_kind, -- 39\n script_entrypoint_override, -- 12\n preprocessed -- 27,\n ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18,\n $19, $20, $38, $21, $22, $23, $24, $25, $26, $39::job_trigger_kind,\n ($12::JSONB)->>'_ENTRYPOINT_OVERRIDE', $27)\n ),\n inserted_runtime AS (\n INSERT INTO v2_job_runtime (id, ping) VALUES ($1, null)\n ),\n inserted_job_perms AS (\n INSERT INTO job_perms (job_id, email, username, is_admin, is_operator, folders, groups, workspace_id, end_user_email) \n values ($1, $32, $33, $34, $35, $36, $37, $2, $41) \n ON CONFLICT (job_id) DO UPDATE SET email = EXCLUDED.email, username = EXCLUDED.username, is_admin = EXCLUDED.is_admin, is_operator = EXCLUDED.is_operator, folders = EXCLUDED.folders, groups = EXCLUDED.groups, workspace_id = EXCLUDED.workspace_id, end_user_email = EXCLUDED.end_user_email\n )\n INSERT INTO v2_job_queue\n (workspace_id, id, running, scheduled_for, started_at, tag, priority, 
cache_ignore_s3_path, runnable_settings_handle)\n VALUES ($2, $1, $28, COALESCE($29, now()), CASE WHEN $27 OR $40 THEN now() END, $30, $31, $42, $43)", "describe": { "columns": [], "parameters": { @@ -128,10 +128,11 @@ }, "Bool", "Varchar", - "Bool" + "Bool", + "Int8" ] }, "nullable": [] }, - "hash": "b179a3f876ca659bed892d464bf51a733cc86a3204fcd9edccda63fddc97dced" + "hash": "14276a040cb4db88d71fccdc3579e8c0bb132b70668301b535872d1632753e30" } diff --git a/backend/.sqlx/query-23759cb515e926e272bbc8e5d8a0a9d039b99bc2026e381e99ef41cdaf6ea19f.json b/backend/.sqlx/query-23759cb515e926e272bbc8e5d8a0a9d039b99bc2026e381e99ef41cdaf6ea19f.json deleted file mode 100644 index 249c2303196d0..0000000000000 --- a/backend/.sqlx/query-23759cb515e926e272bbc8e5d8a0a9d039b99bc2026e381e99ef41cdaf6ea19f.json +++ /dev/null @@ -1,16 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n INSERT INTO script\n (workspace_id, hash, path, parent_hashes, summary, description, content, created_by, schema, is_template, extra_perms, lock, language, kind, tag, draft_only, envs, concurrent_limit, concurrency_time_window_s, cache_ttl, cache_ignore_s3_path, dedicated_worker, ws_error_handler_muted, priority, restart_unless_cancelled, delete_after_use, timeout, concurrency_key, visible_to_runner_only, no_main_func, codebase, has_preprocessor, on_behalf_of_email, schema_validation, assets, debounce_key, debounce_delay_s)\n\n SELECT workspace_id, $1, path, array_prepend($2::bigint, COALESCE(parent_hashes, '{}'::bigint[])), summary, description, content, created_by, schema, is_template, extra_perms, NULL, language, kind, tag, draft_only, envs, concurrent_limit, concurrency_time_window_s, cache_ttl, cache_ignore_s3_path, dedicated_worker, ws_error_handler_muted, priority, restart_unless_cancelled, delete_after_use, timeout, concurrency_key, visible_to_runner_only, no_main_func, codebase, has_preprocessor, on_behalf_of_email, schema_validation, assets, debounce_key, debounce_delay_s\n\n FROM script WHERE hash = $2 AND workspace_id = $3;\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8", - "Int8", - "Text" - ] - }, - "nullable": [] - }, - "hash": "23759cb515e926e272bbc8e5d8a0a9d039b99bc2026e381e99ef41cdaf6ea19f" -} diff --git a/backend/.sqlx/query-451d9cde90d14071e21ffb5f615052b7ba7fc315fc301ed5c0ff50d9a3ab0d4a.json b/backend/.sqlx/query-451d9cde90d14071e21ffb5f615052b7ba7fc315fc301ed5c0ff50d9a3ab0d4a.json new file mode 100644 index 0000000000000..9e7ffed082741 --- /dev/null +++ b/backend/.sqlx/query-451d9cde90d14071e21ffb5f615052b7ba7fc315fc301ed5c0ff50d9a3ab0d4a.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO runnable_settings (hash, debouncing_settings, concurrency_settings)\n VALUES ($1, $2, $3)\n ON CONFLICT (hash)\n DO NOTHING", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8", + "Int8", + "Int8" + ] + }, + "nullable": [] + }, + "hash": "451d9cde90d14071e21ffb5f615052b7ba7fc315fc301ed5c0ff50d9a3ab0d4a" +} diff --git a/backend/.sqlx/query-6c97ab28ab47b75fb3ff39ea70fa3627f08b61bbd33aecb9ea816f8f78a04ec5.json b/backend/.sqlx/query-5bf200f2c8db25ddf231b564503c6c70f7f3958564a79bb0c6b3863b1ebb0cbf.json similarity index 83% rename from backend/.sqlx/query-6c97ab28ab47b75fb3ff39ea70fa3627f08b61bbd33aecb9ea816f8f78a04ec5.json rename to backend/.sqlx/query-5bf200f2c8db25ddf231b564503c6c70f7f3958564a79bb0c6b3863b1ebb0cbf.json index eb0dabe8208d2..903d9baf17511 100644 --- a/backend/.sqlx/query-6c97ab28ab47b75fb3ff39ea70fa3627f08b61bbd33aecb9ea816f8f78a04ec5.json 
+++ b/backend/.sqlx/query-5bf200f2c8db25ddf231b564503c6c70f7f3958564a79bb0c6b3863b1ebb0cbf.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "SELECT \n v2_job_queue.workspace_id,\n v2_job_queue.id,\n v2_job.args as \"args: sqlx::types::Json>>\",\n v2_job.parent_job,\n v2_job.created_by,\n v2_job_queue.started_at,\n scheduled_for,\n runnable_path,\n kind as \"kind: JobKind\",\n runnable_id as \"runnable_id: ScriptHash\",\n canceled_reason,\n canceled_by,\n permissioned_as,\n permissioned_as_email,\n flow_status as \"flow_status: sqlx::types::Json>\",\n v2_job.tag,\n script_lang as \"script_lang: ScriptLang\",\n same_worker,\n pre_run_error,\n concurrent_limit,\n concurrency_time_window_s,\n flow_innermost_root_job,\n root_job,\n timeout,\n flow_step_id,\n cache_ttl,\n cache_ignore_s3_path,\n v2_job_queue.priority,\n preprocessed,\n script_entrypoint_override,\n trigger,\n trigger_kind as \"trigger_kind: JobTriggerKind\",\n visible_to_owner,\n NULL as permissioned_as_end_user_email\n FROM v2_job_queue INNER JOIN v2_job ON v2_job.id = v2_job_queue.id LEFT JOIN v2_job_status ON v2_job_status.id = v2_job_queue.id WHERE v2_job_queue.id = $1", + "query": "SELECT \n v2_job_queue.workspace_id,\n v2_job_queue.id,\n v2_job.args as \"args: sqlx::types::Json>>\",\n v2_job.parent_job,\n v2_job.created_by,\n v2_job_queue.started_at,\n v2_job_queue.runnable_settings_handle,\n scheduled_for,\n runnable_path,\n kind as \"kind: JobKind\",\n runnable_id as \"runnable_id: ScriptHash\",\n canceled_reason,\n canceled_by,\n permissioned_as,\n permissioned_as_email,\n flow_status as \"flow_status: sqlx::types::Json>\",\n v2_job.tag,\n script_lang as \"script_lang: ScriptLang\",\n same_worker,\n pre_run_error,\n concurrent_limit,\n concurrency_time_window_s,\n flow_innermost_root_job,\n root_job,\n timeout,\n flow_step_id,\n cache_ttl,\n cache_ignore_s3_path,\n v2_job_queue.priority,\n preprocessed,\n script_entrypoint_override,\n trigger,\n trigger_kind as \"trigger_kind: JobTriggerKind\",\n visible_to_owner,\n NULL as permissioned_as_end_user_email\n FROM v2_job_queue INNER JOIN v2_job ON v2_job.id = v2_job_queue.id LEFT JOIN v2_job_status ON v2_job_status.id = v2_job_queue.id WHERE v2_job_queue.id = $1", "describe": { "columns": [ { @@ -35,16 +35,21 @@ }, { "ordinal": 6, + "name": "runnable_settings_handle", + "type_info": "Int8" + }, + { + "ordinal": 7, "name": "scheduled_for", "type_info": "Timestamptz" }, { - "ordinal": 7, + "ordinal": 8, "name": "runnable_path", "type_info": "Varchar" }, { - "ordinal": 8, + "ordinal": 9, "name": "kind: JobKind", "type_info": { "Custom": { @@ -79,42 +84,42 @@ } }, { - "ordinal": 9, + "ordinal": 10, "name": "runnable_id: ScriptHash", "type_info": "Int8" }, { - "ordinal": 10, + "ordinal": 11, "name": "canceled_reason", "type_info": "Text" }, { - "ordinal": 11, + "ordinal": 12, "name": "canceled_by", "type_info": "Varchar" }, { - "ordinal": 12, + "ordinal": 13, "name": "permissioned_as", "type_info": "Varchar" }, { - "ordinal": 13, + "ordinal": 14, "name": "permissioned_as_email", "type_info": "Varchar" }, { - "ordinal": 14, + "ordinal": 15, "name": "flow_status: sqlx::types::Json>", "type_info": "Jsonb" }, { - "ordinal": 15, + "ordinal": 16, "name": "tag", "type_info": "Varchar" }, { - "ordinal": 16, + "ordinal": 17, "name": "script_lang: ScriptLang", "type_info": { "Custom": { @@ -150,77 +155,77 @@ } }, { - "ordinal": 17, + "ordinal": 18, "name": "same_worker", "type_info": "Bool" }, { - "ordinal": 18, + "ordinal": 19, "name": "pre_run_error", "type_info": "Text" }, { 
- "ordinal": 19, + "ordinal": 20, "name": "concurrent_limit", "type_info": "Int4" }, { - "ordinal": 20, + "ordinal": 21, "name": "concurrency_time_window_s", "type_info": "Int4" }, { - "ordinal": 21, + "ordinal": 22, "name": "flow_innermost_root_job", "type_info": "Uuid" }, { - "ordinal": 22, + "ordinal": 23, "name": "root_job", "type_info": "Uuid" }, { - "ordinal": 23, + "ordinal": 24, "name": "timeout", "type_info": "Int4" }, { - "ordinal": 24, + "ordinal": 25, "name": "flow_step_id", "type_info": "Varchar" }, { - "ordinal": 25, + "ordinal": 26, "name": "cache_ttl", "type_info": "Int4" }, { - "ordinal": 26, + "ordinal": 27, "name": "cache_ignore_s3_path", "type_info": "Bool" }, { - "ordinal": 27, + "ordinal": 28, "name": "priority", "type_info": "Int2" }, { - "ordinal": 28, + "ordinal": 29, "name": "preprocessed", "type_info": "Bool" }, { - "ordinal": 29, + "ordinal": 30, "name": "script_entrypoint_override", "type_info": "Varchar" }, { - "ordinal": 30, + "ordinal": 31, "name": "trigger", "type_info": "Varchar" }, { - "ordinal": 31, + "ordinal": 32, "name": "trigger_kind: JobTriggerKind", "type_info": { "Custom": { @@ -246,12 +251,12 @@ } }, { - "ordinal": 32, + "ordinal": 33, "name": "visible_to_owner", "type_info": "Bool" }, { - "ordinal": 33, + "ordinal": 34, "name": "permissioned_as_end_user_email", "type_info": "Text" } @@ -268,6 +273,7 @@ true, false, true, + true, false, true, false, @@ -298,5 +304,5 @@ null ] }, - "hash": "6c97ab28ab47b75fb3ff39ea70fa3627f08b61bbd33aecb9ea816f8f78a04ec5" + "hash": "5bf200f2c8db25ddf231b564503c6c70f7f3958564a79bb0c6b3863b1ebb0cbf" } diff --git a/backend/.sqlx/query-5c056ad6cc8967393729288437205c605a24118021fdb2b21b6b61695dc4ff28.json b/backend/.sqlx/query-5c056ad6cc8967393729288437205c605a24118021fdb2b21b6b61695dc4ff28.json new file mode 100644 index 0000000000000..c253fc59980c8 --- /dev/null +++ b/backend/.sqlx/query-5c056ad6cc8967393729288437205c605a24118021fdb2b21b6b61695dc4ff28.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO script\n (workspace_id, hash, path, parent_hashes, summary, description, content, created_by, schema, is_template, extra_perms, lock, language, kind, tag, draft_only, envs, concurrent_limit, concurrency_time_window_s, cache_ttl, cache_ignore_s3_path, dedicated_worker, ws_error_handler_muted, priority, restart_unless_cancelled, delete_after_use, timeout, concurrency_key, visible_to_runner_only, no_main_func, codebase, has_preprocessor, on_behalf_of_email, schema_validation, assets, debounce_key, debounce_delay_s, runnable_settings_handle)\n\n SELECT workspace_id, $1, path, array_prepend($2::bigint, COALESCE(parent_hashes, '{}'::bigint[])), summary, description, content, created_by, schema, is_template, extra_perms, NULL, language, kind, tag, draft_only, envs, concurrent_limit, concurrency_time_window_s, cache_ttl, cache_ignore_s3_path, dedicated_worker, ws_error_handler_muted, priority, restart_unless_cancelled, delete_after_use, timeout, concurrency_key, visible_to_runner_only, no_main_func, codebase, has_preprocessor, on_behalf_of_email, schema_validation, assets, debounce_key, debounce_delay_s, runnable_settings_handle\n\n FROM script WHERE hash = $2 AND workspace_id = $3;\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8", + "Int8", + "Text" + ] + }, + "nullable": [] + }, + "hash": "5c056ad6cc8967393729288437205c605a24118021fdb2b21b6b61695dc4ff28" +} diff --git a/backend/.sqlx/query-7b5ad10af2a9b34fa86429499ea24c0c09c6e7e9ebfa3af90035570133f7c579.json 
b/backend/.sqlx/query-a1745a4f525b251d2f5a602ab2b2ede46b4471e21b11f607573a844013911abe.json similarity index 92% rename from backend/.sqlx/query-7b5ad10af2a9b34fa86429499ea24c0c09c6e7e9ebfa3af90035570133f7c579.json rename to backend/.sqlx/query-a1745a4f525b251d2f5a602ab2b2ede46b4471e21b11f607573a844013911abe.json index 326d16b58b4d0..f787efc97b627 100644 --- a/backend/.sqlx/query-7b5ad10af2a9b34fa86429499ea24c0c09c6e7e9ebfa3af90035570133f7c579.json +++ b/backend/.sqlx/query-a1745a4f525b251d2f5a602ab2b2ede46b4471e21b11f607573a844013911abe.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "SELECT \n j.id, j.workspace_id, j.runnable_id AS \"runnable_id: ScriptHash\", q.scheduled_for, q.started_at, j.parent_job, j.flow_innermost_root_job, j.runnable_path, j.kind as \"kind!: JobKind\", j.permissioned_as, \n j.created_by, j.script_lang AS \"script_lang: ScriptLang\", j.permissioned_as_email, j.flow_step_id, j.trigger_kind AS \"trigger_kind: JobTriggerKind\", j.trigger, j.priority, j.concurrent_limit, j.tag, j.cache_ttl, q.cache_ignore_s3_path\n FROM v2_job j LEFT JOIN v2_job_queue q ON j.id = q.id\n WHERE j.id = $1 AND j.workspace_id = $2", + "query": "SELECT \n j.id, j.workspace_id, j.runnable_id AS \"runnable_id: ScriptHash\", q.scheduled_for, q.started_at, j.parent_job, j.flow_innermost_root_job, j.runnable_path, j.kind as \"kind!: JobKind\", j.permissioned_as, \n j.created_by, j.script_lang AS \"script_lang: ScriptLang\", j.permissioned_as_email, j.flow_step_id, j.trigger_kind AS \"trigger_kind: JobTriggerKind\", j.trigger, j.priority, j.concurrent_limit, j.tag, j.cache_ttl, q.cache_ignore_s3_path, q.runnable_settings_handle\n FROM v2_job j LEFT JOIN v2_job_queue q ON j.id = q.id\n WHERE j.id = $1 AND j.workspace_id = $2", "describe": { "columns": [ { @@ -189,6 +189,11 @@ "ordinal": 20, "name": "cache_ignore_s3_path", "type_info": "Bool" + }, + { + "ordinal": 21, + "name": "runnable_settings_handle", + "type_info": "Int8" } ], "parameters": { @@ -218,8 +223,9 @@ true, false, true, + true, true ] }, - "hash": "7b5ad10af2a9b34fa86429499ea24c0c09c6e7e9ebfa3af90035570133f7c579" + "hash": "a1745a4f525b251d2f5a602ab2b2ede46b4471e21b11f607573a844013911abe" } diff --git a/backend/.sqlx/query-6b6f8f7b4a6b6e7e41a9da8b6dfdbcae842ff252cc355bd91aeeb5e26dcc74f3.json b/backend/.sqlx/query-a2e52f033120a3f0b64e0a5ba125df7ce0d25096a23f0b655846a1c07b41f620.json similarity index 50% rename from backend/.sqlx/query-6b6f8f7b4a6b6e7e41a9da8b6dfdbcae842ff252cc355bd91aeeb5e26dcc74f3.json rename to backend/.sqlx/query-a2e52f033120a3f0b64e0a5ba125df7ce0d25096a23f0b655846a1c07b41f620.json index 80619c8b2a8af..f12b35a81c8a5 100644 --- a/backend/.sqlx/query-6b6f8f7b4a6b6e7e41a9da8b6dfdbcae842ff252cc355bd91aeeb5e26dcc74f3.json +++ b/backend/.sqlx/query-a2e52f033120a3f0b64e0a5ba125df7ce0d25096a23f0b655846a1c07b41f620.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "SELECT COALESCE((SELECT MIN(started_at) as min_started_at\n FROM v2_job_queue INNER JOIN v2_job ON v2_job.id = v2_job_queue.id\n WHERE v2_job.runnable_path = $1 AND v2_job.kind != 'dependencies' AND v2_job_queue.running = true AND v2_job_queue.workspace_id = $2 AND v2_job_queue.canceled_by IS NULL AND v2_job.concurrent_limit > 0), $3) as min_started_at, now() AS now", + "query": "SELECT COALESCE((SELECT MIN(started_at) as min_started_at\n FROM v2_job_queue INNER JOIN v2_job ON v2_job.id = v2_job_queue.id LEFT JOIN runnable_settings rs ON rs.hash = v2_job_queue.runnable_settings_handle LEFT JOIN concurrency_settings cs ON cs.hash = 
rs.concurrency_settings\n WHERE v2_job.runnable_path = $1 AND v2_job.kind != 'dependencies' AND v2_job_queue.running = true AND v2_job_queue.workspace_id = $2 AND v2_job_queue.canceled_by IS NULL AND COALESCE(cs.concurrent_limit, v2_job.concurrent_limit) > 0), $3) as min_started_at, now() AS now", "describe": { "columns": [ { @@ -26,5 +26,5 @@ null ] }, - "hash": "6b6f8f7b4a6b6e7e41a9da8b6dfdbcae842ff252cc355bd91aeeb5e26dcc74f3" + "hash": "a2e52f033120a3f0b64e0a5ba125df7ce0d25096a23f0b655846a1c07b41f620" } diff --git a/backend/.sqlx/query-7f9b7ab9bec6a0f745273d0cd5602ceab46a7ec9fd225f7b9d16a2ddb9bad7b3.json b/backend/.sqlx/query-a33673ebc4d1eb4c3513987dbc43e2c80974598e1d9fe7203145bfc29928ba65.json similarity index 83% rename from backend/.sqlx/query-7f9b7ab9bec6a0f745273d0cd5602ceab46a7ec9fd225f7b9d16a2ddb9bad7b3.json rename to backend/.sqlx/query-a33673ebc4d1eb4c3513987dbc43e2c80974598e1d9fe7203145bfc29928ba65.json index 9033ac194b887..57a7434270e3b 100644 --- a/backend/.sqlx/query-7f9b7ab9bec6a0f745273d0cd5602ceab46a7ec9fd225f7b9d16a2ddb9bad7b3.json +++ b/backend/.sqlx/query-a33673ebc4d1eb4c3513987dbc43e2c80974598e1d9fe7203145bfc29928ba65.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "select hash, tag, concurrency_key, concurrent_limit, concurrency_time_window_s, debounce_key, debounce_delay_s, cache_ttl, cache_ignore_s3_path, language as \"language: ScriptLang\", dedicated_worker, priority, timeout, on_behalf_of_email, created_by FROM script\n WHERE path = $1 AND workspace_id = $2 AND archived = false AND (lock IS NOT NULL OR $3 = false)\n ORDER BY created_at DESC LIMIT 1", + "query": "select hash, tag, concurrency_key, concurrent_limit, concurrency_time_window_s, debounce_key, debounce_delay_s, cache_ttl, cache_ignore_s3_path, runnable_settings_handle, language as \"language: ScriptLang\", dedicated_worker, priority, timeout, on_behalf_of_email, created_by FROM script\n WHERE path = $1 AND workspace_id = $2 AND archived = false AND (lock IS NOT NULL OR $3 = false)\n ORDER BY created_at DESC LIMIT 1", "describe": { "columns": [ { @@ -50,6 +50,11 @@ }, { "ordinal": 9, + "name": "runnable_settings_handle", + "type_info": "Int8" + }, + { + "ordinal": 10, "name": "language: ScriptLang", "type_info": { "Custom": { @@ -85,27 +90,27 @@ } }, { - "ordinal": 10, + "ordinal": 11, "name": "dedicated_worker", "type_info": "Bool" }, { - "ordinal": 11, + "ordinal": 12, "name": "priority", "type_info": "Int2" }, { - "ordinal": 12, + "ordinal": 13, "name": "timeout", "type_info": "Int4" }, { - "ordinal": 13, + "ordinal": 14, "name": "on_behalf_of_email", "type_info": "Text" }, { - "ordinal": 14, + "ordinal": 15, "name": "created_by", "type_info": "Varchar" } @@ -127,6 +132,7 @@ true, true, true, + true, false, true, true, @@ -135,5 +141,5 @@ false ] }, - "hash": "7f9b7ab9bec6a0f745273d0cd5602ceab46a7ec9fd225f7b9d16a2ddb9bad7b3" + "hash": "a33673ebc4d1eb4c3513987dbc43e2c80974598e1d9fe7203145bfc29928ba65" } diff --git a/backend/.sqlx/query-a84e67035584bbdb02482026b9cc0808086c50f78947d43bb88628a481f41a1d.json b/backend/.sqlx/query-b3771b690c5966272b1f42c9965bb6a8f961c119516e4c33dc928cd3b4f4edbc.json similarity index 78% rename from backend/.sqlx/query-a84e67035584bbdb02482026b9cc0808086c50f78947d43bb88628a481f41a1d.json rename to backend/.sqlx/query-b3771b690c5966272b1f42c9965bb6a8f961c119516e4c33dc928cd3b4f4edbc.json index eb9dfd792369e..d5084e373ee41 100644 --- a/backend/.sqlx/query-a84e67035584bbdb02482026b9cc0808086c50f78947d43bb88628a481f41a1d.json +++ 
b/backend/.sqlx/query-b3771b690c5966272b1f42c9965bb6a8f961c119516e4c33dc928cd3b4f4edbc.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "SELECT id, q.workspace_id, j.runnable_id as \"runnable_id: ScriptHash\", scheduled_for, parent_job, flow_innermost_root_job, runnable_path, kind as \"kind: JobKind\", started_at, permissioned_as, created_by, script_lang as \"script_lang: ScriptLang\", \n permissioned_as_email, flow_step_id, trigger_kind as \"trigger_kind: JobTriggerKind\", trigger, q.priority, concurrent_limit, q.tag, cache_ttl, cache_ignore_s3_path, r.ping as last_ping, worker, memory_peak, running\n FROM v2_job_queue q JOIN v2_job j USING (id) LEFT JOIN v2_job_runtime r USING (id) LEFT JOIN v2_job_status s USING (id)\n WHERE j.id = $1", + "query": "SELECT\n id,\n q.runnable_settings_handle,\n q.workspace_id,\n j.runnable_id as \"runnable_id: ScriptHash\",\n scheduled_for,\n parent_job,\n flow_innermost_root_job,\n runnable_path,\n kind as \"kind: JobKind\",\n started_at,\n permissioned_as,\n created_by,\n script_lang as \"script_lang: ScriptLang\",\n permissioned_as_email,\n flow_step_id,\n trigger_kind as \"trigger_kind: JobTriggerKind\",\n trigger,\n q.priority,\n concurrent_limit,\n q.tag,\n cache_ttl,\n cache_ignore_s3_path,\n r.ping as last_ping,\n worker,\n memory_peak,\n running\n FROM v2_job_queue q\n JOIN v2_job j USING (id)\n LEFT JOIN v2_job_runtime r USING (id)\n LEFT JOIN v2_job_status s USING (id)\n WHERE j.id = $1", "describe": { "columns": [ { @@ -10,36 +10,41 @@ }, { "ordinal": 1, + "name": "runnable_settings_handle", + "type_info": "Int8" + }, + { + "ordinal": 2, "name": "workspace_id", "type_info": "Varchar" }, { - "ordinal": 2, + "ordinal": 3, "name": "runnable_id: ScriptHash", "type_info": "Int8" }, { - "ordinal": 3, + "ordinal": 4, "name": "scheduled_for", "type_info": "Timestamptz" }, { - "ordinal": 4, + "ordinal": 5, "name": "parent_job", "type_info": "Uuid" }, { - "ordinal": 5, + "ordinal": 6, "name": "flow_innermost_root_job", "type_info": "Uuid" }, { - "ordinal": 6, + "ordinal": 7, "name": "runnable_path", "type_info": "Varchar" }, { - "ordinal": 7, + "ordinal": 8, "name": "kind: JobKind", "type_info": { "Custom": { @@ -74,22 +79,22 @@ } }, { - "ordinal": 8, + "ordinal": 9, "name": "started_at", "type_info": "Timestamptz" }, { - "ordinal": 9, + "ordinal": 10, "name": "permissioned_as", "type_info": "Varchar" }, { - "ordinal": 10, + "ordinal": 11, "name": "created_by", "type_info": "Varchar" }, { - "ordinal": 11, + "ordinal": 12, "name": "script_lang: ScriptLang", "type_info": { "Custom": { @@ -125,17 +130,17 @@ } }, { - "ordinal": 12, + "ordinal": 13, "name": "permissioned_as_email", "type_info": "Varchar" }, { - "ordinal": 13, + "ordinal": 14, "name": "flow_step_id", "type_info": "Varchar" }, { - "ordinal": 14, + "ordinal": 15, "name": "trigger_kind: JobTriggerKind", "type_info": { "Custom": { @@ -161,52 +166,52 @@ } }, { - "ordinal": 15, + "ordinal": 16, "name": "trigger", "type_info": "Varchar" }, { - "ordinal": 16, + "ordinal": 17, "name": "priority", "type_info": "Int2" }, { - "ordinal": 17, + "ordinal": 18, "name": "concurrent_limit", "type_info": "Int4" }, { - "ordinal": 18, + "ordinal": 19, "name": "tag", "type_info": "Varchar" }, { - "ordinal": 19, + "ordinal": 20, "name": "cache_ttl", "type_info": "Int4" }, { - "ordinal": 20, + "ordinal": 21, "name": "cache_ignore_s3_path", "type_info": "Bool" }, { - "ordinal": 21, + "ordinal": 22, "name": "last_ping", "type_info": "Timestamptz" }, { - "ordinal": 22, + "ordinal": 23, "name": "worker", 
"type_info": "Varchar" }, { - "ordinal": 23, + "ordinal": 24, "name": "memory_peak", "type_info": "Int4" }, { - "ordinal": 24, + "ordinal": 25, "name": "running", "type_info": "Bool" } @@ -218,6 +223,7 @@ }, "nullable": [ false, + true, false, true, false, @@ -244,5 +250,5 @@ false ] }, - "hash": "a84e67035584bbdb02482026b9cc0808086c50f78947d43bb88628a481f41a1d" + "hash": "b3771b690c5966272b1f42c9965bb6a8f961c119516e4c33dc928cd3b4f4edbc" } diff --git a/backend/.sqlx/query-3d05d9d7e087eb6e1c14c2b8a20598581e6c7493ed99cb9ad1c2ee5d0b212d38.json b/backend/.sqlx/query-b4eb72b0274cbdce7490f63c36d0d16ee847294fadc138593a1baa417cbb3652.json similarity index 86% rename from backend/.sqlx/query-3d05d9d7e087eb6e1c14c2b8a20598581e6c7493ed99cb9ad1c2ee5d0b212d38.json rename to backend/.sqlx/query-b4eb72b0274cbdce7490f63c36d0d16ee847294fadc138593a1baa417cbb3652.json index 521b7cb1c0cac..f1ba61b895943 100644 --- a/backend/.sqlx/query-3d05d9d7e087eb6e1c14c2b8a20598581e6c7493ed99cb9ad1c2ee5d0b212d38.json +++ b/backend/.sqlx/query-b4eb72b0274cbdce7490f63c36d0d16ee847294fadc138593a1baa417cbb3652.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "INSERT INTO script (workspace_id, hash, path, parent_hashes, summary, description, content, created_by, schema, is_template, extra_perms, lock, language, kind, tag, draft_only, envs, concurrent_limit, concurrency_time_window_s, cache_ttl, dedicated_worker, ws_error_handler_muted, priority, restart_unless_cancelled, delete_after_use, timeout, concurrency_key, visible_to_runner_only, no_main_func, codebase, has_preprocessor, on_behalf_of_email, schema_validation, assets, debounce_key, debounce_delay_s, cache_ignore_s3_path) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9::text::json, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29, $30, $31, $32, $33, $34, $35, $36, $37)", + "query": "INSERT INTO script (workspace_id, hash, path, parent_hashes, summary, description, content, created_by, schema, is_template, extra_perms, lock, language, kind, tag, draft_only, envs, concurrent_limit, concurrency_time_window_s, cache_ttl, dedicated_worker, ws_error_handler_muted, priority, restart_unless_cancelled, delete_after_use, timeout, concurrency_key, visible_to_runner_only, no_main_func, codebase, has_preprocessor, on_behalf_of_email, schema_validation, assets, debounce_key, debounce_delay_s, cache_ignore_s3_path, runnable_settings_handle) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9::text::json, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29, $30, $31, $32, $33, $34, $35, $36, $37, $38)", "describe": { "columns": [], "parameters": { @@ -86,10 +86,11 @@ "Jsonb", "Varchar", "Int4", - "Bool" + "Bool", + "Int8" ] }, "nullable": [] }, - "hash": "3d05d9d7e087eb6e1c14c2b8a20598581e6c7493ed99cb9ad1c2ee5d0b212d38" + "hash": "b4eb72b0274cbdce7490f63c36d0d16ee847294fadc138593a1baa417cbb3652" } diff --git a/backend/.sqlx/query-ebbbd069e0f33be9609604025d159fe1ecbefc2e9c11f7c4900b7121d4367e01.json b/backend/.sqlx/query-ebbbd069e0f33be9609604025d159fe1ecbefc2e9c11f7c4900b7121d4367e01.json new file mode 100644 index 0000000000000..146c7f05b395b --- /dev/null +++ b/backend/.sqlx/query-ebbbd069e0f33be9609604025d159fe1ecbefc2e9c11f7c4900b7121d4367e01.json @@ -0,0 +1,28 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT concurrency_settings, debouncing_settings FROM runnable_settings WHERE hash = $1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "concurrency_settings", + 
"type_info": "Int8" + }, + { + "ordinal": 1, + "name": "debouncing_settings", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + true, + true + ] + }, + "hash": "ebbbd069e0f33be9609604025d159fe1ecbefc2e9c11f7c4900b7121d4367e01" +} diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 0b19814a2a2e7..359021af50c7c 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -15201,6 +15201,7 @@ dependencies = [ "sha1", "sha2 0.10.9", "size", + "sql-builder", "sqlx", "strum 0.27.2", "systemstat", diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 500fa615e664d..d6ff9f7b7402a 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -125,6 +125,7 @@ windmill-autoscaling = { workspace = true, optional = true } futures.workspace = true tracing.workspace = true sqlx.workspace = true +sql-builder.workspace = true rand.workspace = true chrono.workspace = true git-version.workspace = true diff --git a/backend/ee-repo-ref.txt b/backend/ee-repo-ref.txt index 27a9af3b6c839..b6855b0df2898 100644 --- a/backend/ee-repo-ref.txt +++ b/backend/ee-repo-ref.txt @@ -1 +1 @@ -d347295041426d03039b747a148a71e3583c3a6b +505eadbff32d102ea5245a2bef88ce6f1bb95395 diff --git a/backend/migrations/20251204094847_external_runnables_settings.down.sql b/backend/migrations/20251204094847_external_runnables_settings.down.sql new file mode 100644 index 0000000000000..ca829e1079fad --- /dev/null +++ b/backend/migrations/20251204094847_external_runnables_settings.down.sql @@ -0,0 +1,11 @@ +ALTER TABLE v2_job_queue +DROP COLUMN runnable_settings_handle; + +ALTER TABLE script +DROP COLUMN runnable_settings_handle; + +DROP TABLE IF EXISTS job_settings; +DROP TABLE IF EXISTS runnable_settings; +DROP TABLE IF EXISTS concurrency_settings; +DROP TABLE IF EXISTS debouncing_settings; + diff --git a/backend/migrations/20251204094847_external_runnables_settings.up.sql b/backend/migrations/20251204094847_external_runnables_settings.up.sql new file mode 100644 index 0000000000000..7c893c85a71db --- /dev/null +++ b/backend/migrations/20251204094847_external_runnables_settings.up.sql @@ -0,0 +1,42 @@ +CREATE TABLE IF NOT EXISTS concurrency_settings( + hash BIGINT PRIMARY KEY, + concurrency_key VARCHAR(255), + concurrent_limit INTEGER, + concurrency_time_window_s INTEGER +); + +CREATE TABLE IF NOT EXISTS debouncing_settings( + hash BIGINT PRIMARY KEY, + debounce_key VARCHAR(255), + debounce_delay_s INTEGER, + max_total_debouncing_time INTEGER, + max_total_debounces_amount INTEGER, + debounce_args_to_accumulate TEXT[] +); + +CREATE TABLE IF NOT EXISTS runnable_settings( + hash BIGINT PRIMARY KEY, + debouncing_settings BIGINT DEFAULT NULL, + concurrency_settings BIGINT DEFAULT NULL +); + +CREATE TABLE IF NOT EXISTS job_settings( + job_id UUID PRIMARY KEY, + runnable_settings BIGINT DEFAULT NULL +); + +ALTER TABLE script +ADD COLUMN runnable_settings_handle BIGINT DEFAULT NULL; + +ALTER TABLE v2_job_queue +ADD COLUMN runnable_settings_handle BIGINT DEFAULT NULL; + + +GRANT ALL ON concurrency_settings TO windmill_admin; +GRANT ALL ON concurrency_settings TO windmill_user; +GRANT ALL ON debouncing_settings TO windmill_admin; +GRANT ALL ON debouncing_settings TO windmill_user; +GRANT ALL ON runnable_settings TO windmill_admin; +GRANT ALL ON runnable_settings TO windmill_user; +GRANT ALL ON job_settings TO windmill_admin; +GRANT ALL ON job_settings TO windmill_user; diff --git a/backend/tests/common/mod.rs b/backend/tests/common/mod.rs index a73068691dbb4..8966c95f7f219 100644 --- 
diff --git a/backend/tests/common/mod.rs b/backend/tests/common/mod.rs
index a73068691dbb4..8966c95f7f219 100644
--- a/backend/tests/common/mod.rs
+++ b/backend/tests/common/mod.rs
@@ -691,8 +691,8 @@ pub async fn run_deployed_relative_imports(
         language,
         priority: None,
         apply_preprocessor: false,
-        concurrency_settings: windmill_common::jobs::ConcurrencySettings::default(),
-        debouncing_settings: windmill_common::jobs::DebouncingSettings::default(),
+        concurrency_settings: windmill_common::runnable_settings::ConcurrencySettings::default(),
+        debouncing_settings: windmill_common::runnable_settings::DebouncingSettings::default(),
     })
     .push(&db2)
     .await;
@@ -741,8 +741,8 @@ pub async fn run_preview_relative_imports(
         cache_ttl: None,
         cache_ignore_s3_path: None,
         dedicated_worker: None,
-        concurrency_settings: windmill_common::jobs::ConcurrencySettings::default().into(),
-        debouncing_settings: windmill_common::jobs::DebouncingSettings::default(),
+        concurrency_settings: windmill_common::runnable_settings::ConcurrencySettings::default().into(),
+        debouncing_settings: windmill_common::runnable_settings::DebouncingSettings::default(),
     }))
     .push(&db2)
     .await;
diff --git a/backend/tests/job_payload.rs b/backend/tests/job_payload.rs
index 2ad2bf339d260..545665c39f1a9 100644
--- a/backend/tests/job_payload.rs
+++ b/backend/tests/job_payload.rs
@@ -52,8 +52,8 @@ mod job_payload {
         let result = RunJob::from(JobPayload::ScriptHash {
             hash: ScriptHash(123412),
             path: "f/system/hello".to_string(),
-            concurrency_settings: windmill_common::jobs::ConcurrencySettings::default().into(),
-            debouncing_settings: windmill_common::jobs::DebouncingSettings::default(),
+            concurrency_settings: windmill_common::runnable_settings::ConcurrencySettings::default().into(),
+            debouncing_settings: windmill_common::runnable_settings::DebouncingSettings::default(),
             cache_ttl: None,
             cache_ignore_s3_path: None,
             dedicated_worker: None,
@@ -90,8 +90,8 @@ mod job_payload {
             language: ScriptLang::Deno,
             priority: None,
             apply_preprocessor: true,
-            concurrency_settings: windmill_common::jobs::ConcurrencySettings::default(),
-            debouncing_settings: windmill_common::jobs::DebouncingSettings::default(),
+            concurrency_settings: windmill_common::runnable_settings::ConcurrencySettings::default(),
+            debouncing_settings: windmill_common::runnable_settings::DebouncingSettings::default(),
         })
         .run_until_complete_with(db, false, port, |id| async move {
             let job = sqlx::query!("SELECT preprocessed FROM v2_job WHERE id = $1", id)
@@ -163,7 +163,7 @@ mod job_payload {
         let result = RunJob::from(JobPayload::FlowScript {
             id: flow_scripts[0],
             language: ScriptLang::Deno,
-            concurrency_settings: windmill_common::jobs::ConcurrencySettings::default(),
+            concurrency_settings: windmill_common::runnable_settings::ConcurrencySettings::default(),
             cache_ttl: None,
             cache_ignore_s3_path: None,
             dedicated_worker: None,
@@ -182,7 +182,7 @@ mod job_payload {
         let result = RunJob::from(JobPayload::FlowScript {
             id: flow_scripts[1],
             language: ScriptLang::Deno,
-            concurrency_settings: windmill_common::jobs::ConcurrencySettings::default(),
+            concurrency_settings: windmill_common::runnable_settings::ConcurrencySettings::default(),
             cache_ttl: None,
             cache_ignore_s3_path: None,
             dedicated_worker: None,
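
Note: all test churn in this diff is the mechanical move of the settings types from windmill_common::jobs to a dedicated windmill_common::runnable_settings module. Going by the fields used across the diff and the migration columns (VARCHAR -> String, INTEGER -> i32), the relocated types look roughly like the sketch below; the real definitions may carry more derives and fields, so treat this as inferred shape, not the actual source.

    // Sketch of the relocated module, inferred from usage in this diff.
    pub mod runnable_settings {
        #[derive(Clone, Default)]
        pub struct ConcurrencySettings {
            pub concurrency_key: Option<String>,
            pub concurrent_limit: Option<i32>,
            pub concurrency_time_window_s: Option<i32>,
        }

        #[derive(Clone, Default)]
        pub struct DebouncingSettings {
            pub custom_key: Option<String>,
            pub delay_s: Option<i32>,
            // `..Default::default()` at call sites implies further fields,
            // plausibly mirroring the max_total_debouncing_time,
            // max_total_debounces_amount and debounce_args_to_accumulate
            // columns from the migration.
            pub max_total_debouncing_time: Option<i32>,
            pub max_total_debounces_amount: Option<i32>,
            pub debounce_args_to_accumulate: Option<Vec<String>>,
        }

        // Pointer pair into the deduplicated settings tables.
        #[derive(Clone, Default)]
        pub struct RunnableSettings {
            pub debouncing_settings: Option<i64>,
            pub concurrency_settings: Option<i64>,
        }
    }
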
diff --git a/backend/tests/python_jobs.rs b/backend/tests/python_jobs.rs
index cb5909f183270..47a554afd5c12 100644
--- a/backend/tests/python_jobs.rs
+++ b/backend/tests/python_jobs.rs
@@ -189,8 +189,8 @@ def main():
         path: None,
         language: ScriptLang::Python3,
         lock: None,
-        concurrency_settings: windmill_common::jobs::ConcurrencySettings::default().into(),
-        debouncing_settings: windmill_common::jobs::DebouncingSettings::default(),
+        concurrency_settings: windmill_common::runnable_settings::ConcurrencySettings::default().into(),
+        debouncing_settings: windmill_common::runnable_settings::DebouncingSettings::default(),
         cache_ttl: None,
         cache_ignore_s3_path: None,
         dedicated_worker: None,
@@ -238,8 +238,8 @@ def main():
         path: None,
         language: ScriptLang::Python3,
         lock: None,
-        concurrency_settings: windmill_common::jobs::ConcurrencySettings::default().into(),
-        debouncing_settings: windmill_common::jobs::DebouncingSettings::default(),
+        concurrency_settings: windmill_common::runnable_settings::ConcurrencySettings::default().into(),
+        debouncing_settings: windmill_common::runnable_settings::DebouncingSettings::default(),
         cache_ttl: None,
         cache_ignore_s3_path: None,
         dedicated_worker: None,
@@ -272,8 +272,8 @@ def main():
         path: None,
         language: ScriptLang::Python3,
         lock: None,
-        concurrency_settings: windmill_common::jobs::ConcurrencySettings::default().into(),
-        debouncing_settings: windmill_common::jobs::DebouncingSettings::default(),
+        concurrency_settings: windmill_common::runnable_settings::ConcurrencySettings::default().into(),
+        debouncing_settings: windmill_common::runnable_settings::DebouncingSettings::default(),
         cache_ttl: None,
         cache_ignore_s3_path: None,
         dedicated_worker: None,
@@ -311,8 +311,8 @@ def main():
         path: None,
         language: ScriptLang::Python3,
         lock: None,
-        concurrency_settings: windmill_common::jobs::ConcurrencySettings::default().into(),
-        debouncing_settings: windmill_common::jobs::DebouncingSettings::default(),
+        concurrency_settings: windmill_common::runnable_settings::ConcurrencySettings::default().into(),
+        debouncing_settings: windmill_common::runnable_settings::DebouncingSettings::default(),
         cache_ttl: None,
         cache_ignore_s3_path: None,
         dedicated_worker: None,
@@ -348,8 +348,8 @@ def main():
         path: None,
         language: ScriptLang::Python3,
         lock: None,
-        concurrency_settings: windmill_common::jobs::ConcurrencySettings::default().into(),
-        debouncing_settings: windmill_common::jobs::DebouncingSettings::default(),
+        concurrency_settings: windmill_common::runnable_settings::ConcurrencySettings::default().into(),
+        debouncing_settings: windmill_common::runnable_settings::DebouncingSettings::default(),
         cache_ttl: None,
         cache_ignore_s3_path: None,
         dedicated_worker: None,
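
Note: some payloads take ConcurrencySettings directly while others call .into() to obtain a ConcurrencySettingsWithCustom (its fields are visible in code removed further down: custom_concurrency_key, concurrent_limit, concurrency_time_window_s). The conversion presumably just renames the key field; a hedged sketch building on the types sketched in the earlier note, not the actual impl:

    // Assumed shape of the conversion implied by the `.into()` calls in these
    // tests; the real impl lives in windmill_common::runnable_settings.
    pub struct ConcurrencySettingsWithCustom {
        pub custom_concurrency_key: Option<String>,
        pub concurrent_limit: Option<i32>,
        pub concurrency_time_window_s: Option<i32>,
    }

    impl From<ConcurrencySettings> for ConcurrencySettingsWithCustom {
        fn from(s: ConcurrencySettings) -> Self {
            Self {
                custom_concurrency_key: s.concurrency_key,
                concurrent_limit: s.concurrent_limit,
                concurrency_time_window_s: s.concurrency_time_window_s,
            }
        }
    }
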
diff --git a/backend/tests/worker.rs b/backend/tests/worker.rs
index b7a46b8c7810e..5cef9463153dc 100644
--- a/backend/tests/worker.rs
+++ b/backend/tests/worker.rs
@@ -188,7 +188,7 @@ async fn test_deno_flow(db: Pool<Postgres>) -> anyhow::Result<()> {
             path: None,
             lock: None,
             tag: None,
-            concurrency_settings: windmill_common::jobs::ConcurrencySettings::default()
+            concurrency_settings: windmill_common::runnable_settings::ConcurrencySettings::default()
                 .into(),
             is_trigger: None,
             assets: None,
@@ -235,7 +235,7 @@ async fn test_deno_flow(db: Pool<Postgres>) -> anyhow::Result<()> {
                     lock: None,
                     tag: None,
                     concurrency_settings:
-                        windmill_common::jobs::ConcurrencySettings::default().into(),
+                        windmill_common::runnable_settings::ConcurrencySettings::default().into(),
                     is_trigger: None,
                     assets: None,
                 }
@@ -369,7 +369,7 @@ async fn test_deno_flow_same_worker(db: Pool<Postgres>) -> anyhow::Result<()> {
            path: None,
            lock: None,
            tag: None,
-           concurrency_settings: windmill_common::jobs::ConcurrencySettings::default().into(),
+           concurrency_settings: windmill_common::runnable_settings::ConcurrencySettings::default().into(),
            is_trigger: None,
            assets: None,

@@ -425,7 +425,7 @@ async fn test_deno_flow_same_worker(db: Pool<Postgres>) -> anyhow::Result<()> {
            path: None,
            lock: None,
            tag: None,
-           concurrency_settings: windmill_common::jobs::ConcurrencySettings::default().into(),
+           concurrency_settings: windmill_common::runnable_settings::ConcurrencySettings::default().into(),
            is_trigger: None,
            assets: None,
        }.into(),
@@ -465,7 +465,7 @@ async fn test_deno_flow_same_worker(db: Pool<Postgres>) -> anyhow::Result<()> {
            path: None,
            lock: None,
            tag: None,
-           concurrency_settings: windmill_common::jobs::ConcurrencySettings::default().into(),
+           concurrency_settings: windmill_common::runnable_settings::ConcurrencySettings::default().into(),
            is_trigger: None,
            assets: None,

@@ -533,7 +533,7 @@ async fn test_deno_flow_same_worker(db: Pool<Postgres>) -> anyhow::Result<()> {
            path: None,
            lock: None,
            tag: None,
-           concurrency_settings: windmill_common::jobs::ConcurrencySettings::default().into(),
+           concurrency_settings: windmill_common::runnable_settings::ConcurrencySettings::default().into(),
            is_trigger: None,
            assets: None,
        }.into(),
@@ -865,8 +865,8 @@ func main(derp string) (string, error) {
         cache_ttl: None,
         cache_ignore_s3_path: None,
         dedicated_worker: None,
-        concurrency_settings: windmill_common::jobs::ConcurrencySettings::default().into(),
-        debouncing_settings: windmill_common::jobs::DebouncingSettings::default(),
+        concurrency_settings: windmill_common::runnable_settings::ConcurrencySettings::default().into(),
+        debouncing_settings: windmill_common::runnable_settings::DebouncingSettings::default(),
     }))
     .arg("derp", json!("world"))
     .run_until_complete(&db, false, port)
@@ -900,8 +900,8 @@ fn main(world: String) -> Result {
         lock: None,
         language: ScriptLang::Rust,
         cache_ignore_s3_path: None,
-        concurrency_settings: windmill_common::jobs::ConcurrencySettings::default().into(),
-        debouncing_settings: windmill_common::jobs::DebouncingSettings::default(),
+        concurrency_settings: windmill_common::runnable_settings::ConcurrencySettings::default().into(),
+        debouncing_settings: windmill_common::runnable_settings::DebouncingSettings::default(),
         cache_ttl: None,
         dedicated_worker: None,
     }))
@@ -978,8 +978,8 @@ echo "hello $msg"
         cache_ttl: None,
         cache_ignore_s3_path: None,
         dedicated_worker: None,
-        concurrency_settings: windmill_common::jobs::ConcurrencySettings::default().into(),
-        debouncing_settings: windmill_common::jobs::DebouncingSettings::default(),
+        concurrency_settings: windmill_common::runnable_settings::ConcurrencySettings::default().into(),
+        debouncing_settings: windmill_common::runnable_settings::DebouncingSettings::default(),
     }))
     .arg("msg", json!("world"))
     .run_until_complete(&db, false, port)
@@ -1011,8 +1011,8 @@ def main [ msg: string ] {
         cache_ttl: None,
         cache_ignore_s3_path: None,
         dedicated_worker: None,
-        concurrency_settings: windmill_common::jobs::ConcurrencySettings::default().into(),
-        debouncing_settings: windmill_common::jobs::DebouncingSettings::default(),
+        concurrency_settings: windmill_common::runnable_settings::ConcurrencySettings::default().into(),
+        debouncing_settings: windmill_common::runnable_settings::DebouncingSettings::default(),
     }))
     .arg("msg", json!("world"))
     .run_until_complete(&db, false, port)
@@ -1064,8 +1064,8 @@ def main [
         cache_ttl: None,
         cache_ignore_s3_path: None,
         dedicated_worker: None,
-        concurrency_settings: windmill_common::jobs::ConcurrencySettings::default().into(),
-        debouncing_settings: windmill_common::jobs::DebouncingSettings::default(),
+        concurrency_settings: windmill_common::runnable_settings::ConcurrencySettings::default().into(),
+        debouncing_settings: windmill_common::runnable_settings::DebouncingSettings::default(),
     }))
     .arg("a", json!("3"))
     .arg("b", json!("null"))
@@ -1126,8 +1126,8 @@ public class Main {
         cache_ttl: None,
         cache_ignore_s3_path: None,
         dedicated_worker: None,
-        concurrency_settings: windmill_common::jobs::ConcurrencySettings::default().into(),
-        debouncing_settings: windmill_common::jobs::DebouncingSettings::default(),
+        concurrency_settings: windmill_common::runnable_settings::ConcurrencySettings::default().into(),
+        debouncing_settings: windmill_common::runnable_settings::DebouncingSettings::default(),
     }))
     .arg("a", json!(3))
     .arg("b", json!(3.0))
@@ -1161,8 +1161,8 @@ export async function main(a: Date) {
         cache_ttl: None,
         cache_ignore_s3_path: None,
         dedicated_worker: None,
-        concurrency_settings: windmill_common::jobs::ConcurrencySettings::default().into(),
-        debouncing_settings: windmill_common::jobs::DebouncingSettings::default(),
+        concurrency_settings: windmill_common::runnable_settings::ConcurrencySettings::default().into(),
+        debouncing_settings: windmill_common::runnable_settings::DebouncingSettings::default(),
     }))
     .arg("a", json!("2024-09-24T10:00:00.000Z"))
     .run_until_complete(&db, false, port)
@@ -1196,8 +1196,8 @@ export async function main(a: Date) {
         cache_ttl: None,
         cache_ignore_s3_path: None,
         dedicated_worker: None,
-        concurrency_settings: windmill_common::jobs::ConcurrencySettings::default().into(),
-        debouncing_settings: windmill_common::jobs::DebouncingSettings::default(),
+        concurrency_settings: windmill_common::runnable_settings::ConcurrencySettings::default().into(),
+        debouncing_settings: windmill_common::runnable_settings::DebouncingSettings::default(),
     }))
     .arg("a", json!("2024-09-24T10:00:00.000Z"))
     .run_until_complete(&db, false, port)
@@ -1232,8 +1232,8 @@ def main(a: datetime, b: bytes):
         cache_ttl: None,
         cache_ignore_s3_path: None,
         dedicated_worker: None,
-        concurrency_settings: windmill_common::jobs::ConcurrencySettings::default().into(),
-        debouncing_settings: windmill_common::jobs::DebouncingSettings::default(),
+        concurrency_settings: windmill_common::runnable_settings::ConcurrencySettings::default().into(),
+        debouncing_settings: windmill_common::runnable_settings::DebouncingSettings::default(),
     }))
     .arg("a", json!("2024-09-24T10:00:00.000Z"))
     .arg("b", json!("dGVzdA=="))
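
Note: flows.rs below only pulls in RunnableSettingsTrait. Judging from the insert_cached calls in scripts.rs and the maybe_fallback calls in jobs.rs further down, the trait bundles the cache-aware persistence for each settings struct. A rough shape, with names taken from this diff but the signature and async_trait usage assumed:

    use sqlx::{Pool, Postgres};

    // Rough shape inferred from call sites in this diff: insert_cached hashes
    // the value, upserts it with ON CONFLICT DO NOTHING and returns the handle.
    #[async_trait::async_trait]
    pub trait RunnableSettingsTrait: Sized {
        /// Persist into the content-addressed table and return the row hash,
        /// or None when the settings are entirely empty (assumption).
        async fn insert_cached(&self, db: &Pool<Postgres>) -> sqlx::Result<Option<i64>>;
    }
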
diff --git a/backend/windmill-api/src/flows.rs b/backend/windmill-api/src/flows.rs
index bdc322f43e9ab..c68d21a6bf204 100644
--- a/backend/windmill-api/src/flows.rs
+++ b/backend/windmill-api/src/flows.rs
@@ -32,6 +32,7 @@ use sql_builder::prelude::*;
 use sqlx::{FromRow, Postgres, Transaction};
 use windmill_audit::audit_oss::audit_log;
 use windmill_audit::ActionKind;
+use windmill_common::runnable_settings::RunnableSettingsTrait;
 use windmill_common::utils::{query_elems_from_hub, WarnAfterExt};
 use windmill_common::worker::{to_raw_value, CLOUD_HOSTED, MIN_VERSION_SUPPORTS_DEBOUNCING};
 use windmill_common::HUB_BASE_URL;
@@ -1601,7 +1602,9 @@ mod tests {
         ConstantDelay, ExponentialDelay, FlowModule, FlowModuleValue, FlowValue, InputTransform,
         Retry, StopAfterIf,
     },
-    jobs::{ConcurrencySettings, ConcurrencySettingsWithCustom, DebouncingSettings},
+    runnable_settings::{
+        ConcurrencySettings, ConcurrencySettingsWithCustom, DebouncingSettings,
+    },
     scripts,
 };
 
diff --git a/backend/windmill-api/src/jobs.rs b/backend/windmill-api/src/jobs.rs
index 82a288abe3267..e84fa25ad7d8f 100644
--- a/backend/windmill-api/src/jobs.rs
+++ b/backend/windmill-api/src/jobs.rs
@@ -36,10 +36,13 @@ use windmill_common::flow_conversations::add_message_to_conversation_tx;
 use windmill_common::flow_status::{JobResult, RestartedFrom};
 use windmill_common::jobs::{
     check_tag_available_for_workspace_internal, format_completed_job_result, format_result,
-    ConcurrencySettings, ConcurrencySettingsWithCustom, DebouncingSettings, DynamicInput,
-    JobTriggerKind, RunInlinePreviewScriptFnParams, ENTRYPOINT_OVERRIDE,
+    DynamicInput, JobTriggerKind, RunInlinePreviewScriptFnParams, ENTRYPOINT_OVERRIDE,
+};
+use windmill_common::runnable_settings::{
+    ConcurrencySettings, ConcurrencySettingsWithCustom, DebouncingSettings, RunnableSettings,
 };
 use windmill_common::s3_helpers::{upload_artifact_to_store, BundleFormat};
+use windmill_common::scripts::ScriptRunnableSettingsInline;
 use windmill_common::triggers::TriggerMetadata;
 use windmill_common::utils::{RunnableKind, WarnAfterExt};
 use windmill_common::worker::{Connection, CLOUD_HOSTED, TMP_DIR};
@@ -891,7 +894,7 @@ macro_rules! get_job_query {
         get_job_query!(
             @impl "v2_job_queue", ($($opts)*),
             "scheduled_for, running, ping as last_ping, suspend, suspend_until, same_worker, pre_run_error, visible_to_owner, \
-            flow_innermost_root_job AS root_job, flow_leaf_jobs AS leaf_jobs, concurrent_limit, concurrency_time_window_s, timeout, flow_step_id, cache_ttl, cache_ignore_s3_path, \
+            flow_innermost_root_job AS root_job, flow_leaf_jobs AS leaf_jobs, concurrent_limit, concurrency_time_window_s, timeout, flow_step_id, cache_ttl, cache_ignore_s3_path, runnable_settings_handle, \
             script_entrypoint_override",
            "LEFT JOIN v2_job_runtime ON v2_job_runtime.id = v2_job_queue.id LEFT JOIN v2_job_status ON v2_job_status.id = v2_job_queue.id",
         )
@@ -3373,6 +3376,7 @@ pub struct UnifiedJob {
    pub aggregate_wait_time_ms: Option<i64>,
    pub preprocessed: Option<bool>,
    pub worker: Option<String>,
+   pub runnable_settings_handle: Option<i64>,
 }
 
 const CJ_FIELDS: &[&str] = &[
@@ -3413,6 +3417,7 @@ const CJ_FIELDS: &[&str] = &[
     "aggregate_wait_time_ms",
     "v2_job.preprocessed",
     "v2_job_completed.worker",
+    "null as runnable_settings_handle",
 ];
 
 const QJ_FIELDS: &[&str] = &[
@@ -3453,6 +3458,7 @@ const QJ_FIELDS: &[&str] = &[
     "aggregate_wait_time_ms",
     "v2_job.preprocessed",
     "v2_job_queue.worker",
+    "v2_job_queue.runnable_settings_handle",
 ];
 
 impl UnifiedJob {
@@ -3517,14 +3523,13 @@ impl<'a> From<UnifiedJob> for Job {
                 created_by: uj.created_by,
                 created_at: uj.created_at,
                 started_at: uj.started_at,
+                scheduled_for: uj.scheduled_for.unwrap(),
+                running: uj.running.unwrap(),
                 script_hash: uj.script_hash,
                 script_path: uj.script_path,
+                script_entrypoint_override: None,
                 args: None,
-                running: uj.running.unwrap(),
-                scheduled_for: uj.scheduled_for.unwrap(),
                 logs: None,
-                flow_status: None,
-                workflow_as_code_status: None,
                 canceled: uj.canceled,
                 canceled_by: uj.canceled_by,
                 canceled_reason: None,
@@ -3532,9 +3537,10 @@ impl<'a> From<UnifiedJob> for Job {
                 job_kind: uj.job_kind,
                 schedule_path: uj.schedule_path,
                 permissioned_as: uj.permissioned_as,
+                flow_status: None,
+                workflow_as_code_status: None,
                 is_flow_step: uj.is_flow_step,
                 language: uj.language,
-                script_entrypoint_override: None,
                 same_worker: false,
                 pre_run_error: None,
                 email: uj.email,
@@ -3552,6 +3558,7 @@ impl<'a> From<UnifiedJob> for Job {
                 cache_ignore_s3_path: None,
                 priority: uj.priority,
                 preprocessed: uj.preprocessed,
+                runnable_settings_handle: uj.runnable_settings_handle,
             },
         )),
         t => panic!("job type {} not valid", t),
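
Note: only queued jobs carry a live handle (QJ_FIELDS reads v2_job_queue.runnable_settings_handle; completed jobs get a literal null), so consumers resolve the handle on demand, as the next hunk shows with RunnableSettings::from_runnable_settings_handle(...).prefetch_cached(...). A self-contained sketch of the first resolution step; the SELECT is verbatim from the new .sqlx/query-ebbbd069... statement, while the function and struct names here are assumptions standing in for the real API:

    use sqlx::{Pool, Postgres};

    pub struct RunnableSettingsHandles {
        pub concurrency_settings: Option<i64>,
        pub debouncing_settings: Option<i64>,
    }

    pub async fn resolve_runnable_settings_handle(
        db: &Pool<Postgres>,
        handle: Option<i64>,
    ) -> sqlx::Result<RunnableSettingsHandles> {
        let Some(hash) = handle else {
            // Jobs pushed before the migration (or by old workers) carry no
            // handle; callers then fall back to the legacy inline columns.
            return Ok(RunnableSettingsHandles {
                concurrency_settings: None,
                debouncing_settings: None,
            });
        };
        let row: Option<(Option<i64>, Option<i64>)> = sqlx::query_as(
            "SELECT concurrency_settings, debouncing_settings FROM runnable_settings WHERE hash = $1",
        )
        .bind(hash)
        .fetch_optional(db)
        .await?;
        let (c, d) = row.unwrap_or((None, None));
        Ok(RunnableSettingsHandles { concurrency_settings: c, debouncing_settings: d })
    }
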
@@ -4654,6 +4661,13 @@ pub async fn run_workflow_as_code(
     let job = not_found_if_none(job, "Queued Job", &job_id.to_string())?;
     let JobExtended { inner: job, raw_code, raw_lock, .. } = job;
+
+    let (_debouncing_settings, concurrency_settings) =
+        RunnableSettings::from_runnable_settings_handle(job.runnable_settings_handle, &db)
+            .await?
+            .prefetch_cached(&db)
+            .await?;
+
     let (job_payload, tag, _delete_after_use, timeout, on_behalf_of) = match job.job_kind {
         JobKind::Preview => (
             JobPayload::Code(RawCode {
@@ -4662,13 +4676,15 @@ pub async fn run_workflow_as_code(
                 path: job.script_path,
                 language: job.language.unwrap_or_else(|| ScriptLang::Deno),
                 lock: raw_lock,
-                concurrency_settings: windmill_common::jobs::ConcurrencySettingsWithCustom {
-                    custom_concurrency_key: windmill_queue::custom_concurrency_key(&db, &job.id)
-                        .await
-                        .map_err(to_anyhow)?,
-                    concurrent_limit: job.concurrent_limit,
-                    concurrency_time_window_s: job.concurrency_time_window_s,
-                },
+                concurrency_settings: concurrency_settings
+                    .maybe_fallback(
+                        windmill_queue::custom_concurrency_key(&db, &job.id)
+                            .await
+                            .map_err(to_anyhow)?,
+                        job.concurrent_limit,
+                        job.concurrency_time_window_s,
+                    )
+                    .into(),
                 cache_ttl: job.cache_ttl,
                 cache_ignore_s3_path: job.cache_ignore_s3_path,
                 dedicated_worker: None,
@@ -5489,11 +5505,6 @@ pub async fn run_wait_result_script_by_hash(
     let ScriptHashInfo {
         path,
         tag,
-        concurrency_key,
-        concurrent_limit,
-        concurrency_time_window_s,
-        debounce_key,
-        debounce_delay_s,
         mut cache_ttl,
         mut cache_ignore_s3_path,
         language,
@@ -5504,8 +5515,14 @@ pub async fn run_wait_result_script_by_hash(
         has_preprocessor,
         on_behalf_of_email,
         created_by,
+        runnable_settings:
+            ScriptRunnableSettingsInline { concurrency_settings, debouncing_settings },
         ..
-    } = get_script_info_for_hash(Some(userdb_authed), &db, &w_id, hash).await?;
+    } = get_script_info_for_hash(Some(userdb_authed), &db, &w_id, hash)
+        .await?
+        .prefetch_cached(&db)
+        .await?;
+
     if let Some(run_query_cache_ttl) = run_query.cache_ttl {
         cache_ttl = Some(run_query_cache_ttl);
         cache_ignore_s3_path = run_query.cache_ignore_s3_path;
@@ -5539,17 +5556,8 @@ pub async fn run_wait_result_script_by_hash(
         JobPayload::ScriptHash {
             hash: ScriptHash(hash),
             path: path,
-            concurrency_settings: windmill_common::jobs::ConcurrencySettingsWithCustom {
-                custom_concurrency_key: concurrency_key,
-                concurrent_limit: concurrent_limit,
-                concurrency_time_window_s: concurrency_time_window_s,
-            }
-            .into(),
-            debouncing_settings: DebouncingSettings {
-                custom_key: debounce_key,
-                delay_s: debounce_delay_s,
-                ..Default::default() // TODO
-            },
+            concurrency_settings,
+            debouncing_settings,
             cache_ttl,
             cache_ignore_s3_path,
             language,
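
Note: maybe_fallback appears wherever the resolved settings might be empty; the deduplicated row wins, and the legacy inline columns (still present on v2_job and script) only apply when nothing was resolved. Its body is not part of this diff, so the following is a plausible reading of the semantics implied by the call sites above, not the actual implementation; it uses the ConcurrencySettings shape sketched earlier:

    impl ConcurrencySettings {
        // Assumed semantics: keep the resolved settings if any field is
        // populated, otherwise rebuild them from the legacy columns.
        pub fn maybe_fallback(
            self,
            legacy_key: Option<String>,
            legacy_limit: Option<i32>,
            legacy_window_s: Option<i32>,
        ) -> Self {
            let empty = self.concurrency_key.is_none()
                && self.concurrent_limit.is_none()
                && self.concurrency_time_window_s.is_none();
            if empty {
                Self {
                    concurrency_key: legacy_key,
                    concurrent_limit: legacy_limit,
                    concurrency_time_window_s: legacy_window_s,
                }
            } else {
                self
            }
        }
    }
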
@@ -6492,9 +6500,15 @@ async fn add_batch_jobs(
         job_kind,
         language,
         dedicated_worker,
-        custom_concurrency_key,
-        concurrent_limit,
-        concurrent_time_window_s,
+        ScriptRunnableSettingsInline {
+            concurrency_settings:
+                ConcurrencySettings {
+                    concurrent_limit,
+                    concurrency_time_window_s,
+                    concurrency_key: custom_concurrency_key,
+                },
+            ..
+        },
         timeout,
         raw_code,
         raw_lock,
@@ -6507,23 +6521,22 @@ async fn add_batch_jobs(
                 UserDbWithAuthed { db: user_db.clone(), authed: &authed.to_authed_ref() };
             let ScriptHashInfo {
                 hash: script_hash,
-                concurrency_key,
-                concurrent_limit,
-                concurrency_time_window_s,
                 language,
                 dedicated_worker,
                 timeout,
+                runnable_settings,
                 .. // TODO: consider on_behalf_of_email and created_by for batch jobs
-            } = get_latest_deployed_hash_for_path(Some(db_authed), db.clone(), &w_id, &path).await?;
+            } = get_latest_deployed_hash_for_path(Some(db_authed), db.clone(), &w_id, &path)
+                .await?
+                .prefetch_cached(&db)
+                .await?;
             (
                 Some(script_hash),
                 Some(path),
                 JobKind::Script,
                 Some(language),
                 dedicated_worker,
-                concurrency_key,
-                concurrent_limit,
-                concurrency_time_window_s,
+                runnable_settings,
                 timeout,
                 None,
                 None,
@@ -6544,9 +6557,7 @@ async fn add_batch_jobs(
                 JobKind::Preview,
                 rawscript.language,
                 None,
-                None,
-                None,
-                None,
+                Default::default(),
                 None,
                 Some(rawscript.content),
                 rawscript.lock,
@@ -6591,19 +6602,20 @@ async fn add_batch_jobs(
             add_virtual_items_if_necessary(&mut value.modules);
             let flow_status = FlowStatus::new(&value);
             (
-                None,                                                  // script_hash
-                path,                                                  // script_path
-                job_kind,                                              // job_kind
-                None,                                                  // language
-                None,                                                  // dedicated_worker
-                value.concurrency_settings.concurrency_key.clone(),   // custom_concurrency_key
-                value.concurrency_settings.concurrent_limit.clone(),  // concurrent_limit
-                value.concurrency_settings.concurrency_time_window_s, // concurrency_time_window_s
-                None,                                                  // timeout
-                None,                                                  // raw_code
-                None,                                                  // raw_lock
-                Some(value),                                           // raw_flow
-                Some(flow_status),                                     // flow_status
+                None,               // script_hash
+                path,               // script_path
+                job_kind,           // job_kind
+                None,               // language
+                None,               // dedicated_worker
+                ScriptRunnableSettingsInline {
+                    concurrency_settings: value.concurrency_settings.clone(),
+                    ..Default::default()
+                },
+                None,               // timeout
+                None,               // raw_code
+                None,               // raw_lock
+                Some(value),        // raw_flow
+                Some(flow_status),  // flow_status
             )
         }
         "noop" => (
@@ -6612,9 +6624,7 @@ async fn add_batch_jobs(
             JobKind::Noop,
             None,
            None,
-            None,
-            None,
-            None,
+            Default::default(),
             None,
             None,
             None,
@@ -6669,7 +6679,7 @@ async fn add_batch_jobs(
         username_to_permissioned_as(&authed.username),
         authed.email,
         concurrent_limit,
-        concurrent_time_window_s,
+        concurrency_time_window_s,
         timeout,
         n,
     )
+        .prefetch_cached(&db)
+        .await?;
     check_scopes(&authed, || format!("jobs:run:scripts:{path}"))?;

     if let Some(run_query_cache_ttl) = run_query.cache_ttl {
@@ -7115,16 +7125,8 @@
         JobPayload::ScriptHash {
             hash: ScriptHash(hash),
             path: path,
-            concurrency_settings: ConcurrencySettings {
-                concurrency_key,
-                concurrent_limit,
-                concurrency_time_window_s,
-            },
-            debouncing_settings: DebouncingSettings {
-                custom_key: debounce_key,
-                delay_s: debounce_delay_s,
-                ..Default::default()
-            },
+            concurrency_settings,
+            debouncing_settings,
             cache_ttl,
             cache_ignore_s3_path,
             language,
diff --git a/backend/windmill-api/src/scripts.rs b/backend/windmill-api/src/scripts.rs
index 272f132b7e642..aa9470366f430 100644
--- a/backend/windmill-api/src/scripts.rs
+++ b/backend/windmill-api/src/scripts.rs
@@ -42,9 +42,12 @@ use windmill_worker::{process_relative_imports, scoped_dependency_map::ScopedDep
 use windmill_common::{
     assets::{clear_asset_usage, insert_asset_usage, AssetUsageKind, AssetWithAltAccessType},
-    error::to_anyhow,
+    error::{self, to_anyhow},
+    runnable_settings::{
+        min_version_supports_runnable_settings_v0, RunnableSettings, RunnableSettingsTrait,
+    },
     s3_helpers::upload_artifact_to_store,
-    scripts::hash_script,
+    scripts::{hash_script, ScriptRunnableSettingsHandle, ScriptRunnableSettingsInline},
     utils::{paginate_without_limits, WarnAfterExt},
     worker::{CLOUD_HOSTED, MIN_VERSION_SUPPORTS_DEBOUNCING},
 };
@@ -71,7 +74,7 @@ use windmill_queue::{schedule::push_scheduled_job, PushIsolationLevel};
 const MAX_HASH_HISTORY_LENGTH_STORED: usize = 20;

 #[derive(Serialize, sqlx::FromRow)]
-pub struct ScriptWDraft {
+pub struct ScriptWDraft<SR> {
     pub hash: ScriptHash,
     pub path: String,
     pub summary: String,
@@ -88,10 +91,6 @@ pub struct ScriptWDraft {
     #[serde(skip_serializing_if = "Option::is_none")]
     pub envs: Option<Vec<String>>,
     #[serde(skip_serializing_if = "Option::is_none")]
-    pub concurrent_limit: Option<i32>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub concurrency_time_window_s: Option<i32>,
-    #[serde(skip_serializing_if = "Option::is_none")]
     pub cache_ttl: Option<i32>,
     #[serde(skip_serializing_if = "Option::is_none")]
     pub cache_ignore_s3_path: Option<bool>,
@@ -108,8 +107,6 @@ pub struct ScriptWDraft {
     #[serde(skip_serializing_if = "Option::is_none")]
     pub timeout: Option<i32>,
     #[serde(skip_serializing_if = "Option::is_none")]
-    pub concurrency_key: Option<String>,
-    #[serde(skip_serializing_if = "Option::is_none")]
     pub visible_to_runner_only: Option<bool>,
     #[serde(skip_serializing_if = "Option::is_none")]
     pub no_main_func: Option<bool>,
@@ -120,10 +117,64 @@ pub struct ScriptWDraft {
     #[serde(skip_serializing_if = "Option::is_none")]
     #[sqlx(json(nullable))]
     pub assets: Option<Vec<AssetWithAltAccessType>>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub debounce_key: Option<String>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub debounce_delay_s: Option<i32>,
+    #[serde(flatten)]
+    #[sqlx(flatten)]
+    pub runnable_settings: SR,
+}
+
+impl ScriptWDraft<ScriptRunnableSettingsHandle> {
+    pub async fn prefetch_cached<'a>(
+        self,
+        db: &DB,
+    ) -> error::Result<ScriptWDraft<ScriptRunnableSettingsInline>> {
+        let (debouncing_settings, concurrency_settings) =
+            RunnableSettings::from_runnable_settings_handle(
+                self.runnable_settings.runnable_settings_handle,
+                db,
+            )
+            .await?
+            .prefetch_cached(db)
+            .await?;
+
+        Ok(ScriptWDraft {
+            runnable_settings: ScriptRunnableSettingsInline {
+                concurrency_settings: concurrency_settings.maybe_fallback(
+                    self.runnable_settings.concurrency_key,
+                    self.runnable_settings.concurrent_limit,
+                    self.runnable_settings.concurrency_time_window_s,
+                ),
+                debouncing_settings: debouncing_settings.maybe_fallback(
+                    self.runnable_settings.debounce_key,
+                    self.runnable_settings.debounce_delay_s,
+                ),
+            },
+            hash: self.hash,
+            path: self.path,
+            summary: self.summary,
+            description: self.description,
+            content: self.content,
+            language: self.language,
+            kind: self.kind,
+            tag: self.tag,
+            draft: self.draft,
+            schema: self.schema,
+            draft_only: self.draft_only,
+            envs: self.envs,
+            cache_ttl: self.cache_ttl,
+            cache_ignore_s3_path: self.cache_ignore_s3_path,
+            dedicated_worker: self.dedicated_worker,
+            ws_error_handler_muted: self.ws_error_handler_muted,
+            priority: self.priority,
+            restart_unless_cancelled: self.restart_unless_cancelled,
+            delete_after_use: self.delete_after_use,
+            timeout: self.timeout,
+            visible_to_runner_only: self.visible_to_runner_only,
+            no_main_func: self.no_main_func,
+            has_preprocessor: self.has_preprocessor,
+            on_behalf_of_email: self.on_behalf_of_email,
+            assets: self.assets,
+        })
+    }
+}

 pub fn global_service() -> Router {
@@ -589,7 +640,7 @@ async fn create_script_internal<'c>(
             .to_owned(),
         ));
     };
-    let clashing_script = sqlx::query_as::<_, Script>(
+    let clashing_script = sqlx::query_as::<_, Script<ScriptRunnableSettingsHandle>>(
         "SELECT * FROM script WHERE path = $1 AND archived = false AND workspace_id = $2",
     )
     .bind(&ns.path)
@@ -790,6 +841,31 @@ async fn create_script_internal<'c>(
         }
     };

+    let runnable_settings_handle = RunnableSettings {
+        debouncing_settings: ns.debouncing_settings.insert_cached(&db).await?,
+        concurrency_settings: ns.concurrency_settings.insert_cached(&db).await?,
+    }
+    .insert_cached(&db)
+    .await?;
+
+    let (
+        guarded_concurrent_limit,
+        guarded_concurrency_time_window_s,
+        guarded_concurrency_key,
+        guarded_debounce_key,
+        guarded_debounce_delay_s,
+    ) = if min_version_supports_runnable_settings_v0().await {
+        Default::default()
+    } else {
+        (
+            ns.concurrency_settings.concurrent_limit.clone(),
+            ns.concurrency_settings.concurrency_time_window_s.clone(),
+            ns.concurrency_settings.concurrency_key.clone(),
+            ns.debouncing_settings.debounce_key.clone(),
+            ns.debouncing_settings.debounce_delay_s.clone(),
+        )
+    };
+
+    // Row lock debounce key for path. We need this to make all updates of runnables sequential and predictable.
     tokio::time::timeout(
         core::time::Duration::from_secs(60),
@@ -803,8 +879,8 @@ async fn create_script_internal<'c>(
             content, created_by, schema, is_template, extra_perms, lock, language, kind, tag, \
             draft_only, envs, concurrent_limit, concurrency_time_window_s, cache_ttl, \
             dedicated_worker, ws_error_handler_muted, priority, restart_unless_cancelled, \
-            delete_after_use, timeout, concurrency_key, visible_to_runner_only, no_main_func, codebase, has_preprocessor, on_behalf_of_email, schema_validation, assets, debounce_key, debounce_delay_s, cache_ignore_s3_path) \
-            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9::text::json, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29, $30, $31, $32, $33, $34, $35, $36, $37)",
+            delete_after_use, timeout, concurrency_key, visible_to_runner_only, no_main_func, codebase, has_preprocessor, on_behalf_of_email, schema_validation, assets, debounce_key, debounce_delay_s, cache_ignore_s3_path, runnable_settings_handle) \
+            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9::text::json, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29, $30, $31, $32, $33, $34, $35, $36, $37, $38)",
             &w_id,
             &hash.0,
             ns.path,
@@ -822,8 +898,8 @@ async fn create_script_internal<'c>(
             ns.tag,
             ns.draft_only,
             envs,
-            ns.concurrent_limit,
-            ns.concurrency_time_window_s,
+            guarded_concurrent_limit,
+            guarded_concurrency_time_window_s,
             ns.cache_ttl,
             ns.dedicated_worker,
             ns.ws_error_handler_muted.unwrap_or(false),
@@ -831,7 +907,7 @@ async fn create_script_internal<'c>(
             ns.restart_unless_cancelled,
             ns.delete_after_use,
             ns.timeout,
-            ns.concurrency_key,
+            guarded_concurrency_key,
             ns.visible_to_runner_only,
             no_main_func.filter(|x| *x), // should be Some(true) or None
             codebase,
@@ -843,9 +919,10 @@ async fn create_script_internal<'c>(
             },
             validate_schema,
             ns.assets.as_ref().and_then(|a| serde_json::to_value(a).ok()),
-            ns.debounce_key,
-            ns.debounce_delay_s,
+            guarded_debounce_key,
+            guarded_debounce_delay_s,
             ns.cache_ignore_s3_path,
+            runnable_settings_handle
         )
         .execute(&mut *tx)
         .await?;
@@ -1173,18 +1250,20 @@ pub async fn pick_hub_script_by_path(
     Ok::<_, Error>((status_code, headers, response))
 }

+#[axum::debug_handler]
 async fn get_script_by_path(
     authed: ApiAuthed,
     Extension(user_db): Extension<UserDB>,
+    Extension(db): Extension<DB>,
     Path((w_id, path)): Path<(String, StripPath)>,
     Query(query): Query,
-) -> JsonResult<ScriptWithStarred> {
+) -> JsonResult<ScriptWithStarred<ScriptRunnableSettingsInline>> {
     let path = path.to_path();
     check_scopes(&authed, || format!("scripts:read:{}", path))?;

     let mut tx = user_db.begin(&authed).await?;
     let script_o = if query.with_starred_info.unwrap_or(false) {
-        sqlx::query_as::<_, ScriptWithStarred>(
+        sqlx::query_as::<_, ScriptWithStarred<ScriptRunnableSettingsHandle>>(
             "SELECT s.*, favorite.path IS NOT NULL as starred
             FROM script s
             LEFT JOIN favorite
@@ -1202,7 +1281,7 @@ async fn get_script_by_path(
         .fetch_optional(&mut *tx)
         .await?
     } else {
-        sqlx::query_as::<_, ScriptWithStarred>(
+        sqlx::query_as::<_, ScriptWithStarred<ScriptRunnableSettingsHandle>>(
             "SELECT *, NULL as starred FROM script WHERE path = $1 AND workspace_id = $2 ORDER BY created_at DESC LIMIT 1",
         )
         .bind(path)
@@ -1212,7 +1291,10 @@ async fn get_script_by_path(
     };
     tx.commit().await?;

-    let script = not_found_if_none(script_o, "Script", path)?;
+    let script = not_found_if_none(script_o, "Script", path)?
+        .prefetch_cached(&db)
+        .await?;
+
     Ok(Json(script))
 }
@@ -1234,15 +1316,16 @@ async fn get_triggers_count(
 async fn get_script_by_path_w_draft(
     authed: ApiAuthed,
+    Extension(db): Extension<DB>,
     Extension(user_db): Extension<UserDB>,
     Path((w_id, path)): Path<(String, StripPath)>,
-) -> JsonResult<ScriptWDraft> {
+) -> JsonResult<ScriptWDraft<ScriptRunnableSettingsInline>> {
     let path = path.to_path();
     check_scopes(&authed, || format!("scripts:read:{}", path))?;

     let mut tx = user_db.begin(&authed).await?;
-    let script_o = sqlx::query_as::<_, ScriptWDraft>(
-        "SELECT hash, script.path, summary, description, content, language, kind, tag, schema, draft_only, envs, concurrent_limit, concurrency_time_window_s, cache_ttl, cache_ignore_s3_path, ws_error_handler_muted, draft.value as draft, dedicated_worker, priority, restart_unless_cancelled, delete_after_use, timeout, concurrency_key, visible_to_runner_only, no_main_func, has_preprocessor, on_behalf_of_email, assets, debounce_key, debounce_delay_s FROM script LEFT JOIN draft ON
+    let script_o = sqlx::query_as::<_, ScriptWDraft<ScriptRunnableSettingsHandle>>(
+        "SELECT hash, script.path, summary, description, content, language, kind, tag, schema, draft_only, envs, runnable_settings_handle, concurrent_limit, concurrency_time_window_s, cache_ttl, cache_ignore_s3_path, ws_error_handler_muted, draft.value as draft, dedicated_worker, priority, restart_unless_cancelled, delete_after_use, timeout, concurrency_key, visible_to_runner_only, no_main_func, has_preprocessor, on_behalf_of_email, assets, debounce_key, debounce_delay_s FROM script LEFT JOIN draft ON
         script.path = draft.path AND script.workspace_id = draft.workspace_id AND draft.typ = 'script'
         WHERE script.path = $1 AND script.workspace_id = $2
         ORDER BY script.created_at DESC LIMIT 1",
@@ -1254,7 +1337,7 @@ async fn get_script_by_path_w_draft(
     tx.commit().await?;
     let script = not_found_if_none(script_o, "Script", path)?;

-    Ok(Json(script))
+    Ok(Json(script.prefetch_cached(&db).await?))
 }

 async fn get_script_history(
@@ -1686,9 +1769,9 @@ async fn get_script_by_hash_internal<'c>(
     workspace_id: &str,
     hash: &ScriptHash,
     with_starred_info_for_username: Option<&str>,
-) -> Result<ScriptWithStarred> {
+) -> Result<ScriptWithStarred<ScriptRunnableSettingsHandle>> {
     let script_o = if let Some(username) = with_starred_info_for_username {
-        sqlx::query_as::<_, ScriptWithStarred>(
+        sqlx::query_as::<_, ScriptWithStarred<ScriptRunnableSettingsHandle>>(
             "SELECT s.*, favorite.path IS NOT NULL as starred
             FROM script s
             LEFT JOIN favorite
@@ -1704,7 +1787,7 @@ async fn get_script_by_hash_internal<'c>(
         .fetch_optional(&mut **db)
         .await?
     } else {
-        sqlx::query_as::<_, ScriptWithStarred>(
+        sqlx::query_as::<_, ScriptWithStarred<ScriptRunnableSettingsHandle>>(
             "SELECT *, NULL as starred FROM script WHERE hash = $1 AND workspace_id = $2",
         )
        .bind(hash)
@@ -1728,7 +1811,7 @@ async fn get_script_by_hash(
     Query(query): Query,
     Query(query_auth): Query,
     Extension(authed): Extension,
-) -> JsonResult<ScriptWithStarred> {
+) -> JsonResult<ScriptWithStarred<ScriptRunnableSettingsInline>> {
     let mut tx = if query_auth.authed.is_some_and(|x| x) {
         user_db.begin(&authed).await?
     } else {
@@ -1752,7 +1835,7 @@ async fn get_script_by_hash(

     tx.commit().await?;

-    Ok(Json(r))
+    Ok(Json(r.prefetch_cached(&db).await?))
 }

 async fn raw_script_by_hash(
@@ -1887,12 +1970,13 @@ async fn archive_script_by_path(
 async fn archive_script_by_hash(
     authed: ApiAuthed,
     Extension(user_db): Extension<UserDB>,
+    Extension(db): Extension<DB>,
     Extension(webhook): Extension<WebhookShared>,
     Path((w_id, hash)): Path<(String, ScriptHash)>,
-) -> JsonResult