Replies: 2 comments
-
|
That's amazing! It's an interesting idea. |
Beta Was this translation helpful? Give feedback.
0 replies
-
|
Oof
import type { EventEmitter } from 'node:stream'
import { MessageChannel } from 'node:worker_threads'
import type { MessagePort as _MessagePort } from 'node:worker_threads'
import Piscina from 'piscina'
import type TypedEmitter from 'typed-emitter'
/**
 * The worker-thread `MessagePort` shape with every member that also exists on
 * `EventEmitter` removed and `postMessage` narrowed so that it only accepts
 * `OutgoingMessage` values. All remaining members keep their original type.
 *
 * @typeParam OutgoingMessage - value type accepted by `postMessage`
 */
type MessagePortBase<OutgoingMessage> = {
// Key remapping: keys shared with EventEmitter map to `never` and are dropped.
[K in keyof _MessagePort as K extends keyof EventEmitter ? never : K]:
K extends 'postMessage' ? (
// The transfer list reuses the (non-nullable) type of the original second parameter.
(value: OutgoingMessage, transferList?: NonNullable<Parameters<_MessagePort['postMessage']>[1]>) => void
) : _MessagePort[K]
}
/**
 * A message port with typed traffic in both directions: the event-emitter
 * side is typed through `TypedEmitter`, the remaining members come from
 * `MessagePortBase`.
 *
 * @typeParam OutgoingMessage - value type accepted by `postMessage`
 * @typeParam IncomingMessage - value type delivered to `message` listeners
 */
export interface MessagePort<OutgoingMessage = never, IncomingMessage = never> extends TypedEmitter<{
close: () => void
message: (value: IncomingMessage) => void
messageerror: (error: Error) => void
}>, MessagePortBase<OutgoingMessage> {}
// Identity mapped type: re-lists every property of T as-is.
// NOTE(review): the `& unknown` is presumably there to make the compiler show the expanded shape — confirm.
type Pure<T> = { [K in keyof T]: T[K] } & unknown
// Keeps only the properties of T whose value type extends V.
type PickValues<T, V> = { [K in keyof T as T[K] extends V ? K : never]: T[K] } & unknown
// Non-nullable type of the first argument of the Piscina constructor.
type PiscinaOptions = NonNullable<ConstructorParameters<typeof Piscina>[0]>
/**
 * Options accepted by the `SimPool` constructor: every Piscina constructor
 * option plus the required `url` of the worker module.
 */
export type SimOptions = Pure<PiscinaOptions & { url: URL }>
/**
 * Caller-side end of a call's message channel: posts `ClientMessage` values
 * and receives `WorkerMessage` values (note the swapped argument order when
 * extending `MessagePort`).
 */
export interface ClientPort<WorkerMessage = never, ClientMessage = never>
extends MessagePort<ClientMessage, WorkerMessage> {}
/**
 * Worker-side end of a call's message channel: posts `WorkerMessage` values
 * and receives `ClientMessage` values.
 */
export interface WorkerPort<WorkerMessage = never, ClientMessage = never>
extends MessagePort<WorkerMessage, ClientMessage> {}
/**
 * `this` context a worker function may declare (`function f(this: WithPort<W, C>, ...)`)
 * to talk to its caller while it runs.
 *
 * - `messagePort`: worker-side port; posts `WorkerMessage`, receives `ClientMessage`
 * - `signal`: abort signal for the call
 *
 * NOTE(review): `SimPool.runExt` builds `thisArg` with `messagePort` only and puts
 * `signal` on the task object — confirm that `this.signal` is actually populated.
 */
// eslint-disable-next-line @typescript-eslint/consistent-type-definitions
export type WithPort<WorkerMessage = never, ClientMessage = never> = {
messagePort: WorkerPort<WorkerMessage, ClientMessage>
signal: AbortSignal
}
/**
 * A promise of the awaited `T` that additionally carries the
 * `AbortController` associated with the call that produced it.
 */
interface AbortablePromise<T>
extends Promise<Awaited<T>> {
abortController: AbortController
}
/**
 * An `AbortablePromise` that also exposes the caller-side message port of the
 * call, so the caller can exchange messages with the running worker function.
 * `abortController` is inherited from `AbortablePromise`.
 */
interface AbortablePromiseWithPort<T, WorkerMessage = never, ClientMessage = never>
  extends AbortablePromise<T> {
  messagePort: ClientPort<WorkerMessage, ClientMessage>
}
/**
 * Maps a worker function type to the signature used to call it through the pool.
 *
 * - A function whose `this` parameter is a `WithPort<W, C>` becomes
 *   `(...args) => AbortablePromiseWithPort<Awaited<R>, W, C>` (the `this`
 *   parameter is dropped from the call signature).
 * - Any other function becomes `(...args) => AbortablePromise<Awaited<R>>`.
 * - Anything that is not a function maps to `never`.
 */
