// import type { Readable, Writable } from "node:stream";
// import type { WorkerOptions } from "node:worker_threads";
declare const self: typeof globalThis;
type WebWorker = InstanceType<typeof globalThis.Worker>;

const EventEmitter = require("node:events");
const { throwNotImplemented } = require("../internal/shared");

const { MessageChannel, BroadcastChannel, Worker: WebWorker } = globalThis;
const SHARE_ENV = Symbol("nodejs.worker_threads.SHARE_ENV");

const isMainThread = Bun.isMainThread;
let [_workerData, _threadId, _receiveMessageOnPort] = $lazy("worker_threads");

type NodeWorkerOptions = import("node:worker_threads").WorkerOptions;

/** Warning categories already printed, so each one is reported only once. */
const emittedWarnings = new Set<string>();

/**
 * Prints `message` to the console the first time `type` is seen; later calls
 * with the same `type` are ignored.
 *
 * @param type deduplication key for the warning
 * @param message text to print
 */
function emitWarning(type, message) {
  if (emittedWarnings.has(type)) return;
  emittedWarnings.add(type);
  // process.emitWarning(message); // our printing is bad
  console.warn("[bun] Warning:", message);
}

/**
 * Adds EventEmitter-style methods (`on`, `off`, `once`, `emit`,
 * `prependListener`, `prependOnceListener`) to `Class.prototype`, implemented
 * on top of the instance's `addEventListener` / `removeEventListener` /
 * `dispatchEvent`.
 *
 * Listeners registered through `on`/`once` receive the unwrapped payload
 * (`event.error` for "error" and "messageerror", `event.data` otherwise)
 * instead of the event object.
 *
 * @param Class constructor whose prototype is patched in place
 */
function injectFakeEmitter(Class) {
  function messageEventHandler(event: MessageEvent) {
    return event.data;
  }

  function errorEventHandler(event: ErrorEvent) {
    return event.error;
  }

  // Key under which the wrapping callback is stored on the user's listener so
  // that `off()` can find the function that was actually registered.
  const wrappedListener = Symbol("wrappedListener");

  function wrapped(run, listener) {
    const callback = function (event) {
      return listener(run(event));
    };
    // Only the most recent wrapper is remembered per listener function.
    listener[wrappedListener] = callback;
    return callback;
  }

  function functionForEventType(event, listener) {
    switch (event) {
      case "error":
      case "messageerror": {
        return wrapped(errorEventHandler, listener);
      }

      default: {
        return wrapped(messageEventHandler, listener);
      }
    }
  }

  Class.prototype.on = function (event, listener) {
    this.addEventListener(event, functionForEventType(event, listener));

    return this;
  };

  Class.prototype.off = function (event, listener) {
    if (listener) {
      this.removeEventListener(event, listener[wrappedListener] || listener);
    } else {
      // NOTE(review): EventTarget has no "remove all listeners" call; confirm
      // what removeEventListener does here when no listener is passed.
      this.removeEventListener(event);
    }

    return this;
  };

  Class.prototype.once = function (event, listener) {
    this.addEventListener(event, functionForEventType(event, listener), { once: true });

    return this;
  };

  function EventClass(eventName) {
    if (eventName === "error" || eventName === "messageerror") {
      return ErrorEvent;
    }

    return MessageEvent;
  }

  Class.prototype.emit = function (event, ...args) {
    // NOTE(review): `args` are forwarded as the event constructor's remaining
    // arguments (i.e. the init dictionary), not wrapped as `{ data }`.
    this.dispatchEvent(new (EventClass(event))(event, ...args));

    return this;
  };

  // Ordering is not distinguished: the prepend variants alias on/once.
  Class.prototype.prependListener = Class.prototype.on;
  Class.prototype.prependOnceListener = Class.prototype.once;
}

const _MessagePort = globalThis.MessagePort;
injectFakeEmitter(_MessagePort);

const MessagePort = _MessagePort;

let resourceLimits = {};

let workerData = _workerData;
let threadId = _threadId;

/**
 * Wraps the native receive call in the `{ message }` shape.
 *
 * @param port port to read from
 * @returns `{ message }` when the native call returns a truthy value,
 *          otherwise `undefined`
 */
