Skip to content

#96 Improve takeSnapshot #97

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,17 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [2.5.0] - 2024-09-17
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can remove the changelog, we're using github releases for release notes ;)


### Added

- `restartWorker` option to `takeSnapshot`
- a promise return value to `takeSnapshot` settled to a created file path

### Changed

- `takeSnapshot` will no longer create a heap snapshot when there is not enough memory

## [2.4.0] - 2021-12-03

### Added
Expand Down
19 changes: 17 additions & 2 deletions Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,27 @@ Run CPU Profiler and save result on main process directory

<a name="WorkerNodes+takeSnapshot"></a>

### workerNodes.takeSnapshot() ⇒ <code>void</code>
### workerNodes.takeSnapshot() ⇒ <code>Promise</code>
Take Heap Snapshot and save result on main process directory

The operation will fail when there is not enough memory to create a snapshot.

**Kind**: instance method of [<code>WorkerNodes</code>](#WorkerNodes)
<a name="WorkerNodes+getUsedWorkers"></a>

| Param | Type |
|---------| --- |
| options | <code>TakeSnapshotOptions</code> |

### options.restartWorker : <code>Boolean</code>

Orders a worker that was used to create a snapshot,
to be immediately restarted.
It's recommended to use this option,
because V8 might persist the snapshot in memory until exit.

**Default**: <code>false</code>

### workerNodes.getUsedWorkers() ⇒ <code>Array.&lt;Worker&gt;</code>
Return list with used workers in pool

Expand Down Expand Up @@ -296,4 +311,4 @@ Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
limitations under the License.
8 changes: 8 additions & 0 deletions e2e/fixtures/mock-heap-statistics.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
const v8 = require("v8");

module.exports = function mockHeapStatistics () {
v8.getHeapStatistics = () => ({
heap_size_limit: 100,
used_heap_size: 55,
})
};
49 changes: 37 additions & 12 deletions e2e/v8-profilers.base.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,41 @@ module.exports = function describe(workerType) {
await workerNodes.call('hello!');

// when
workerNodes.takeSnapshot();
const getHeapSnapshotFilename = workerType === "thread" ?
() => fs.readdirSync(process.cwd()).find(name => name.includes('.heapsnapshot') && name.includes(`-${process.pid}-`)) :
() => fs.readdirSync(process.cwd()).find(name => name.includes('.heapsnapshot') && !name.includes(`-${process.pid}-`));
await eventually(() => getHeapSnapshotFilename() !== undefined);
const filePath = await workerNodes.takeSnapshot();

const result = getHeapSnapshotFilename();
t.truthy(result);
t.true(result.length > 0)
fs.unlinkSync(result);
t.regex(filePath, /^HeapSnapshot-\d+-\d+\.heapsnapshot$/);
t.true(fs.existsSync(filePath))
fs.unlinkSync(filePath);
});

test.serial(`should restart worker after taking heap snapshot when restartWorker option was set`, async (t) => {
// given
const workerNodes = new WorkerNodes(fixture('echo-function-async'), { lazyStart: true, workerType });
await workerNodes.ready();
await workerNodes.call('hello!');

const workersBefore = workerNodes.getUsedWorkers();

// when
const filePath = await workerNodes.takeSnapshot({ restartWorker: true });
fs.unlinkSync(filePath);

// Waiting for a worker to restart
await new Promise((resolve) => setTimeout(resolve, 500));

const workersAfter = workerNodes.getUsedWorkers();
t.true(workersBefore.length === workersAfter.length);
t.true(workersBefore[0] !== workersAfter[0]);
});

test.serial(`should let takeSnapshot throw an error when there is not enough heap`, async (t) => {
const workerNodes = new WorkerNodes(fixture('mock-heap-statistics'), { lazyStart: true, workerType });
await workerNodes.ready();
await workerNodes.call();

await t.throwsAsync(workerNodes.takeSnapshot(), {
message: 'Not enough memory to perform heap snapshot'
})
});

test(`should generate heap profiler result file`, async (t) => {
Expand All @@ -34,8 +59,8 @@ module.exports = function describe(workerType) {

await workerNodes.call('hello!');

const getCpuProfileFilename = workerType === "thread" ?
() => fs.readdirSync(process.cwd()).find(name => name.includes('.cpuprofile') && name.includes(`-${process.pid}-`)) :
const getCpuProfileFilename = workerType === "thread" ?
() => fs.readdirSync(process.cwd()).find(name => name.includes('.cpuprofile') && name.includes(`-${process.pid}-`)) :
() => fs.readdirSync(process.cwd()).find(name => name.includes('.cpuprofile') && !name.includes(`-${process.pid}-`));

await eventually(() => getCpuProfileFilename() !== undefined);
Expand All @@ -46,4 +71,4 @@ module.exports = function describe(workerType) {
t.true(result.length > 0)
fs.unlinkSync(result);
});
}
}
6 changes: 5 additions & 1 deletion index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@ interface Options {
workerType?: "thread" | "process";
}

