summaryrefslogtreecommitdiff
path: root/packages/integrations/image/src/utils/workerPool.ts
blob: 6f953998e260a64f9767ef340a158fd556f97e98 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
/* tslint-disable ban-types */
import { parentPort, Worker } from 'worker_threads';

/**
 * Generates a random identifier made of 16 random bytes rendered as lowercase hex.
 *
 * Every byte is zero-padded to two digits so the result is always exactly
 * 32 characters long and distinct byte sequences can never collapse into the
 * same string (without padding, bytes `0x01, 0x23` and `0x12, 0x03` would both
 * render as `"123"`).
 *
 * Uses `Math.random()`, so the value is suitable for correlating messages but
 * is not cryptographically secure.
 *
 * @returns A 32-character lowercase hexadecimal string.
 */
function uuid(): string {
	return Array.from({ length: 16 }, () =>
		Math.floor(Math.random() * 256)
			.toString(16)
			.padStart(2, '0')
	).join('');
}

/**
 * A queued unit of work: the message to hand to a worker together with the
 * callbacks that settle the promise returned to whoever dispatched it.
 */
interface Job<I> {
	/** Payload that is posted to the worker chosen for this job. */
	msg: I;
	/** Settles the dispatcher's promise with the worker's result. */
	resolve: (result: any) => void;
	/** Rejects the dispatcher's promise with the failure reason. */
	reject: (reason: any) => void;
}

/**
 * A fixed-size pool of `worker_threads` workers.
 *
 * Pending jobs and idle workers are each kept in an identity `TransformStream`
 * that is used as an asynchronous queue: writing enqueues, reading dequeues and
 * waits while the queue is empty. A single read loop pairs each job with the
 * next idle worker and returns the worker to the idle queue once the job settles.
 *
 * @typeParam I - Type of the message sent to a worker.
 * @typeParam O - Type of the result a worker sends back.
 */
export default class WorkerPool<I, O> {
	/** Number of workers created by the constructor. */
	public numWorkers: number;
	/** Queue of jobs waiting for a worker. */
	public jobQueue: TransformStream<Job<I>, Job<I>>;
	/** Queue of workers that are currently idle. */
	public workerQueue: TransformStream<Worker, Worker>;
	/** Settles when the read loop has finished, i.e. after `join()` drained the pool. */
	public done: Promise<void>;

	/**
	 * Spawns the workers and starts the loop that assigns jobs to them.
	 *
	 * @param numWorkers - How many workers to spawn.
	 * @param workerFile - Path of the script each worker runs.
	 */
	constructor(numWorkers: number, workerFile: string) {
		this.numWorkers = numWorkers;
		this.jobQueue = new TransformStream();
		this.workerQueue = new TransformStream();

		const writer = this.workerQueue.writable.getWriter();
		for (let i = 0; i < numWorkers; i++) {
			writer.write(new Worker(workerFile));
		}
		writer.releaseLock();

		this.done = this._readLoop();
	}

	/**
	 * Reads jobs one at a time, waits for an idle worker and runs the job on it.
	 * When the job queue is closed, terminates all workers and returns.
	 *
	 * @throws Error if the job queue yields an empty value or no worker is left.
	 */
	async _readLoop(): Promise<void> {
		const reader = this.jobQueue.readable.getReader();
		while (true) {
			const { value, done } = await reader.read();
			if (done) {
				await this._terminateAll();
				return;
			}

			if (!value) {
				throw new Error('Reader did not return any value');
			}

			const { msg, resolve, reject } = value;
			const worker = await this._nextWorker();
			// Deliberately not awaited: the loop must keep handing out jobs while
			// this one is still running on its worker.
			this.jobPromise(worker, msg)
				.then(resolve, reject)
				.finally(() => {
					// Return the worker to the pool
					const writer = this.workerQueue.writable.getWriter();
					writer.write(worker);
					writer.releaseLock();
				});
		}
	}