function receiveMessageOnPort(port: MessagePort) {
  let res = _receiveMessageOnPort(port);
  // NOTE(review): a falsy message (0, "", false, null) is indistinguishable
  // from "no message" with this check.
  if (!res) return undefined;
  return {
    message: res,
  };
}

/**
 * Builds the object exposed as `parentPort` inside a worker: an object with
 * `MessagePort.prototype` as its prototype whose messaging members forward to
 * the worker global (`self`). The ref-counting and encoding members are no-ops.
 */
function fakeParentPort() {
  const fake = Object.create(MessagePort.prototype);
  Object.defineProperty(fake, "onmessage", {
    get() {
      return self.onmessage;
    },
    set(value) {
      self.onmessage = value;
    },
  });

  Object.defineProperty(fake, "onmessageerror", {
    get() {
      return self.onmessageerror;
    },
    set(value) {
      // Forward to the worker global, mirroring `onmessage`; previously the
      // assignment was silently dropped.
      self.onmessageerror = value;
    },
  });

  Object.defineProperty(fake, "postMessage", {
    value(...args: [any, any]) {
      return self.postMessage(...args);
    },
  });

  Object.defineProperty(fake, "close", {
    // Closing the parent port exits the current process/thread with code 0.
    value() {
      return process.exit(0);
    },
  });

  Object.defineProperty(fake, "start", {
    value() {},
  });

  Object.defineProperty(fake, "unref", {
    value() {},
  });

  Object.defineProperty(fake, "ref", {
    value() {},
  });

  Object.defineProperty(fake, "hasRef", {
    value() {
      return false;
    },
  });

  Object.defineProperty(fake, "setEncoding", {
    value() {},
  });

  Object.defineProperty(fake, "addEventListener", {
    value: self.addEventListener.bind(self),
  });

  Object.defineProperty(fake, "removeEventListener", {
    value: self.removeEventListener.bind(self),
  });

  return fake;
}

/** `null` on the main thread, otherwise the forwarding port built above. */
let parentPort: MessagePort | null = isMainThread ? null : fakeParentPort();

/** Returns `process.env`. */
function getEnvironmentData() {
  return process.env;
}

/** Replaces `process.env` with `env`. */
function setEnvironmentData(env: any) {
  process.env = env;
}

function markAsUntransferable() {
  throwNotImplemented("worker_threads.markAsUntransferable");
}

function moveMessagePortToContext() {
  throwNotImplemented("worker_threads.moveMessagePortToContext");
}

/** Option names that trigger a one-time "not implemented" warning when passed to `Worker`. */
const unsupportedOptions = [
  "eval",
  "argv",
  "execArgv",
  "stdin",
  "stdout",
  "stderr",
  "trackedUnmanagedFds",
  "resourceLimits",
];

/**
 * `node:worker_threads`-style Worker: an EventEmitter that owns a web `Worker`
 * and re-emits its events as "exit", "error", "message", "messageerror" and
 * "online".
 */
class Worker extends EventEmitter {
  #worker: WebWorker;
  #performance;

  // this is used by wt.Worker.terminate();
  // either is the exit code if exited, a promise resolving to the exit code, or undefined if we havent sent .terminate() yet
  #onExitPromise: Promise<number> | number | undefined = undefined;

  /**
   * @param filename passed through to the web `Worker` constructor
   * @param options passed through to the web `Worker` constructor; keys listed
   *                in `unsupportedOptions` produce a one-time warning
   */
  constructor(filename: string, options: NodeWorkerOptions = {}) {
    super();
    for (const key of unsupportedOptions) {
      if (key in options) {
        emitWarning("option." + key, `worker_threads.Worker option "${key}" is not implemented.`);
      }
    }
    this.#worker = new WebWorker(filename, options);
    this.#worker.addEventListener("close", this.#onClose.bind(this));
    this.#worker.addEventListener("error", this.#onError.bind(this));
    this.#worker.addEventListener("message", this.#onMessage.bind(this));
    this.#worker.addEventListener("messageerror", this.#onMessageError.bind(this));
    this.#worker.addEventListener("open", this.#onOpen.bind(this));
  }