type ToCall<T> =
| T extends (this: infer F extends WithPort<any, any>, ...a: infer A) => infer R ? (
F extends WithPort<infer WorkerMessage, infer ClientMessage> ? (
(...a: A) => AbortablePromiseWithPort<Awaited<R>, WorkerMessage, ClientMessage>
) : never
) : T extends (...a: infer A) => infer R ? (
(...a: A) => AbortablePromise<Awaited<R>>
) : never
/**
 * View of a module type `T` that keeps only its function-valued members and
 * rewrites each of them with `ToCall`. Non-function members are dropped by
 * the key remapping.
 */
type SimPoolModule<T> = {
[K in keyof T as T[K] extends (...a: any) => any ? K : never]: ToCall<T[K]>
} & unknown
/**
 * View of a module type `T` in which every function-valued member is called
 * indirectly: the member takes a `callback` that receives a value typed as the
 * original function `T[K]`, and returns the abortable promise for the call.
 *
 * - Functions whose `this` is a `WithPort<W, C>` yield `AbortablePromiseWithPort`.
 * - Other functions yield `AbortablePromise`.
 *
 * NOTE(review): handing the untouched `T[K]` to the callback is presumably what
 * lets overloads/generics resolve at the call site — confirm. `infer A` is
 * inferred in both branches but never used.
 */
type KeepGenericModule<T> = {
[K in keyof T as T[K] extends (...a: any) => any ? K : never]:
T[K] extends (this: WithPort<infer WorkerMessage, infer ClientMessage>, ...a: infer A) => infer R ? (
(callback: (fn: T[K]) => any) => AbortablePromiseWithPort<Awaited<R>, WorkerMessage, ClientMessage>
) : T[K] extends (...a: infer A) => infer R ? (
(callback: (fn: T[K]) => any) => AbortablePromise<Awaited<R>>
) : never
}
// Flattened, non-nullable type of the second argument of `Piscina#run`.
type RunOptions = Pure<NonNullable<Parameters<Piscina['run']>[1]>>
/**
 * A Piscina worker pool bound to one worker module, with typed accessors for
 * calling that module's exports either in the pool or in the current thread.
 *
 * Every task submitted through `runExt` is dispatched to the worker export
 * named `call`, so the worker module must provide
 * `export const call = callWrap(import('./<that module>'))`.
 *
 * @typeParam Module - type of the worker module, e.g. `typeof import('./worker')`
 */
export class SimPool<Module> extends Piscina {
  /** Settles once the worker module has been imported into the current thread. */
  #modulePromise: Promise<Module>
  /** The imported module; stays `undefined` until `#modulePromise` has resolved. */
  #awaitedModule?: Module = undefined

  /**
   * @param options - Piscina options plus `url`, the location of the worker
   *   module. `url.href` is always used as Piscina's `filename`, overriding a
   *   `filename` passed in `options`.
   */
  constructor(options: SimOptions) {
    // `url` is SimPool-specific; everything else is a regular Piscina option
    // and must reach the base class (it used to be silently dropped).
    const { url, ...piscinaOptions } = options
    super({ ...piscinaOptions, filename: url.href })
    this.#modulePromise = import(url.href)
      .then((module: Module) => {
        this.#awaitedModule = module
        return module
      })
  }

  /**
   * Waits until the worker module is imported in the current thread.
   * Required before using `module` or `sync`.
   *
   * @returns this pool, for chaining
   */
  async initSync() {
    await this.#modulePromise
    return this
  }

  /**
   * @deprecated use `pool.async.<function>()` or `pool.runExt('<function>', [...args], { transferList })`
   */
  run(task: any, options?: RunOptions | undefined): Promise<any> {
    return super.run(task, options)
  }

