
StarlingMonkey timers #798


Merged · 8 commits · Jun 6, 2024
4 changes: 2 additions & 2 deletions integration-tests/js-compute/fixtures/app/src/cache-core.js
@@ -1990,7 +1990,7 @@ let error;
if (error) { return error }
await sleep(1000);
result = CoreCache.lookup(key).age()
error = assert(result >= 1_000, true, `CoreCache.lookup(key).age() >= 1_000`)
error = assert(result >= 1_000, true, `CoreCache.lookup(key).age() >= 1_000 (${result})`)
if (error) { return error }
return pass("ok")
});
@@ -2805,7 +2805,7 @@ let error;
writer.append("hello");
writer.close();
const actual = await new Response(reader.body()).text();
let error = assert("hello", actual, `actual === "hello"`);
let error = assert(actual, "hello", `actual === "hello"`);
if (error) { return error }
return pass("ok")
});
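
Note (not part of the diff): the second change above swaps the arguments so the actual value comes first, which assumes the fixture's helper has the shape assert(actual, expected, message). A minimal sketch of that assumed shape:

// Hypothetical sketch of the assert helper these fixtures appear to rely on;
// the real helper lives elsewhere in the test app and may differ in detail.
function assert(actual, expected, message) {
  if (actual !== expected) {
    // Return the error Response rather than throwing, matching the
    // `if (error) { return error }` pattern used in the tests above.
    return new Response(
      `assertion failed: ${message} (actual: ${JSON.stringify(actual)}, expected: ${JSON.stringify(expected)})`,
      { status: 500 },
    );
  }
}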
18 changes: 18 additions & 0 deletions integration-tests/js-compute/fixtures/app/src/timers.js
@@ -326,6 +326,24 @@ import { routes } from "./routes.js";
if (error) { return error }
return pass()
});
routes.set("/setTimeout/200-ms", async () => {
let controller, start
setTimeout(() => {
const end = Date.now()
controller.enqueue(new TextEncoder().encode(`END\n`))
if (end - start < 190) {
controller.enqueue(new TextEncoder().encode(`ERROR: Timer took ${end - start} instead of 200ms`))
}
controller.close()
}, 200);
return new Response(new ReadableStream({
start(_controller) {
controller = _controller
start = Date.now()
controller.enqueue(new TextEncoder().encode(`START\n`))
}
}))
});
}

// clearInterval
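For illustration (not part of the diff), the new /setTimeout/200-ms route can be exercised end to end roughly like this; the origin is the local Viceroy address used by test.js below, and the 190 ms lower bound mirrors the jitter tolerance in the handler above:

// Hypothetical client-side check; swap the origin for wherever the service runs.
const response = await fetch("http://127.0.0.1:7676/setTimeout/200-ms");
const started = Date.now();
// "START\n" is streamed immediately; "END\n" only arrives once the 200 ms timer
// fires, so reading the full body should take roughly 200 ms and match tests.json.
const body = await response.text();
const elapsed = Date.now() - started;
console.assert(body === "START\nEND\n", `unexpected body: ${body}`);
console.assert(elapsed >= 190, `body completed after ${elapsed} ms, expected ~200 ms`);
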
@@ -1,4 +1,3 @@
[
"GET /cache-entry/age/called-on-instance",
"GET /transaction-cache-entry/insertAndStreamBack/write-to-writer-and-read-from-reader"
]
11 changes: 11 additions & 0 deletions integration-tests/js-compute/fixtures/app/tests.json
@@ -4834,6 +4834,17 @@
"status": 200
}
},
"GET /setTimeout/200-ms": {
"environments": ["viceroy", "compute"],
"downstream_request": {
"method": "GET",
"pathname": "/setTimeout/200-ms"
},
"downstream_response": {
"status": 200,
"body": ["START\nEND\n"]
}
},
"GET /clearInterval/exposed-as-global": {
"environments": ["viceroy", "compute"],
"downstream_request": {
2 changes: 1 addition & 1 deletion integration-tests/js-compute/test.js
@@ -100,7 +100,7 @@ if (!local) {
domain = "http://127.0.0.1:7676"
}

core.startGroup('Check service is up and running')
core.startGroup(`Check service is up and running on ${domain}`)
await retry(10, expBackoff('60s', '30s'), async () => {
const response = await request(domain)
if (response.statusCode !== 200) {
2 changes: 1 addition & 1 deletion runtime/StarlingMonkey
104 changes: 90 additions & 14 deletions runtime/fastly/host-api/host_api.cpp
@@ -10,26 +10,102 @@
#include <algorithm>
#include <arpa/inet.h>

#include <time.h>

using api::FastlyResult;
using fastly::FastlyAPIError;
using host_api::MonotonicClock;
using host_api::Result;

#define NEVER_HANDLE 0xFFFFFFFE

#define MILLISECS_IN_NANOSECS 1000000
#define SECS_IN_NANOSECS 1000000000

void sleep_until(uint64_t time_ns, uint64_t now) {
while (time_ns > now) {
uint64_t duration = time_ns - now;
timespec req{.tv_sec = static_cast<time_t>(duration / SECS_IN_NANOSECS),
.tv_nsec = static_cast<long>(duration % SECS_IN_NANOSECS)};
timespec rem;
nanosleep(&req, &rem);
now = MonotonicClock::now();
}
}

size_t api::AsyncTask::select(std::vector<api::AsyncTask *> *tasks) {
size_t tasks_len = tasks->size();
fastly_compute_at_edge_async_io_handle_t *handles =
new fastly_compute_at_edge_async_io_handle_t[tasks_len];
for (int i = 0; i < tasks_len; i++) {
handles[i] = tasks->at(i)->id();
std::vector<fastly_compute_at_edge_async_io_handle_t> handles;
handles.reserve(tasks_len);
uint64_t now = 0;
uint64_t soonest_deadline = 0;
size_t soonest_deadline_idx = -1;
for (size_t idx = 0; idx < tasks_len; ++idx) {
auto *task = tasks->at(idx);
uint64_t deadline = task->deadline();
// Select for completed task deadlines before performing the task select host call.
if (deadline > 0) {
MOZ_ASSERT(task->id() == NEVER_HANDLE);
if (now == 0) {
now = MonotonicClock::now();
MOZ_ASSERT(now > 0);
}
if (deadline <= now) {
return idx;
}
if (soonest_deadline == 0 || deadline < soonest_deadline) {
soonest_deadline = deadline;
soonest_deadline_idx = idx;
}
} else {
uint32_t handle = task->id();
// Timer task handles are skipped and never passed to the host.
MOZ_ASSERT(handle != NEVER_HANDLE);
handles.push_back(handle);
}
}
fastly_world_list_handle_t hs{.ptr = handles, .len = tasks_len};

// When there are no async tasks, sleep until the deadline
if (handles.size() == 0) {
MOZ_ASSERT(soonest_deadline > 0);
sleep_until(soonest_deadline, now);
return soonest_deadline_idx;
}

fastly_world_list_handle_t hs{.ptr = handles.data(), .len = handles.size()};
fastly_world_option_u32_t ret;
fastly_compute_at_edge_types_error_t err = 0;
if (!fastly_compute_at_edge_async_io_select(&hs, 0, &ret, &err)) {
abort();
} else if (ret.is_some) {
return ret.val;
} else {
abort();

while (true) {
if (!fastly_compute_at_edge_async_io_select(
&hs, (soonest_deadline - now) / MILLISECS_IN_NANOSECS, &ret, &err)) {
abort();
} else if (ret.is_some) {
// The host index will be the index in the list of tasks with the timer tasks filtered out.
// We thus need to offset the host index by any timer tasks appearing before the nth
// non-timer task.
size_t task_idx = 0;
for (size_t idx = 0; idx < tasks_len; ++idx) {
if (tasks->at(idx)->id() != NEVER_HANDLE) {
if (ret.val == task_idx) {
return idx;
}
task_idx++;
}
}
abort();
} else {
// No value case means a timeout, which means soonest_deadline_idx is set.
MOZ_ASSERT(soonest_deadline > 0);
MOZ_ASSERT(soonest_deadline_idx != -1);
// Verify that the task definitely is ready from a time perspective, and if not loop the host
// call again.
now = MonotonicClock::now();
if (soonest_deadline > now) {
continue;
}
return soonest_deadline_idx;
}
}
}

@@ -96,11 +96,11 @@ Result<uint32_t> Random::get_u32() {
return res;
}

uint64_t MonotonicClock::now() { return 0; }
uint64_t MonotonicClock::now() { return JS_Now() * 1000; }

uint64_t MonotonicClock::resolution() { return 1000000; }
uint64_t MonotonicClock::resolution() { return 1000; }

int32_t MonotonicClock::subscribe(const uint64_t when, const bool absolute) { return 0; }
int32_t MonotonicClock::subscribe(const uint64_t when, const bool absolute) { return NEVER_HANDLE; }

void MonotonicClock::unsubscribe(const int32_t handle_id) {}

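
As an aside (not part of the diff), the soonest-deadline bookkeeping in select() above is what lets several guest timers coexist with each other and with host I/O handles. A hypothetical fixture route, in the style of timers.js, that would exercise overlapping deadlines:

// Hypothetical route (not in this PR): two overlapping timers force select()
// to wake for the 50 ms deadline first, then keep waiting for the 200 ms one.
routes.set("/setTimeout/two-timers", async () => {
  const order = [];
  await Promise.all([
    new Promise((resolve) => setTimeout(() => { order.push("slow"); resolve(); }, 200)),
    new Promise((resolve) => setTimeout(() => { order.push("fast"); resolve(); }, 50)),
  ]);
  // Expect "fast" before "slow" if deadlines are honored in order.
  return new Response(order.join("\n") + "\n");
});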