  get threadId() {
    return this.#worker.threadId;
  }

  ref() {
    this.#worker.ref();
  }

  unref() {
    this.#worker.unref();
  }

  get stdin() {
    // TODO:
    return null;
  }

  get stdout() {
    // TODO:
    return null;
  }

  get stderr() {
    // TODO:
    return null;
  }

  /** Lazily created stub; `eventLoopUtilization()` warns once and returns zeros. */
  get performance() {
    return (this.#performance ??= {
      eventLoopUtilization() {
        emitWarning("performance", "worker_threads.Worker.performance is not implemented.");
        return {
          idle: 0,
          active: 0,
          utilization: 0,
        };
      },
    });
  }

  /**
   * Terminates the underlying worker.
   *
   * @returns a promise for the exit code. If the worker has already exited the
   *          promise is resolved with the recorded code; if a termination is
   *          already pending the same promise is returned again.
   */
  terminate() {
    const onExitPromise = this.#onExitPromise;
    // Compare against undefined rather than testing truthiness: a recorded
    // exit code of 0 is falsy, and falling through would wait for a "close"
    // event that has already fired, returning a promise that never settles.
    if (onExitPromise !== undefined) {
      return $isPromise(onExitPromise) ? onExitPromise : Promise.resolve(onExitPromise);
    }

    const { resolve, promise } = Promise.withResolvers<number>();
    this.#worker.addEventListener(
      "close",
      event => {
        resolve(event.code);
      },
      { once: true },
    );
    this.#worker.terminate();

    return (this.#onExitPromise = promise);
  }

  postMessage(...args: [any, any]) {
    return this.#worker.postMessage(...args);
  }

  // Records the exit code (consulted by terminate()) and emits "exit".
  #onClose(e) {
    this.#onExitPromise = e.code;
    this.emit("exit", e.code);
  }

  // NOTE(review): this forwards the ErrorEvent itself rather than
  // `event.error`; confirm which payload "error" listeners should receive.
  #onError(error: ErrorEvent) {
    this.emit("error", error);
  }

  #onMessage(event: MessageEvent) {
    // TODO: is this right?
    this.emit("message", event.data);
  }

  #onMessageError(event: MessageEvent) {
    // TODO: is this right?
    this.emit("messageerror", (event as any).error ?? event.data ?? event);
  }

  #onOpen() {
    this.emit("online");
  }

  async getHeapSnapshot() {
    throwNotImplemented("worker_threads.Worker.getHeapSnapshot");
  }
}

