diff options
Diffstat (limited to 'src/js/node')
-rw-r--r-- | src/js/node/async_hooks.ts | 2 | ||||
-rw-r--r-- | src/js/node/child_process.js | 89 | ||||
-rw-r--r-- | src/js/node/fs.js | 2 | ||||
-rw-r--r-- | src/js/node/fs.promises.ts | 35 | ||||
-rw-r--r-- | src/js/node/http.ts | 5 | ||||
-rw-r--r-- | src/js/node/stream.js | 427 | ||||
-rw-r--r-- | src/js/node/v8.ts | 21 |
7 files changed, 508 insertions, 73 deletions
diff --git a/src/js/node/async_hooks.ts b/src/js/node/async_hooks.ts index d04b226f8..ef77b79f7 100644 --- a/src/js/node/async_hooks.ts +++ b/src/js/node/async_hooks.ts @@ -24,10 +24,12 @@ const { cleanupLater, setAsyncHooksEnabled } = $lazy("async_hooks"); function get(): ReadonlyArray<any> | undefined { + $debug("get", $getInternalField($asyncContext, 0)); return $getInternalField($asyncContext, 0); } function set(contextValue: ReadonlyArray<any> | undefined) { + $debug("set", contextValue); return $putInternalField($asyncContext, 0, contextValue); } diff --git a/src/js/node/child_process.js b/src/js/node/child_process.js index 6878f3ca1..859e01aa7 100644 --- a/src/js/node/child_process.js +++ b/src/js/node/child_process.js @@ -1158,6 +1158,9 @@ class ChildProcess extends EventEmitter { const stdio = options.stdio || ["pipe", "pipe", "pipe"]; const bunStdio = getBunStdioFromOptions(stdio); + // TODO: better ipc support + const ipc = $isArray(stdio) && stdio[3] === "ipc"; + var env = options.envPairs || undefined; const detachedOption = options.detached; this.#encoding = options.encoding || undefined; @@ -1182,53 +1185,67 @@ class ChildProcess extends EventEmitter { ); }, lazy: true, + ipc: ipc ? this.#emitIpcMessage.bind(this) : undefined, }); this.pid = this.#handle.pid; onSpawnNT(this); - // const ipc = stdio.ipc; - // const ipcFd = stdio.ipcFd; - // stdio = options.stdio = stdio.stdio; - - // for (i = 0; i < stdio.length; i++) { - // const stream = stdio[i]; - // if (stream.type === "ignore") continue; - - // if (stream.ipc) { - // this._closesNeeded++; - // continue; - // } + if (ipc) { + this.send = this.#send; + this.disconnect = this.#disconnect; + } + } - // // The stream is already cloned and piped, thus stop its readable side, - // // otherwise we might attempt to read from the stream when at the same time - // // the child process does. - // if (stream.type === "wrap") { - // stream.handle.reading = false; - // stream.handle.readStop(); - // stream._stdio.pause(); - // stream._stdio.readableFlowing = false; - // stream._stdio._readableState.reading = false; - // stream._stdio[kIsUsedAsStdio] = true; - // continue; - // } + #emitIpcMessage(message) { + this.emit("message", message); + } - // if (stream.handle) { - // stream.socket = createSocket( - // this.pid !== 0 ? stream.handle : null, - // i > 0 - // ); + #send(message, handle, options, callback) { + if (typeof handle === "function") { + callback = handle; + handle = undefined; + options = undefined; + } else if (typeof options === "function") { + callback = options; + options = undefined; + } else if (options !== undefined) { + if (typeof options !== "object" || options === null) { + throw new ERR_INVALID_ARG_TYPE("options", "Object", options); + } + } - // // Add .send() method and start listening for IPC data - // if (ipc !== undefined) setupChannel(this, ipc, serialization); - } + if (!this.#handle) { + if (callback) { + process.nextTick(callback, new TypeError("Process was closed while trying to send message")); + } else { + this.emit("error", new TypeError("Process was closed while trying to send message")); + } + return false; + } - send() { - console.log("ChildProcess.prototype.send() - Sorry! Not implemented yet"); + // Bun does not handle handles yet + try { + this.#handle.send(message); + if (callback) process.nextTick(callback); + return true; + } catch (error) { + if (callback) { + process.nextTick(callback, error); + } else { + this.emit("error", error); + } + return false; + } } - disconnect() { - console.log("ChildProcess.prototype.disconnect() - Sorry! Not implemented yet"); + #disconnect() { + if (!this.connected) { + this.emit("error", new TypeError("Process was closed while trying to send message")); + return; + } + this.connected = false; + this.#handle.disconnect(); } kill(sig) { diff --git a/src/js/node/fs.js b/src/js/node/fs.js index 2c6b8cfbe..e3630c461 100644 --- a/src/js/node/fs.js +++ b/src/js/node/fs.js @@ -824,7 +824,7 @@ var WriteStreamClass; WriteStream = (function (InternalWriteStream) { WriteStreamClass = InternalWriteStream; Object.defineProperty(WriteStreamClass.prototype, Symbol.toStringTag, { - value: "WritesStream", + value: "WriteStream", enumerable: false, }); diff --git a/src/js/node/fs.promises.ts b/src/js/node/fs.promises.ts index dfc24cae7..8921e42d3 100644 --- a/src/js/node/fs.promises.ts +++ b/src/js/node/fs.promises.ts @@ -1,3 +1,5 @@ +import type { Dirent } from "fs"; + // Hardcoded module "node:fs/promises" const constants = $processBindingConstants.fs; @@ -105,6 +107,37 @@ function cp(src, dest, options) { return fs.cp(src, dest, options.recursive, options.errorOnExist, options.force ?? true, options.mode); } +// TODO: implement this in native code using a Dir Iterator 💀 +// This is currently stubbed for Next.js support. +class Dir { + #entries: Dirent[]; + constructor(e: Dirent[]) { + this.#entries = e; + } + readSync() { + return this.#entries.shift() ?? null; + } + read(c) { + if (c) process.nextTick(c, null, this.readSync()); + return Promise.resolve(this.readSync()); + } + closeSync() {} + close(c) { + if (c) process.nextTick(c); + return Promise.resolve(); + } + *[Symbol.asyncIterator]() { + var next; + while ((next = this.readSync())) { + yield next; + } + } +} +async function opendir(dir: string) { + const entries = await fs.readdir(dir, { withFileTypes: true }); + return new Dir(entries); +} + export default { access: promisify(fs.accessSync), appendFile: promisify(fs.appendFileSync), @@ -175,4 +208,6 @@ export default { }, constants, watch, + + opendir, }; diff --git a/src/js/node/http.ts b/src/js/node/http.ts index 0d41cf996..8d457bb85 100644 --- a/src/js/node/http.ts +++ b/src/js/node/http.ts @@ -89,7 +89,7 @@ const NODE_HTTP_WARNING = var _defaultHTTPSAgent; var kInternalRequest = Symbol("kInternalRequest"); -var kInternalSocketData = Symbol.for("::bunternal::"); +const kInternalSocketData = Symbol.for("::bunternal::"); const kEmptyBuffer = Buffer.alloc(0); @@ -134,6 +134,7 @@ function getHeader(headers, name) { type FakeSocket = InstanceType<typeof FakeSocket>; var FakeSocket = class Socket extends Duplex { + [kInternalSocketData]: any; bytesRead = 0; bytesWritten = 0; connecting = false; @@ -525,7 +526,7 @@ class Server extends EventEmitter { const upgrade = req.headers.get("upgrade"); if (upgrade) { - const socket = new FakeSocket(); + const socket = http_req.socket; socket[kInternalSocketData] = [_server, http_res, req]; server.emit("upgrade", http_req, socket, kEmptyBuffer); } else { diff --git a/src/js/node/stream.js b/src/js/node/stream.js index d7d984cb8..d91ec2f6c 100644 --- a/src/js/node/stream.js +++ b/src/js/node/stream.js @@ -2,7 +2,7 @@ // "readable-stream" npm package // just transpiled and debug logs added. -const EE = $lazy("events"); +const EE = require("node:events"); const StringDecoder = require("node:string_decoder").StringDecoder; var __getOwnPropNames = Object.getOwnPropertyNames; @@ -24,6 +24,275 @@ function validateBoolean(value, name) { $debug("node:stream loaded"); +function highWaterMarkFrom(options, isDuplex, duplexKey) { + return options.highWaterMark != null ? options.highWaterMark : isDuplex ? options[duplexKey] : null; +} +function getDefaultHighWaterMark(objectMode) { + return objectMode ? 16 : 16 * 1024; +} +function getHighWaterMark(state, options, duplexKey, isDuplex) { + const hwm = highWaterMarkFrom(options, isDuplex, duplexKey); + if (hwm != null) { + if (!Number.isInteger(hwm) || hwm < 0) { + const name = isDuplex ? `options.${duplexKey}` : "options.highWaterMark"; + throw new ERR_INVALID_ARG_VALUE(name, hwm); + } + return Math.floor(hwm); + } + + // Default value + return getDefaultHighWaterMark(state.objectMode); +} + +class BufferList { + constructor() { + this.head = null; + this.tail = null; + this.length = 0; + } + push(v) { + const entry = { + data: v, + next: null, + }; + if (this.length > 0) this.tail.next = entry; + else this.head = entry; + this.tail = entry; + ++this.length; + } + unshift(v) { + const entry = { + data: v, + next: this.head, + }; + if (this.length === 0) this.tail = entry; + this.head = entry; + ++this.length; + } + shift() { + if (this.length === 0) return; + const ret = this.head.data; + if (this.length === 1) this.head = this.tail = null; + else this.head = this.head.next; + --this.length; + return ret; + } + clear() { + this.head = this.tail = null; + this.length = 0; + } + join(s) { + if (this.length === 0) return ""; + let p = this.head; + let ret = "" + p.data; + while ((p = p.next) !== null) ret += s + p.data; + return ret; + } + concat(n) { + if (this.length === 0) return Buffer.alloc(0); + const ret = Buffer.allocUnsafe(n >>> 0); + let p = this.head; + let i = 0; + while (p) { + ret.set(p.data, i); + i += p.data.length; + p = p.next; + } + return ret; + } + + // Consumes a specified amount of bytes or characters from the buffered data. + consume(n, hasStrings) { + const data = this.head.data; + if (n < data.length) { + // `slice` is the same for buffers and strings. + const slice = data.slice(0, n); + this.head.data = data.slice(n); + return slice; + } + if (n === data.length) { + // First chunk is a perfect match. + return this.shift(); + } + // Result spans more than one buffer. + return hasStrings ? this._getString(n) : this._getBuffer(n); + } + first() { + return this.head.data; + } + *[Symbol.iterator]() { + for (let p = this.head; p; p = p.next) { + yield p.data; + } + } + + // Consumes a specified amount of characters from the buffered data. + _getString(n) { + let ret = ""; + let p = this.head; + let c = 0; + do { + const str = p.data; + if (n > str.length) { + ret += str; + n -= str.length; + } else { + if (n === str.length) { + ret += str; + ++c; + if (p.next) this.head = p.next; + else this.head = this.tail = null; + } else { + ret += str.slice(0, n); + this.head = p; + p.data = str.slice(n); + } + break; + } + ++c; + } while ((p = p.next) !== null); + this.length -= c; + return ret; + } + + // Consumes a specified amount of bytes from the buffered data. + _getBuffer(n) { + const ret = Buffer.allocUnsafe(n); + const retLen = n; + let p = this.head; + let c = 0; + do { + const buf = p.data; + if (n > buf.length) { + ret.set(buf, retLen - n); + n -= buf.length; + } else { + if (n === buf.length) { + ret.set(buf, retLen - n); + ++c; + if (p.next) this.head = p.next; + else this.head = this.tail = null; + } else { + ret.set(new Uint8Array(buf.buffer, buf.byteOffset, n), retLen - n); + this.head = p; + p.data = buf.slice(n); + } + break; + } + ++c; + } while ((p = p.next) !== null); + this.length -= c; + return ret; + } + + // Make sure the linked list only shows the minimal necessary information. + [Symbol.for("nodejs.util.inspect.custom")](_, options) { + return inspect(this, { + ...options, + // Only inspect one level. + depth: 0, + // It should not recurse. + customInspect: false, + }); + } +} + +function ReadableState(options, stream, isDuplex) { + // Duplex streams are both readable and writable, but share + // the same options object. + // However, some cases require setting options to different + // values for the readable and the writable sides of the duplex stream. + // These options can be provided separately as readableXXX and writableXXX. + if (typeof isDuplex !== "boolean") isDuplex = stream instanceof Duplex; + + // Object stream flag. Used to make read(n) ignore n and to + // make all the buffer merging and length checks go away. + this.objectMode = !!(options && options.objectMode); + if (isDuplex) this.objectMode = this.objectMode || !!(options && options.readableObjectMode); + + // The point at which it stops calling _read() to fill the buffer + // Note: 0 is a valid value, means "don't call _read preemptively ever" + this.highWaterMark = options + ? getHighWaterMark(this, options, "readableHighWaterMark", isDuplex) + : getDefaultHighWaterMark(false); + + // A linked list is used to store data chunks instead of an array because the + // linked list can remove elements from the beginning faster than + // array.shift(). + this.buffer = new BufferList(); + this.length = 0; + this.pipes = []; + this.flowing = null; + this.ended = false; + this.endEmitted = false; + this.reading = false; + + // Stream is still being constructed and cannot be + // destroyed until construction finished or failed. + // Async construction is opt in, therefore we start as + // constructed. + this.constructed = true; + + // A flag to be able to tell if the event 'readable'/'data' is emitted + // immediately, or on a later tick. We set this to true at first, because + // any actions that shouldn't happen until "later" should generally also + // not happen before the first read call. + this.sync = true; + + // Whenever we return null, then we set a flag to say + // that we're awaiting a 'readable' event emission. + this.needReadable = false; + this.emittedReadable = false; + this.readableListening = false; + this.resumeScheduled = false; + // this[kPaused] = null + + // True if the error was already emitted and should not be thrown again. + this.errorEmitted = false; + + // Should close be emitted on destroy. Defaults to true. + this.emitClose = !options || options.emitClose !== false; + + // Should .destroy() be called after 'end' (and potentially 'finish'). + this.autoDestroy = !options || options.autoDestroy !== false; + + // Has it been destroyed. + this.destroyed = false; + + // Indicates whether the stream has errored. When true no further + // _read calls, 'data' or 'readable' events should occur. This is needed + // since when autoDestroy is disabled we need a way to tell whether the + // stream has failed. + this.errored = null; + + // Indicates whether the stream has finished destroying. + this.closed = false; + + // True if close has been emitted or would have been emitted + // depending on emitClose. + this.closeEmitted = false; + + // Crypto is kind of old and crusty. Historically, its default string + // encoding is 'binary' so we have to make this configurable. + // Everything else in the universe uses 'utf8', though. + this.defaultEncoding = (options && options.defaultEncoding) || "utf8"; + + // Ref the piped dest which we need a drain event on it + // type: null | Writable | Set<Writable>. + this.awaitDrainWriters = null; + this.multiAwaitDrain = false; + + // If true, a maybeReadMore has been scheduled. + this.readingMore = false; + this.dataEmitted = false; + this.decoder = null; + this.encoding = null; + if (options && options.encoding) { + this.decoder = new StringDecoder(options.encoding); + this.encoding = options.encoding; + } +} + /** * @callback validateObject * @param {*} value @@ -42,7 +311,7 @@ const validateObject = (value, name, options = null) => { const nullable = options?.nullable ?? false; if ( (!nullable && value === null) || - (!allowArray && ArrayIsArray(value)) || + (!allowArray && $isArray(value)) || (typeof value !== "object" && (!allowFunction || typeof value !== "function")) ) { throw new ERR_INVALID_ARG_TYPE(name, "Object", value); @@ -61,8 +330,6 @@ function validateString(value, name) { if (typeof value !== "string") throw new ERR_INVALID_ARG_TYPE(name, "string", value); } -var ArrayIsArray = Array.isArray; - //------------------------------------------------------------------------------ // Node error polyfills //------------------------------------------------------------------------------ @@ -81,7 +348,7 @@ var require_primordials = __commonJS({ "use strict"; module.exports = { ArrayIsArray(self) { - return Array.isArray(self); + return $isArray(self); }, ArrayPrototypeIncludes(self, el) { return self.includes(el); @@ -165,9 +432,6 @@ var require_primordials = __commonJS({ SymbolAsyncIterator: Symbol.asyncIterator, SymbolHasInstance: Symbol.hasInstance, SymbolIterator: Symbol.iterator, - TypedArrayPrototypeSet(self, buf, len) { - return self.set(buf, len); - }, Uint8Array, }; }, @@ -186,21 +450,7 @@ var require_util = __commonJS({ : function isBlob2(b) { return false; }; - var AggregateError = class extends Error { - constructor(errors) { - if (!Array.isArray(errors)) { - throw new TypeError(`Expected input to be an Array, got ${typeof errors}`); - } - let message = ""; - for (let i = 0; i < errors.length; i++) { - message += ` ${errors[i].stack} -`; - } - super(message); - this.name = "AggregateError"; - this.errors = errors; - } - }; + module.exports = { AggregateError, once(callback) { @@ -299,8 +549,8 @@ var require_util = __commonJS({ var require_errors = __commonJS({ "node_modules/readable-stream/lib/ours/errors.js"(exports, module) { "use strict"; - var { format, inspect, AggregateError: CustomAggregateError } = require_util(); - var AggregateError = globalThis.AggregateError || CustomAggregateError; + var { format, inspect } = require_util(); + var AggregateError = globalThis.AggregateError; var kIsNodeError = Symbol("kIsNodeError"); var kTypes = ["string", "function", "number", "object", "Function", "Object", "boolean", "bigint", "symbol"]; var classRegExp = /^([A-Z][a-z0-9]*)+$/; @@ -2260,7 +2510,6 @@ var require_readable = __commonJS({ Symbol: Symbol2, } = require_primordials(); - var ReadableState = $lazy("bun:stream").ReadableState; var { Stream, prependListener } = require_legacy(); function Readable(options) { @@ -2510,15 +2759,127 @@ var require_readable = __commonJS({ var { addAbortSignal } = require_add_abort_signal(); var eos = require_end_of_stream(); - const { maybeReadMore: _maybeReadMore, resume, emitReadable: _emitReadable, onEofChunk } = $lazy("bun:stream"); function maybeReadMore(stream, state) { - process.nextTick(_maybeReadMore, stream, state); + if (!state.readingMore && state.constructed) { + state.readingMore = true; + process.nextTick(maybeReadMore_, stream, state); + } } - // REVERT ME - function emitReadable(stream, state) { - $debug("NativeReadable - emitReadable", stream.__id); - _emitReadable(stream, state); + function maybeReadMore_(stream, state) { + // Attempt to read more data if we should. + // + // The conditions for reading more data are (one of): + // - Not enough data buffered (state.length < state.highWaterMark). The loop + // is responsible for filling the buffer with enough data if such data + // is available. If highWaterMark is 0 and we are not in the flowing mode + // we should _not_ attempt to buffer any extra data. We'll get more data + // when the stream consumer calls read() instead. + // - No data in the buffer, and the stream is in flowing mode. In this mode + // the loop below is responsible for ensuring read() is called. Failing to + // call read here would abort the flow and there's no other mechanism for + // continuing the flow if the stream consumer has just subscribed to the + // 'data' event. + // + // In addition to the above conditions to keep reading data, the following + // conditions prevent the data from being read: + // - The stream has ended (state.ended). + // - There is already a pending 'read' operation (state.reading). This is a + // case where the stream has called the implementation defined _read() + // method, but they are processing the call asynchronously and have _not_ + // called push() with new data. In this case we skip performing more + // read()s. The execution ends in this method again after the _read() ends + // up calling push() with more data. + while ( + !state.reading && + !state.ended && + (state.length < state.highWaterMark || (state.flowing && state.length === 0)) + ) { + const len = state.length; + $debug("maybeReadMore read 0"); + stream.read(0); + if (len === state.length) + // Didn't get any data, stop spinning. + break; + } + state.readingMore = false; + } + // Don't emit readable right away in sync mode, because this can trigger + // another read() call => stack overflow. This way, it might trigger + // a nextTick recursion warning, but that's not so bad. + function emitReadable(stream) { + const state = stream._readableState; + $debug("emitReadable", state.needReadable, state.emittedReadable); + state.needReadable = false; + if (!state.emittedReadable) { + $debug("emitReadable", state.flowing); + state.emittedReadable = true; + process.nextTick(emitReadable_, stream); + } + } + function emitReadable_(stream) { + const state = stream._readableState; + $debug("emitReadable_", state.destroyed, state.length, state.ended); + if (!state.destroyed && !state.errored && (state.length || state.ended)) { + stream.emit("readable"); + state.emittedReadable = false; + } + + // The stream needs another readable event if: + // 1. It is not flowing, as the flow mechanism will take + // care of it. + // 2. It is not ended. + // 3. It is below the highWaterMark, so we can schedule + // another readable later. + state.needReadable = !state.flowing && !state.ended && state.length <= state.highWaterMark; + flow(stream); + } + function flow(stream) { + const state = stream._readableState; + $debug("flow", state.flowing); + while (state.flowing && stream.read() !== null); + } + function onEofChunk(stream, state) { + $debug("onEofChunk"); + if (state.ended) return; + if (state.decoder) { + const chunk = state.decoder.end(); + if (chunk && chunk.length) { + state.buffer.push(chunk); + state.length += state.objectMode ? 1 : chunk.length; + } + } + state.ended = true; + if (state.sync) { + // If we are sync, wait until next tick to emit the data. + // Otherwise we risk emitting data in the flow() + // the readable code triggers during a read() call. + emitReadable(stream); + } else { + // Emit 'readable' now to make sure it gets picked up. + state.needReadable = false; + state.emittedReadable = true; + // We have to emit readable now that we are EOF. Modules + // in the ecosystem (e.g. dicer) rely on this event being sync. + emitReadable_(stream); + } + } + function resume(stream, state) { + if (!state.resumeScheduled) { + state.resumeScheduled = true; + process.nextTick(resume_, stream, state); + } } + function resume_(stream, state) { + $debug("resume", state.reading); + if (!state.reading) { + stream.read(0); + } + state.resumeScheduled = false; + stream.emit("resume"); + flow(stream); + if (state.flowing && !state.reading) stream.read(0); + } + var destroyImpl = require_destroy(); var { aggregateTwoErrors, @@ -5607,4 +5968,6 @@ exports[Symbol.for("::bunternal::")] = { _ReadableFromWeb, _ReadableFromWebForUn exports.eos = require_end_of_stream(); exports.EventEmitter = EE; +var Duplex = exports.Duplex; + export default exports; diff --git a/src/js/node/v8.ts b/src/js/node/v8.ts index f74422de8..2aa49451f 100644 --- a/src/js/node/v8.ts +++ b/src/js/node/v8.ts @@ -1,7 +1,7 @@ // Hardcoded module "node:v8" // This is a stub! None of this is actually implemented yet. const { hideFromStack, throwNotImplemented } = require("$shared"); -const jsc = require("bun:jsc"); +const jsc: typeof import("bun:jsc") = require("bun:jsc"); function notimpl(message) { throwNotImplemented("node:v8 " + message); @@ -32,7 +32,24 @@ function getHeapSnapshot() { notimpl("getHeapSnapshot"); } function getHeapStatistics() { - notimpl("getHeapStatistics"); + const stats = jsc.heapStats(); + // this is not very correct + return { + total_heap_size: stats.heapCapacity, + total_heap_size_executable: 0, + total_physical_size: stats.heapSize, + total_available_size: Infinity, + used_heap_size: stats.heapSize, + heap_size_limit: Infinity, + malloced_memory: stats.heapSize, + peak_malloced_memory: Infinity, + does_zap_garbage: 0, + number_of_native_contexts: Infinity, + number_of_detached_contexts: Infinity, + total_global_handles_size: Infinity, + used_global_handles_size: Infinity, + external_memory: Infinity, + }; } function getHeapSpaceStatistics() { notimpl("getHeapSpaceStatistics"); |