  /**
   * Runs the module export `name` with `args`, by default in a pool thread.
   *
   * A fresh `MessageChannel` is created for every call: `port1` is passed to
   * the worker function as `this.messagePort` (and transferred when running in
   * the pool), `port2` is exposed on the returned promise as `messagePort`.
   *
   * @param name - name of a function export of the worker module
   * @param args - arguments for that function
   * @param options - optional settings:
   *   - `transferList`: extra transferables, appended after the call's own port
   *   - `abortController`: controller to expose on the result (a new one is created otherwise)
   *   - `signal`: abort signal for the task; when given it is used instead of
   *     `abortController.signal`
   *   - `thread`: `'current'` calls the module's `call` export in this thread
   *     (needs `await pool.initSync()` first); anything else runs in the pool
   *   - `thisArg`: extra properties merged into the function's `this`
   * @returns the task promise, augmented with `abortController`, `channel`
   *   and `messagePort`
   * @throws Error when `thread` is `'current'` and the module is not imported yet
   */
  runExt<K extends keyof SimPoolModule<Module>>(
    name: K,
    args: Parameters<SimPoolModule<Module>[K]>,
    options?: {
      transferList?: RunOptions['transferList']
      abortController?: AbortController
      signal?: AbortSignal
      thread?: 'pool' | 'current'
      thisArg?: Pure<Partial<ThisParameterType<Module[K & keyof Module]>>>
    },
  ): ReturnType<SimPoolModule<Module>[K]> {
    const abortController = options?.abortController ?? new AbortController()
    const channel = new MessageChannel()
    // Caller-supplied `thisArg` properties are spread last and therefore win.
    const thisArg = { messagePort: channel.port1, ...options?.thisArg ?? {} }
    const signal = options?.signal ?? abortController.signal
    const task = { name, args, thisArg, signal }
    const runOptions = {
      // Every task goes through the worker's `call` export, which looks up `task.name`.
      name: 'call',
      signal,
      transferList: [channel.port1, ...options?.transferList ?? []],
    }
    const result = options?.thread === 'current'
      ? (this.module as any as { call(task: object): Promise<any> }).call(task)
      : super.run(task, runOptions)
    return Object.assign(result, {
      abortController,
      channel,
      messagePort: channel.port2,
    }) as never
  }

  /**
   * Creates an object whose every property read is answered by `get`.
   * The proxy target is an empty object that is never consulted.
   */
  #proxyWithGetter<T extends object>(get: (property: keyof T) => unknown) {
    return new Proxy({} as any as T, {
      get: (_target, property) => get(property as keyof T),
    })
  }

  /**
   * The module itself
   * - allows generics
   * - has constants
   * - requires `await this.initSync()`
   *
   * @throws Error when the module has not finished importing yet
   */
  get module(): { [K in keyof Module]: Module[K] } {
    if (!this.#awaitedModule)
      throw new Error('should `await pool.initSync()` before using sync calls!')
    return this.#awaitedModule
  }

  /**
   * allows to call module exports in pool, typed as the module's own functions
   * - is limited to async functions
   * - allows generics
   * - requires `export const call = callWrap(import('./<current module>'))`
   */
  get asyncModule() {
    type AsyncModule = PickValues<Module, ((...a: any[]) => Promise<any>)>
    return this.#proxyWithGetter<AsyncModule>(
      property => (...args: never) => this.runExt(property as any, args),
    )
  }

  /**
   * allows to call module exports in current thread
   * - requires `export const call = callWrap(import('./<current module>'))`
   * - requires `await this.initSync()`
   * - is not compatible with **generics**
   */
  get sync(): SimPoolModule<Module> {
    return this.#proxyWithGetter<SimPoolModule<Module>>(
      property => (...args: never) => this.runExt(property as any, args, { thread: 'current' }),
    )
  }

  /**
   * allows to call module exports in pool
   * - requires `export const call = callWrap(import('./<current module>'))`
   * - is not compatible with **generics**
   */
  get async(): SimPoolModule<Module> {
    return this.#proxyWithGetter<SimPoolModule<Module>>(
      property => (...args: never) => this.runExt(property as any, args),
    )
  }

  /**
   * allows to call generic module exports in pool
   * - requires `export const call = callWrap(import('./<current module>'))`
   * - feels a bit weird: the export is called through a callback,
   *   e.g. `pool.generic.add(add => add(2, 2))`
   */
  get generic(): KeepGenericModule<Module> {
    return this.#proxyWithGetter<KeepGenericModule<Module>>(
      (property) => {
        // returns (callback: (fn: T[K]) => any) => AbortablePromise<Awaited<R>>
        return (callback: (call: Function) => any) => {
          let args: any[] = []
          // The callback never reaches the real function: it gets a recorder
          // that only captures the arguments, which are then sent to the pool.
          callback((...a: any[]) => {
            args = a
          })
          return this.runExt(property as any, args as never)
        }
      })
  }
}
/**
 * Builds the `call` export a worker module needs so that tasks can be
 * dispatched to its other exports by name.
 *
 * @param self - promise of the worker module itself, typically
 *   `import('./<current module>')`
 * @returns an async `call(task)` function that looks up `task.name` on the
 *   module and invokes it with `task.thisArg` as `this` and `task.args`
 *   spread as arguments
 * @throws Error (as a rejection of `call`) when the module has no member named
 *   `task.name`, or when that member is not a function
 */