interface TakeSnapshotOptions {
restartWorker?: boolean;
}

interface WorkerNodesInstance {
call: CallProperty;
ready: () => Promise<WorkerNodesInstance>;
terminate: () => Promise<WorkerNodesInstance>;
profiler: (duration?: number) => void;
takeSnapshot: () => void;
takeSnapshot: (options?: TakeSnapshotOptions) => void;
getUsedWorkers: () => Array<Worker>;
}

Expand Down
14 changes: 8 additions & 6 deletions lib/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -355,17 +355,19 @@ class WorkerNodes extends EventEmitter {

/**
* Take Heap Snapshot and save result on main process directory
*
* @returns {void}
* @param {TakeSnapshotOptions} options
* @returns {Promise<string>}
*/
takeSnapshot() {
takeSnapshot(options = {}) {
const worker = this.pickWorker();

if (worker) {
worker.takeSnapshot();
return worker.takeSnapshot(options);
} else {
// There might not be availble worker, let it start.
setTimeout(() => this.takeSnapshot(), 500);
return new Promise((resolve) => {
// There might not be available worker, let it start.
setTimeout(() => resolve(this.takeSnapshot()), 500);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't we pass the same options to the second call?

Suggested change
setTimeout(() => resolve(this.takeSnapshot()), 500);
setTimeout(() => resolve(this.takeSnapshot(options)), 500);

})
}
}

Expand Down
25 changes: 15 additions & 10 deletions lib/util/get-heap-snapshot.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
const v8 = require('v8');
const fs = require('fs');

const getHeapSnapshot = (callback) => {
const stream = v8.getHeapSnapshot();
const file = fs.createWriteStream(`HeapSnapshot-${process.pid}-${Date.now()}.heapsnapshot`);

stream.on('data', (chunk) => file.write(chunk));
const hasEnoughMemory = () => {
const heapStats = v8.getHeapStatistics();
return heapStats.heap_size_limit >= heapStats.used_heap_size * 2;
}

stream.on('end', () => {
if (callback) { callback('heap snapshot done'); }
});
const getHeapSnapshot = (callback) => {
if (hasEnoughMemory()) {
Copy link

@jedlikowski jedlikowski Sep 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about an early return if not enough memory to avoid having the main function body enclosed in an if statement?

const filePath = v8.writeHeapSnapshot(`HeapSnapshot-${process.pid}-${Date.now()}.heapsnapshot`);
callback(filePath);
} else {
callback(undefined, {
type: 'Error',
message: 'Not enough memory to perform heap snapshot'
});
}
}

module.exports = getHeapSnapshot;
module.exports = getHeapSnapshot;
10 changes: 10 additions & 0 deletions lib/util/promise-with-resolvers.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
const promiseWithResolvers = () => {
let resolve, reject
const promise = new Promise((res, rej) => {
resolve = res
reject = rej
})
return { promise, resolve, reject }
}

module.exports = promiseWithResolvers;
18 changes: 14 additions & 4 deletions lib/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ const EventEmitter = require('events');
const WorkerProcess = require('./worker/process');
const Sequence = require('./util/sequence');
const messages = require('./worker/message');
const promiseWithResolvers = require("./util/promise-with-resolvers");

const ProcessRequest = messages.Request;
const ProcessResponse = messages.Response;
Expand Down Expand Up @@ -133,17 +134,26 @@ class Worker extends EventEmitter {
}

/**
*
* @param {TakeSnapshotOptions} options
* @returns {Promise<string>}
*/
takeSnapshot() {
takeSnapshot(options) {
const { promise, resolve, reject } = promiseWithResolvers();

const cmd = 'takeSnapshot';
this.calls.set(cmd, {
timer: null,
reject: () => {},
resolve: () => {},
reject,
resolve
});

this.process.handle({ cmd });

if (options.restartWorker) {
promise.finally(() => this.stop());
}

return promise;
}
}

Expand Down
3 changes: 2 additions & 1 deletion lib/worker/child-loader.js
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,10 @@ function handleHeapSnapshot(requestData) {
const request = new Request(requestData);
const response = Response.from(request);

getHeapSnapshot((result) => {
getHeapSnapshot((result, error) => {
response.callId = 'takeSnapshot';
response.setResult(result);
response.error = error;
sendMessageToParent(response);
});
}
Expand Down
Loading