Skip to content

Commit 3bf5ed9

Browse files
authored
feat: Allow passing state from tracked threads (#7)
1 parent 31c6f80 commit 3bf5ed9

File tree

8 files changed

+149
-47
lines changed

8 files changed

+149
-47
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@ build/
33
lib/
44
/*.tgz
55
test/yarn.lock
6+
test/package.json

README.md

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ registerThread();
1616
Watchdog thread:
1717

1818
```ts
19-
const { captureStackTrace } = require("@sentry-internal/node-native-stacktrace");
19+
const {
20+
captureStackTrace,
21+
} = require("@sentry-internal/node-native-stacktrace");
2022

2123
const stacks = captureStackTrace();
2224
console.log(stacks);
@@ -87,15 +89,20 @@ In the main or worker threads if you call `registerThread()` regularly, times
8789
are recorded.
8890

8991
```ts
90-
const { registerThread } = require("@sentry-internal/node-native-stacktrace");
92+
const {
93+
registerThread,
94+
threadPoll,
95+
} = require("@sentry-internal/node-native-stacktrace");
96+
97+
registerThread();
9198

9299
setInterval(() => {
93-
registerThread();
100+
threadPoll({ optional_state: "some_value" });
94101
}, 200);
95102
```
96103

97104
In the watchdog thread you can call `getThreadsLastSeen()` to get how long it's
98-
been in milliseconds since each thread registered.
105+
been in milliseconds since each thread polled.
99106

100107
If any thread has exceeded a threshold, you can call `captureStackTrace()` to
101108
get the stack traces for all threads.
@@ -111,11 +118,13 @@ const THRESHOLD = 1000; // 1 second
111118
setInterval(() => {
112119
for (const [thread, time] in Object.entries(getThreadsLastSeen())) {
113120
if (time > THRESHOLD) {
114-
const stacks = captureStackTrace();
115-
const blockedThread = stacks[thread];
121+
const threads = captureStackTrace();
122+
const blockedThread = threads[thread];
123+
const { frames, state } = blockedThread;
116124
console.log(
117125
`Thread '${thread}' blocked more than ${THRESHOLD}ms`,
118-
blockedThread,
126+
frames,
127+
state,
119128
);
120129
}
121130
}

module.cc

Lines changed: 96 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ struct ThreadInfo {
1515
std::string thread_name;
1616
// Last time this thread was seen in milliseconds since epoch
1717
milliseconds last_seen;
18+
// Some JSON serialized state for the thread
19+
std::string state;
1820
};
1921

2022
static std::mutex threads_mutex;
@@ -32,6 +34,12 @@ struct JsStackFrame {
3234
// Type alias for a vector of JsStackFrame
3335
using JsStackTrace = std::vector<JsStackFrame>;
3436

37+
struct ThreadResult {
38+
std::string thread_name;
39+
std::string state;
40+
JsStackTrace stack_frames;
41+
};
42+
3543
// Function to be called when an isolate's execution is interrupted
3644
static void ExecutionInterrupted(Isolate *isolate, void *data) {
3745
auto promise = static_cast<std::promise<JsStackTrace> *>(data);
@@ -91,7 +99,6 @@ void CaptureStackTraces(const FunctionCallbackInfo<Value> &args) {
9199
auto capture_from_isolate = args.GetIsolate();
92100
auto current_context = capture_from_isolate->GetCurrentContext();
93101

94-
using ThreadResult = std::tuple<std::string, JsStackTrace>;
95102
std::vector<std::future<ThreadResult>> futures;
96103

97104
{
@@ -100,35 +107,38 @@ void CaptureStackTraces(const FunctionCallbackInfo<Value> &args) {
100107
if (thread_isolate == capture_from_isolate)
101108
continue;
102109
auto thread_name = thread_info.thread_name;
110+
auto state = thread_info.state;
103111

104112
futures.emplace_back(std::async(
105113
std::launch::async,
106-
[thread_name](Isolate *isolate) -> ThreadResult {
107-
return std::make_tuple(thread_name, CaptureStackTrace(isolate));
114+
[thread_name, state](Isolate *isolate) -> ThreadResult {
115+
return ThreadResult{thread_name, state, CaptureStackTrace(isolate)};
108116
},
109117
thread_isolate));
110118
}
111119
}
112120

113-
Local<Object> result = Object::New(capture_from_isolate);
121+
Local<Object> output = Object::New(capture_from_isolate);
114122

115123
for (auto &future : futures) {
116-
auto [thread_name, frames] = future.get();
117-
auto key = String::NewFromUtf8(capture_from_isolate, thread_name.c_str(),
118-
NewStringType::kNormal)
119-
.ToLocalChecked();
120-
121-
Local<Array> jsFrames = Array::New(capture_from_isolate, frames.size());
122-
for (size_t i = 0; i < frames.size(); ++i) {
123-
const auto &f = frames[i];
124+
auto result = future.get();
125+
auto key =
126+
String::NewFromUtf8(capture_from_isolate, result.thread_name.c_str(),
127+
NewStringType::kNormal)
128+
.ToLocalChecked();
129+
130+
Local<Array> jsFrames =
131+
Array::New(capture_from_isolate, result.stack_frames.size());
132+
for (size_t i = 0; i < result.stack_frames.size(); ++i) {
133+
const auto &frame = result.stack_frames[i];
124134
Local<Object> frameObj = Object::New(capture_from_isolate);
125135
frameObj
126136
->Set(current_context,
127137
String::NewFromUtf8(capture_from_isolate, "function",
128138
NewStringType::kInternalized)
129139
.ToLocalChecked(),
130140
String::NewFromUtf8(capture_from_isolate,
131-
f.function_name.c_str(),
141+
frame.function_name.c_str(),
132142
NewStringType::kNormal)
133143
.ToLocalChecked())
134144
.Check();
@@ -137,7 +147,8 @@ void CaptureStackTraces(const FunctionCallbackInfo<Value> &args) {
137147
String::NewFromUtf8(capture_from_isolate, "filename",
138148
NewStringType::kInternalized)
139149
.ToLocalChecked(),
140-
String::NewFromUtf8(capture_from_isolate, f.filename.c_str(),
150+
String::NewFromUtf8(capture_from_isolate,
151+
frame.filename.c_str(),
141152
NewStringType::kNormal)
142153
.ToLocalChecked())
143154
.Check();
@@ -146,23 +157,52 @@ void CaptureStackTraces(const FunctionCallbackInfo<Value> &args) {
146157
String::NewFromUtf8(capture_from_isolate, "lineno",
147158
NewStringType::kInternalized)
148159
.ToLocalChecked(),
149-
Integer::New(capture_from_isolate, f.lineno))
160+
Integer::New(capture_from_isolate, frame.lineno))
150161
.Check();
151162
frameObj
152163
->Set(current_context,
153164
String::NewFromUtf8(capture_from_isolate, "colno",
154165
NewStringType::kInternalized)
155166
.ToLocalChecked(),
156-
Integer::New(capture_from_isolate, f.colno))
167+
Integer::New(capture_from_isolate, frame.colno))
157168
.Check();
158169
jsFrames->Set(current_context, static_cast<uint32_t>(i), frameObj)
159170
.Check();
160171
}
161172

162-
result->Set(current_context, key, jsFrames).Check();
173+
// Create a thread object with a 'frames' property and optional 'state'
174+
Local<Object> threadObj = Object::New(capture_from_isolate);
175+
threadObj
176+
->Set(current_context,
177+
String::NewFromUtf8(capture_from_isolate, "frames",
178+
NewStringType::kInternalized)
179+
.ToLocalChecked(),
180+
jsFrames)
181+
.Check();
182+
183+
if (!result.state.empty()) {
184+
v8::MaybeLocal<v8::String> stateStr = v8::String::NewFromUtf8(
185+
capture_from_isolate, result.state.c_str(), NewStringType::kNormal);
186+
if (!stateStr.IsEmpty()) {
187+
v8::MaybeLocal<v8::Value> maybeStateVal =
188+
v8::JSON::Parse(current_context, stateStr.ToLocalChecked());
189+
v8::Local<v8::Value> stateVal;
190+
if (maybeStateVal.ToLocal(&stateVal)) {
191+
threadObj
192+
->Set(current_context,
193+
String::NewFromUtf8(capture_from_isolate, "state",
194+
NewStringType::kInternalized)
195+
.ToLocalChecked(),
196+
stateVal)
197+
.Check();
198+
}
199+
}
200+
}
201+
202+
output->Set(current_context, key, threadObj).Check();
163203
}
164204

165-
args.GetReturnValue().Set(result);
205+
args.GetReturnValue().Set(output);
166206
}
167207

168208
// Cleanup function to remove the thread from the map when the isolate is
@@ -194,13 +234,39 @@ void RegisterThread(const FunctionCallbackInfo<Value> &args) {
194234
std::lock_guard<std::mutex> lock(threads_mutex);
195235
auto found = threads.find(isolate);
196236
if (found == threads.end()) {
197-
threads.emplace(isolate, ThreadInfo{thread_name, milliseconds::zero()});
237+
threads.emplace(isolate,
238+
ThreadInfo{thread_name, milliseconds::zero(), ""});
198239
// Register a cleanup hook to remove this thread when the isolate is
199240
// destroyed
200241
node::AddEnvironmentCleanupHook(isolate, Cleanup, isolate);
242+
}
243+
}
244+
}
245+
246+
// Function to track a thread and set its state
247+
void ThreadPoll(const FunctionCallbackInfo<Value> &args) {
248+
auto isolate = args.GetIsolate();
249+
auto context = isolate->GetCurrentContext();
250+
251+
std::string state_str;
252+
if (args.Length() == 1 && args[0]->IsValue()) {
253+
MaybeLocal<String> maybe_json = v8::JSON::Stringify(context, args[0]);
254+
if (!maybe_json.IsEmpty()) {
255+
v8::String::Utf8Value utf8_state(isolate, maybe_json.ToLocalChecked());
256+
state_str = *utf8_state ? *utf8_state : "";
201257
} else {
258+
state_str = "";
259+
}
260+
} else {
261+
state_str = "";
262+
}
263+
264+
{
265+
std::lock_guard<std::mutex> lock(threads_mutex);
266+
auto found = threads.find(isolate);
267+
if (found != threads.end()) {
202268
auto &thread_info = found->second;
203-
thread_info.thread_name = thread_name;
269+
thread_info.state = state_str;
204270
thread_info.last_seen =
205271
duration_cast<milliseconds>(system_clock::now().time_since_epoch());
206272
}
@@ -257,6 +323,16 @@ NODE_MODULE_INITIALIZER(Local<Object> exports, Local<Value> module,
257323
.ToLocalChecked())
258324
.Check();
259325

326+
exports
327+
->Set(context,
328+
String::NewFromUtf8(isolate, "threadPoll",
329+
NewStringType::kInternalized)
330+
.ToLocalChecked(),
331+
FunctionTemplate::New(isolate, ThreadPoll)
332+
->GetFunction(context)
333+
.ToLocalChecked())
334+
.Check();
335+
260336
exports
261337
->Set(context,
262338
String::NewFromUtf8(isolate, "getThreadsLastSeen",

src/index.ts

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@ const arch = process.env['BUILD_ARCH'] || _arch();
1111
const abi = getAbi(versions.node, 'node');
1212
const identifier = [platform, arch, stdlib, abi].filter(c => c !== undefined && c !== null).join('-');
1313

14+
type Thread<S = unknown> = {
15+
frames: StackFrame[];
16+
state?: S
17+
}
18+
1419
type StackFrame = {
1520
function: string;
1621
filename: string;
@@ -20,7 +25,8 @@ type StackFrame = {
2025

2126
interface Native {
2227
registerThread(threadName: string): void;
23-
captureStackTrace(): Record<string, StackFrame[]>;
28+
threadPoll(state?: object): void;
29+
captureStackTrace<S = unknown>(): Record<string, Thread<S>>;
2430
getThreadsLastSeen(): Record<string, number>;
2531
}
2632

@@ -177,11 +183,24 @@ export function registerThread(threadName: string = String(threadId)): void {
177183
native.registerThread(threadName);
178184
}
179185

186+
/**
187+
* Tells the native module that the thread is still running and updates the state.
188+
*
189+
* @param state Optional state to pass to the native module.
190+
*/
191+
export function threadPoll(state?: object): void {
192+
if (typeof state === 'object') {
193+
native.threadPoll(state);
194+
} else {
195+
native.threadPoll();
196+
}
197+
}
198+
180199
/**
181200
* Captures stack traces for all registered threads.
182201
*/
183-
export function captureStackTrace(): Record<string, StackFrame[]> {
184-
return native.captureStackTrace();
202+
export function captureStackTrace<S = unknown>(): Record<string, Thread<S>> {
203+
return native.captureStackTrace<S>();
185204
}
186205

187206
/**

test/e2e.test.mjs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ describe('e2e Tests', { timeout: 20000 }, () => {
1313

1414
const stacks = JSON.parse(result.stdout.toString());
1515

16-
expect(stacks['0']).toEqual(expect.arrayContaining([
16+
expect(stacks['0'].frames).toEqual(expect.arrayContaining([
1717
{
1818
function: 'pbkdf2Sync',
1919
filename: expect.any(String),
@@ -34,7 +34,7 @@ describe('e2e Tests', { timeout: 20000 }, () => {
3434
},
3535
]));
3636

37-
expect(stacks['2']).toEqual(expect.arrayContaining([
37+
expect(stacks['2'].frames).toEqual(expect.arrayContaining([
3838
{
3939
function: 'pbkdf2Sync',
4040
filename: expect.any(String),
@@ -64,7 +64,7 @@ describe('e2e Tests', { timeout: 20000 }, () => {
6464

6565
const stacks = JSON.parse(result.stdout.toString());
6666

67-
expect(stacks['0']).toEqual(expect.arrayContaining([
67+
expect(stacks['0'].frames).toEqual(expect.arrayContaining([
6868
{
6969
function: 'pbkdf2Sync',
7070
filename: expect.any(String),
@@ -85,6 +85,8 @@ describe('e2e Tests', { timeout: 20000 }, () => {
8585
},
8686
]));
8787

88-
expect(stacks['2'].length).toEqual(1);
88+
expect(stacks['0'].state).toEqual({ some_property: 'some_value' });
89+
90+
expect(stacks['2'].frames.length).toEqual(1);
8991
});
9092
});

test/package.json

Lines changed: 0 additions & 7 deletions
This file was deleted.

test/stalled.js

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
const { Worker } = require('node:worker_threads');
22
const { longWork } = require('./long-work.js');
3-
const { registerThread } = require('@sentry-internal/node-native-stacktrace');
3+
const { registerThread, threadPoll } = require('@sentry-internal/node-native-stacktrace');
4+
5+
registerThread();
46

57
setInterval(() => {
6-
registerThread();
8+
threadPoll({ some_property: 'some_value' });
79
}, 200).unref();
810

911
const watchdog = new Worker('./test/stalled-watchdog.js');

test/worker-do-nothing.js

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
const { longWork } = require('./long-work');
2-
const { registerThread } = require('@sentry-internal/node-native-stacktrace');
1+
const { registerThread, threadPoll } = require('@sentry-internal/node-native-stacktrace');
2+
3+
registerThread();
34

45
setInterval(() => {
5-
registerThread();
6+
threadPoll();
67
}, 200);
7-

0 commit comments

Comments
 (0)