export function callWrap<Module extends object>(self: Promise<Module>) {
  type SimModule = SimPoolModule<Omit<Module, 'call'>>
  return async function call<K extends keyof SimModule>(task: {
    name: K & string
    // A single argument tuple: it is spread into the target function via `apply`.
    args: Parameters<SimModule[K]>
    thisArg: ThisParameterType<SimModule[K]>
  }): Promise<Awaited<ReturnType<Extract<Module[K & keyof Module], (...a: any[]) => any>>>> {
    const module = await self
    if (!(task.name in module))
      throw new Error(`No "${task.name}" exported`)
    const fn: unknown = module[task.name as keyof Module]
    if (typeof fn !== 'function')
      throw new Error(`"${task.name}" export was not a function`)
    // eslint-disable-next-line @typescript-eslint/no-unsafe-return
    return fn.apply(task.thisArg, task.args)
  }
}

import type { WithPort } from '../src/worker'
import { callWrap } from '../src/worker'
// REQUIRED: SimPool submits every task to the export named `call`,
// so the worker module wraps an import of itself with `callWrap`.
export const call = callWrap(import('./worker.test.2'))
// A non-function export (plain string constant).
export const cooonst = 'long'
/**
 * Adds up all arguments from left to right.
 *
 * @param a - the numbers to add
 * @returns the total, or `0` when called without arguments
 */
export function summ(...a: number[]): number {
  let total = 0
  for (const value of a)
    total += value
  return total
}
/**
 * Adds two numbers or concatenates two strings.
 *
 * @param a - left operand
 * @param b - right operand
 * @returns the numeric sum for two numbers, otherwise the concatenation of
 *   both operands as strings
 */
export function add(a: number, b: number): number
export function add(a: string, b: string): string
export function add(a: number | string, b: number | string): number | string {
  // Typed implementation signature: no implicit `any`, no lint suppressions.
  if (typeof a === 'number' && typeof b === 'number')
    return a + b
  // Same result as `a + b` whenever at least one operand is a string.
  return String(a) + String(b)
}
/**
 * Async identity: resolves with the value it was given.
 * Declared `async` although it awaits nothing, hence the lint suppression.
 */
// eslint-disable-next-line @typescript-eslint/require-await
export async function aidentity<T>(v: T) {
return v
}
/**
 * Resolves with the next value received on `this.messagePort`.
 * The listener is registered with `once`, so only a single message is consumed.
 *
 * @returns the received message, typed as the port's incoming message type `T`
 */
export async function read<T>(this: WithPort<never, T>): Promise<T> {
  // Typing the promise keeps `T` in the result instead of widening it to `unknown`.
  return await new Promise<T>((resolve) => {
    this.messagePort.once('message', resolve)
  })
}
/**
 * Posts `value` on `this.messagePort`.
 * Declared `async` although it awaits nothing, hence the lint suppression.
 */
// eslint-disable-next-line @typescript-eslint/require-await
export async function write<T>(this: WithPort<T, never>, value: T) {
this.messagePort.postMessage(value)
}
/**
 * Waits for one message on `this.messagePort` and answers it.
 *
 * The received value is compared to `ping` with `!==` (strict identity):
 * on a match `pong` is posted back and the call resolves; otherwise the call
 * rejects.
 *
 * @param ping - the value the incoming message must be strictly equal to
 * @param pong - the value posted back on a match
 * @throws Error (as a rejection) when the first message differs from `ping`
 */
export async function ping<I, O>(this: WithPort<O, I>, ping: I, pong: O) {
  await new Promise((resolve, reject) => {
    this.messagePort.once('message', (value) => {
      if (value !== ping) {
        reject(new Error(`ping message should be ${JSON.stringify(ping)}`))
        return
      }
      this.messagePort.postMessage(pong)
      resolve('ok')
    })
  })
}

// ---- usage example -------------------------------------------------------
const pool = new SimPool<typeof import('./worker.test.2')>({
  url: new URL('./worker.test.2', import.meta.url),
})
await pool.initSync()
// NOTE(review): `sync` is typed to expose function exports only and its proxy
// returns a function for any property; `pool.module.cooonst` is probably what
// was meant here — confirm.
console.log(pool.sync.cooonst)
// await pool.runExt('test1', [{ foo: 'bar' }], [])
console.log(pool.async.add)
console.log({
  summn: await pool.generic.add(add => add(2, 2)),
  summs: await pool.generic.add(add => add('a', 'b')),
})
void pool.runExt('add', ['a', 'b'])
void pool.asyncModule.aidentity(123) |
Beta Was this translation helpful? Give feedback.
0 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Uh oh!
There was an error while loading. Please reload this page.
Uh oh!
There was an error while loading. Please reload this page.
-
I'm trying to implement a typed wrapper
Beta Was this translation helpful? Give feedback.
All reactions