	/**
	 * Takes the next idle worker out of the worker queue, waiting until one is
	 * available.
	 *
	 * @returns The dequeued worker.
	 * @throws Error if the worker queue yields no value.
	 */
	async _nextWorker(): Promise<Worker> {
		const reader = this.workerQueue.readable.getReader();
		const { value } = await reader.read();
		reader.releaseLock();
		if (!value) {
			throw new Error('No worker left');
		}

		return value;
	}

	/**
	 * Collects every worker as it becomes idle, terminates it, waits for all of
	 * them to exit and then closes the worker queue.
	 */
	async _terminateAll(): Promise<void> {
		const exits: Promise<number>[] = [];
		for (let n = 0; n < this.numWorkers; n++) {
			// Waiting on the idle queue means a worker is only terminated after
			// the job it was running has settled.
			const worker = await this._nextWorker();
			exits.push(worker.terminate());
		}
		// `terminate()` is asynchronous; wait so `join()` only resolves once the
		// worker threads are really gone.
		await Promise.all(exits);
		this.workerQueue.writable.close();
	}

	/**
	 * Closes the job queue and waits until the read loop has processed all
	 * previously dispatched jobs and terminated the workers.
	 */
	async join(): Promise<void> {
		// The close is not awaited on purpose: completion (or failure) of the
		// pool is reported through `this.done`.
		this.jobQueue.writable.getWriter().close();
		await this.done;
	}

	/**
	 * Enqueues a job for the pool.
	 *
	 * @param msg - Message to send to whichever worker picks the job up.
	 * @returns A promise for the worker's result; it rejects if the worker
	 *   reports an error or if the job cannot be enqueued.
	 */
	dispatchJob(msg: I): Promise<O> {
		return new Promise<O>((resolve, reject) => {
			const writer = this.jobQueue.writable.getWriter();
			// If the queue refuses the job, surface that to the caller instead
			// of leaving an unhandled rejection and a promise that never settles.
			writer.write({ msg, resolve, reject }).catch(reject);
			writer.releaseLock();
		});
	}

	/**
	 * Runs a single job on the given worker.
	 *
	 * The message is tagged with a fresh id and only the reply carrying the same
	 * id settles the promise; replies with other ids are ignored.
	 *
	 * @param worker - Worker that should process the message.
	 * @param msg - Message to process.
	 * @returns A promise settled with the worker's `result`, or rejected with its `error`.
	 */
	private jobPromise(worker: Worker, msg: I): Promise<O> {
		return new Promise<O>((resolve, reject) => {
			const id = uuid();
			worker.on('message', function f({ error, result, id: rid }) {
				if (rid !== id) {
					return;
				}
				// Detach on every outcome; workers are reused, so a listener left
				// behind by a failed job would accumulate for the pool's lifetime.
				worker.off('message', f);
				// Compare against null/undefined rather than truthiness so an
				// empty error message still rejects the job.
				if (error != null) {
					reject(error);
					return;
				}
				resolve(result);
			});
			worker.postMessage({ msg, id });
		});
	}

	/**
	 * Turns the current worker thread into a pool worker: every incoming
	 * `{ msg, id }` message is passed to `cb` and answered with either
	 * `{ result, id }` or `{ error, id }`.
	 *
	 * @param cb - Handler invoked with each message; its (awaited) return value
	 *   becomes the job result.
	 * @throws Error if called outside of a worker thread (no `parentPort`).
	 */
	static useThisThreadAsWorker<I, O>(cb: (msg: I) => O): void {
		if (!parentPort) {
			throw new Error('WorkerPool.useThisThreadAsWorker() must be called from a worker thread');
		}
		const port = parentPort;
		port.on('message', async (data) => {
			const { msg, id } = data;
			try {
				const result = await cb(msg);
				port.postMessage({ result, id });
			} catch (e: unknown) {
				// Always send a string: a thrown non-Error value has no `message`,
				// and an undefined `error` would make the job look successful.
				const error = e instanceof Error ? e.message : String(e);
				port.postMessage({ error, id });
			}
		});
	}
}