export default {
  Worker,
  workerData,
  parentPort,
  resourceLimits,
  isMainThread,
  MessageChannel,
  BroadcastChannel,
  MessagePort,
  getEnvironmentData,
  setEnvironmentData,
  getHeapSnapshot() {
    return {};
  },
  markAsUntransferable,
  moveMessagePortToContext,
  receiveMessageOnPort,
  SHARE_ENV,

  threadId,
};
aboutsummaryrefslogtreecommitdiff
Age | Commit message (Collapse) | Author | Files | Lines
2023-10-17Remove ancient changelogGravatar Ashcon Partovi 1-11/+0
2023-10-17docs: fix ws.publish (#6558)Gravatar Aral Roca Gomez 1-1/+1
In this example there is no server variable in the context, and here it makes more sense to use ws.publish. It is explained below that once the serve is done, the server.publish can be used.
2023-10-17perf(bun-types): remove needless some call (#6550)Gravatar Mikhail 1-1/+1
2023-10-16fix(runtime): make some things more stable (partial jsc debug build) (#5881)Gravatar dave caruso 116-1446/+1830
* make our debug assertions work * install bun-webkit-debug * more progress * ok * progress... * more debug build stuff * ok * a * asdfghjkl * fix(runtime): fix bad assertion failure in JSBufferList * ok * stuff * upgrade webkit * Update src/bun.js/bindings/JSDOMWrapperCache.h Co-authored-by: Jarred Sumner <jarred@jarredsumner.com> * fix message for colin's changes * okay * fix cjs prototype * implement mainModule * i think this fixes it all --------- Co-authored-by: Jarred Sumner <jarred@jarredsumner.com>
2023-10-16fix(runtime): improve IPC reliability + organization pass on that code (#6475)Gravatar dave caruso 15-98/+266
* dfghj * Handle messages that did not finish * tidy * ok * a * Merge remote-tracking branch 'origin/main' into dave/ipc-fixes * test failures --------- Co-authored-by: Jarred Sumner <jarred@jarredsumner.com>
2023-10-16Simplify getting Set of extentions (#4975)Gravatar Mikhail 1-3/+3
2023-10-16Fix formattingGravatar Ashcon Partovi 1-3/+1
2023-10-16fix(test): when tests run with --only the nested describe blocks `.on… (#5616)Gravatar Igor Shapiro 2-13/+45
2023-10-16perf(node:events): optimize `emit(...)` function (#5485)Gravatar Yannik Schröder 3-11/+132
2023-10-16fix: don't remove content-encoding header from header table (#5743)Gravatar Liz 2-2/+25
Closes #5668
2023-10-16fix(sqlite) Insert .all() does not return an array #5872 (#5946)Gravatar Hugo Galan 2-7/+11
* fixing #5872 * removing useless comment
2023-10-16Fix formattingGravatar Ashcon Partovi 2-5/+4
2023-10-16Fix `Response.statusText` (#6151)Gravatar Chris Toshok 10-238/+269
2023-10-16fix-subprocess-argument-missing (#6407)Gravatar Nicolae-Rares Ailincai 4-2/+40
* fix-subprocess-argument-missing * fix-tests * nitpick, these should === not just be undefined --------- Co-authored-by: dave caruso <me@paperdave.net>
2023-10-16Add type parameter to `expect` (#6128)Gravatar Voldemat 1-3/+3
2023-10-16fix(node:worker_threads): ensure threadId property is exposed on worker_threads instance (#6521)Gravatar Jérôme Benoit 6-15/+75
* fix: ensure threadId property is exposed on worker_threads instance Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com> * fix: rename lazy worker_threads module properties Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com> * fix: add getter for threadId Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com> * test: improve worker_threads UTs Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com> * test: fix lazy loading Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com> * test: fix worker_threads test Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org> * fix: return the worker threadId Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com> * test: refine worker_threads expectation on threadId Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org> --------- Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com> Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
2023-10-16Fix use before define bug in sqliteGravatar Ashcon Partovi 2-5/+5
Fixes #6481
2023-10-16fix(jest): fix toStrictEqual on same URLs (#6528)Gravatar João Alisson 2-13/+16
Fixes #6492
2023-10-16Fix `toHaveBeenCalled` having wrong error signatureGravatar Ashcon Partovi 1-2/+2
Fixes #6527
2023-10-16Fix formattingGravatar Ashcon Partovi 1-2/+1
2023-10-16Add `reusePort` to `Bun.serve` typesGravatar Ashcon Partovi 1-0/+9
2023-10-16Fix `request.url` having incorrect portGravatar Ashcon Partovi 4-1/+92
Fixes #6443
2023-10-16Remove uWebSockets header from Bun.serve responsesGravatar Ashcon Partovi 1-6/+6
2023-10-16Rename some testsGravatar Ashcon Partovi 3-0/+0
2023-10-16Fix #6467Gravatar Ashcon Partovi 2-3/+10
2023-10-16Update InternalModuleRegistryConstants.hGravatar Dylan Conway 1-3/+3
2023-10-16Development -> Contributing (#6538)Gravatar Colin McDonnell 2-1/+1
Co-authored-by: Colin McDonnell <colin@KennyM1.local>
2023-10-14fix(net/tls) fix pg hang on end + hanging on query (#6487)Gravatar Ciro Spaciari 3-8/+36
* fix pg hang on end + hanging on query * remove dummy function * fix node-stream * add test * fix test * return error in test * fix test use once instead of on * fix OOM * generated * 💅 * 💅
2023-10-13fix installing dependencies that match workspace versions (#6494)Gravatar Dylan Conway 4-2/+64
* check if dependency matches workspace version * test * Update lockfile.zig * set resolution to workspace package id
2023-10-13fix lockfile struct padding (#6495)Gravatar Dylan Conway 3-3/+18
* integrity padding * error message for bytes at end of struct