aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-06-03 04:44:11 -0700
committerGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-06-03 04:44:11 -0700
commite5322eb63bf6ff8deb0f2aade42b9caae40a47cc (patch)
tree87b72fe1895f92068e908aaaddafe5ef514d8daf
parent102553dca6a090533725416cc384d36a49a9ce1d (diff)
downloadbun-e5322eb63bf6ff8deb0f2aade42b9caae40a47cc.tar.gz
bun-e5322eb63bf6ff8deb0f2aade42b9caae40a47cc.tar.zst
bun-e5322eb63bf6ff8deb0f2aade42b9caae40a47cc.zip
Move streams to it's own file
-rw-r--r--src/global.zig4
-rw-r--r--src/javascript/jsc/api/server.zig8
-rw-r--r--src/javascript/jsc/bindings/builtins/js/ReadableStream.js159
-rw-r--r--src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js2
-rw-r--r--src/javascript/jsc/webcore.zig2
-rw-r--r--src/javascript/jsc/webcore/response.zig1163
-rw-r--r--src/javascript/jsc/webcore/streams.zig1321
7 files changed, 1504 insertions, 1155 deletions
diff --git a/src/global.zig b/src/global.zig
index eb385537d..6378bb2ba 100644
--- a/src/global.zig
+++ b/src/global.zig
@@ -3,20 +3,16 @@ pub const Environment = @import("env.zig");
pub const use_mimalloc = !Environment.isTest;
-/// For sizes less than 8 MB, allocate via mimalloc
pub const default_allocator: std.mem.Allocator = if (!use_mimalloc)
std.heap.c_allocator
else
@import("./memory_allocator.zig").c_allocator;
-/// For sizes larger than 8 MB, allocate via mmap() instead of malloc().
pub const huge_allocator: std.mem.Allocator = if (!use_mimalloc)
std.heap.c_allocator
else
@import("./memory_allocator.zig").huge_allocator;
-/// For sizes larger than 8 MB, allocate via mmap() instead of malloc().
-/// For sizes less than 8 MB, allocate via mimalloc
pub const auto_allocator: std.mem.Allocator = if (!use_mimalloc)
std.heap.c_allocator
else
diff --git a/src/javascript/jsc/api/server.zig b/src/javascript/jsc/api/server.zig
index aa4cb91a0..bec079e90 100644
--- a/src/javascript/jsc/api/server.zig
+++ b/src/javascript/jsc/api/server.zig
@@ -1359,7 +1359,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
}
- pub fn onRequestData(this: *RequestContext) void {
+ pub fn onPull(this: *RequestContext) void {
if (this.req.header("content-length")) |content_length| {
const len = std.fmt.parseInt(usize, content_length, 10) catch 0;
if (len == 0) {
@@ -1396,8 +1396,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
this.resp.onData(*RequestContext, onBodyChunk, this);
}
- pub fn onRequestDataCallback(this: *anyopaque) void {
- onRequestData(bun.cast(*RequestContext, this));
+ pub fn onPullCallback(this: *anyopaque) void {
+ onPull(bun.cast(*RequestContext, this));
}
};
}
@@ -1700,7 +1700,7 @@ pub fn NewServer(comptime ssl_enabled_: bool, comptime debug_mode_: bool) type {
.Locked = .{
.task = ctx,
.global = this.globalThis,
- .onRequestData = RequestContext.onRequestDataCallback,
+ .onPull = RequestContext.onPullCallback,
},
},
};
diff --git a/src/javascript/jsc/bindings/builtins/js/ReadableStream.js b/src/javascript/jsc/bindings/builtins/js/ReadableStream.js
index f3ab66d5c..e3f4cbdf8 100644
--- a/src/javascript/jsc/bindings/builtins/js/ReadableStream.js
+++ b/src/javascript/jsc/bindings/builtins/js/ReadableStream.js
@@ -89,6 +89,165 @@ function initializeReadableStream(underlyingSource, strategy)
}
@globalPrivate
+function readableStreamToArray(stream) {
+ "use strict";
+
+ if (@getByIdDirectPrivate(stream, "state") === @streamClosed) {
+ return null;
+ }
+ var reader = stream.getReader();
+
+ var manyResult = reader.readMany();
+
+ var processManyResult = (0, (async function(manyResult) {
+ if (result.done) {
+ return null;
+ }
+
+ var chunks = result.value;
+
+ while (true) {
+ var thisResult = await reader.read();
+ if (thisResult.done) {
+ return chunks;
+ }
+
+ chunks.push(thisResult.value);
+ }
+
+ return chunks;
+ }));
+
+
+ if (manyResult && @isPromise(manyResult)) {
+ return manyResult.then(processManyResult);
+ }
+
+ if (manyResult && manyResult.done) {
+ return null;
+ }
+
+ return processManyResult(manyResult);
+}
+
+
+
+@globalPrivate
+function consumeReadableStream(nativePtr, nativeType, inputStream) {
+ "use strict";
+ const symbol = Symbol.for("Bun.consumeReadableStreamPrototype");
+ var cached = globalThis[symbol];
+ if (!cached) {
+ cached = globalThis[symbol] = [];
+ }
+ var Prototype = cached[nativeType];
+ if (Prototype === @undefined) {
+ var [doRead, doError, doReadMany, doClose, onClose, deinit] = globalThis[Symbol.for("Bun.lazy")](nativeType);
+
+ Prototype = class NativeReadableStreamSink {
+ constructor(reader, ptr) {
+ this.#ptr = ptr;
+ this.#reader = reader;
+ this.#didClose = false;
+
+ this.handleError = this._handleError.bind(this);
+ this.handleClosed = this._handleClosed.bind(this);
+ this.processResult = this._processResult.bind(this);
+
+ reader.closed.then(this.handleClosed, this.handleError);
+ }
+
+ handleError;
+ handleClosed;
+ _handleClosed() {
+ if (this.#didClose) return;
+ this.#didClose = true;
+ var ptr = this.#ptr;
+ this.#ptr = 0;
+ doClose(ptr);
+ deinit(ptr);
+ }
+
+ _handleError(error) {
+ if (this.#didClose) return;
+ this.#didClose = true;
+ var ptr = this.#ptr;
+ this.#ptr = 0;
+ doError(ptr, error);
+ deinit(ptr);
+ }
+
+ #ptr;
+ #didClose = false;
+ #reader;
+
+ _handleReadMany({value, done, size}) {
+ if (done) {
+ this.handleClosed();
+ return;
+ }
+
+ if (this.#didClose) return;
+
+
+ doReadMany(this.#ptr, value, done, size);
+ }
+
+
+ read() {
+ if (!this.#ptr) return @throwTypeError("ReadableStreamSink is already closed");
+
+ return this.processResult(this.#reader.read());
+
+ }
+
+ _processResult(result) {
+ if (result && @isPromise(result)) {
+ const flags = @getPromiseInternalField(result, @promiseFieldFlags);
+ if (flags & @promiseStateFulfilled) {
+ const fulfilledValue = @getPromiseInternalField(result, @promiseFieldReactionsOrResult);
+ if (fulfilledValue) {
+ result = fulfilledValue;
+ }
+ }
+ }
+
+ if (result && @isPromise(result)) {
+ result.then(this.processResult, this.handleError);
+ return null;
+ }
+
+ if (result.done) {
+ this.handleClosed();
+ return 0;
+ } else if (result.value) {
+ return result.value;
+ } else {
+ return -1;
+ }
+ }
+
+ readMany() {
+ if (!this.#ptr) return @throwTypeError("ReadableStreamSink is already closed");
+ return this.processResult(this.#reader.readMany());
+ }
+ };
+
+ const minlength = nativeType + 1;
+ if (cached.length < minlength) {
+ cached.length = minlength;
+ }
+ @putByValDirect(cached, nativeType, Prototype);
+ }
+
+ if (@isReadableStreamLocked(inputStream)) {
+ @throwTypeError("Cannot start reading from a locked stream");
+ }
+
+ return new Prototype(inputStream.getReader(), nativePtr);
+}
+
+@globalPrivate
function createEmptyReadableStream() {
var stream = new @ReadableStream({
pull() {},
diff --git a/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js b/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js
index 18e82262c..09387c9f1 100644
--- a/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js
+++ b/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js
@@ -732,7 +732,7 @@ function readableStreamDefaultControllerEnqueue(controller, chunk)
// this is checked by callers
@assert(@readableStreamDefaultControllerCanCloseOrEnqueue(controller));
- if (@isReadableStreamLocked(stream) && @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").isNotEmpty) {
+ if (@isReadableStreamLocked(stream) && @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) {
@readableStreamFulfillReadRequest(stream, chunk, false);
@readableStreamDefaultControllerCallPullIfNeeded(controller);
return;
diff --git a/src/javascript/jsc/webcore.zig b/src/javascript/jsc/webcore.zig
index 1a25c658d..1ed585018 100644
--- a/src/javascript/jsc/webcore.zig
+++ b/src/javascript/jsc/webcore.zig
@@ -1,5 +1,7 @@
pub usingnamespace @import("./webcore/response.zig");
pub usingnamespace @import("./webcore/encoding.zig");
+pub usingnamespace @import("./webcore/streams.zig");
+
const JSC = @import("../../jsc.zig");
const std = @import("std");
diff --git a/src/javascript/jsc/webcore/response.zig b/src/javascript/jsc/webcore/response.zig
index d85139525..8d854d2f6 100644
--- a/src/javascript/jsc/webcore/response.zig
+++ b/src/javascript/jsc/webcore/response.zig
@@ -1020,98 +1020,6 @@ pub const Fetch = struct {
}
};
-pub const ReadableStream = struct {
- pub const Tag = enum(i32) {
- Blob = 1,
- File = 2,
- };
-
- extern fn ZigGlobalObject__createNativeReadableStream(*JSGlobalObject, nativePtr: JSValue, nativeType: JSValue) JSValue;
-
- pub fn fromNative(globalThis: *JSGlobalObject, id: Tag, ptr: *anyopaque) JSC.JSValue {
- return ZigGlobalObject__createNativeReadableStream(globalThis, JSValue.fromPtr(ptr), JSValue.jsNumber(@enumToInt(id)));
- }
- pub fn fromBlob(globalThis: *JSGlobalObject, blob: *const Blob, recommended_chunk_size: Blob.SizeType) JSC.JSValue {
- if (comptime JSC.is_bindgen)
- unreachable;
- var store = blob.store orelse {
- return ReadableStream.empty(globalThis);
- };
- switch (store.data) {
- .bytes => {
- var reader = bun.default_allocator.create(ByteBlobLoader.Source) catch unreachable;
- reader.* = .{
- .context = undefined,
- };
- reader.context.setup(blob, recommended_chunk_size);
- return reader.toJS(globalThis);
- },
- .file => {
- var reader = bun.default_allocator.create(FileBlobLoader.Source) catch unreachable;
- reader.* = .{
- .context = undefined,
- };
- reader.context.setup(store, recommended_chunk_size);
- return reader.toJS(globalThis);
- },
- }
- }
-
- pub fn empty(globalThis: *JSGlobalObject) JSC.JSValue {
- if (comptime JSC.is_bindgen)
- unreachable;
-
- return ReadableStream__empty(globalThis);
- }
-
- extern fn ReadableStream__empty(*JSGlobalObject) JSC.JSValue;
- extern fn ReadableStream__fromBlob(
- *JSGlobalObject,
- store: *anyopaque,
- offset: usize,
- length: usize,
- ) JSC.JSValue;
- const Base = @import("../../../ast/base.zig");
- pub const StreamTag = enum(usize) {
- invalid = 0,
- _,
-
- pub fn init(filedes: JSC.Node.FileDescriptor) StreamTag {
- var bytes = [8]u8{ 1, 0, 0, 0, 0, 0, 0, 0 };
- const filedes_ = @bitCast([8]u8, @as(usize, @truncate(u56, @intCast(usize, filedes))));
- bytes[1..8].* = filedes_[0..7].*;
-
- return @intToEnum(StreamTag, @bitCast(u64, bytes));
- }
-
- pub fn fd(this: StreamTag) JSC.Node.FileDescriptor {
- var bytes = @bitCast([8]u8, @enumToInt(this));
- if (bytes[0] != 1) {
- return std.math.maxInt(JSC.Node.FileDescriptor);
- }
- var out: u64 = 0;
- @bitCast([8]u8, out)[0..7].* = bytes[1..8].*;
- return @intCast(JSC.Node.FileDescriptor, out);
- }
- };
-
- // pub fn NativeReader(
- // comptime Context: type,
- // comptime onEnqueue: anytype,
- // comptime onClose: anytype,
- // comptime onAsJS: anytype,
- // ) type {
- // return struct {
- // pub const tag = Context.tag;
-
- // pub fn enqueue(globalThis: *JSAGlobalObject, callframe: *const JSC.CallFrame) callconv(.C) JSC.JSValue {
- // var this = callframe.argument(0).asPtr(*Context);
- // return onEnqueue(this, globalThis, callframe.argument(1));
- // }
- // };
- // }
-};
-
// https://developer.mozilla.org/en-US/docs/Web/API/Headers
pub const Headers = struct {
pub usingnamespace HTTPClient.Headers;
@@ -2843,7 +2751,7 @@ pub const Blob = struct {
recommended_chunk_size = @intCast(SizeType, @maximum(0, @truncate(i52, JSValue.c(arguments[0]).toInt64())));
}
- return ReadableStream.fromBlob(
+ return JSC.WebCore.ReadableStream.fromBlob(
ctx.ptr(),
this,
recommended_chunk_size,
@@ -3911,7 +3819,7 @@ pub const Body = struct {
callback: ?fn (ctx: *anyopaque, value: *Value) void = null,
/// conditionally runs when requesting data
/// used in HTTP server to ignore request bodies unless asked for it
- onRequestData: ?fn (ctx: *anyopaque) void = null,
+ onPull: ?fn (ctx: *anyopaque) void = null,
deinit: bool = false,
action: Action = Action.none,
@@ -3920,9 +3828,9 @@ pub const Body = struct {
var promise = JSC.JSPromise.create(globalThis);
const promise_value = promise.asValue(globalThis);
value.promise = promise_value;
- if (value.onRequestData) |onRequestData| {
- value.onRequestData = null;
- onRequestData(value.task.?);
+ if (value.onPull) |onPull| {
+ value.onPull = null;
+ onPull(value.task.?);
}
return promise_value;
}
@@ -4143,6 +4051,11 @@ pub const Body = struct {
} else |_| {}
}
+ if (value.as(JSC.WebCore.ReadableStream)) |readable| {
+ body.value = Body.Value.fromReadableStream(ctx, readable);
+ return body;
+ }
+
body.value = .{
.Blob = Blob.fromJS(ctx.ptr(), value, true, false) catch |err| {
if (err == error.InvalidArguments) {
@@ -4841,1055 +4754,13 @@ pub const FetchEvent = struct {
}
};
-pub const StreamStart = union(enum) {
- empty: void,
- err: JSC.Node.Syscall.Error,
- chunk_size: Blob.SizeType,
-};
-
-pub const StreamResult = union(enum) {
- owned: bun.ByteList,
- owned_and_done: bun.ByteList,
- temporary_and_done: bun.ByteList,
- temporary: bun.ByteList,
- into_array: IntoArray,
- into_array_and_done: IntoArray,
- pending: *Pending,
- err: JSC.Node.Syscall.Error,
- done: void,
-
- pub const IntoArray = struct {
- value: JSValue = JSValue.zero,
- len: Blob.SizeType = std.math.maxInt(Blob.SizeType),
- };
+pub const StreamSource = struct {
+ ptr: ?*anyopaque = null,
+ vtable: VTable,
- pub const Pending = struct {
- frame: anyframe,
- result: StreamResult,
- used: bool = false,
+ pub const VTable = struct {
+ onStart: fn (this: StreamSource) JSC.WebCore.StreamStart,
+ onPull: fn (this: StreamSource) JSC.WebCore.StreamResult,
+ onError: fn (this: StreamSource) void,
};
-
- pub fn isDone(this: *const StreamResult) bool {
- return switch (this.*) {
- .owned_and_done, .temporary_and_done, .into_array_and_done, .done, .err => true,
- else => false,
- };
- }
-
- fn toPromisedWrap(globalThis: *JSGlobalObject, promise: *JSPromise, pending: *Pending) void {
- suspend {}
-
- pending.used = true;
- const result: StreamResult = pending.result;
-
- switch (result) {
- .err => |err| {
- promise.reject(globalThis, err.toJSC(globalThis));
- },
- .done => {
- promise.resolve(globalThis, JSValue.jsBoolean(false));
- },
- else => {
- promise.resolve(globalThis, result.toJS(globalThis));
- },
- }
- }
-
- pub fn toPromised(globalThis: *JSGlobalObject, promise: *JSPromise, pending: *Pending) void {
- var frame = bun.default_allocator.create(@Frame(toPromisedWrap)) catch unreachable;
- frame.* = async toPromisedWrap(globalThis, promise, pending);
- pending.frame = frame;
- }
-
- pub fn toJS(this: *const StreamResult, globalThis: *JSGlobalObject) JSValue {
- switch (this.*) {
- .owned => |list| {
- return JSC.ArrayBuffer.fromBytes(list.slice(), .Uint8Array).toJS(globalThis.ref(), null);
- },
- .owned_and_done => |list| {
- return JSC.ArrayBuffer.fromBytes(list.slice(), .Uint8Array).toJS(globalThis.ref(), null);
- },
- .temporary => |temp| {
- var array = JSC.JSValue.createUninitializedUint8Array(globalThis, temp.len);
- var slice = array.asArrayBuffer(globalThis).?.slice();
- @memcpy(slice.ptr, temp.ptr, temp.len);
- return array;
- },
- .temporary_and_done => |temp| {
- var array = JSC.JSValue.createUninitializedUint8Array(globalThis, temp.len);
- var slice = array.asArrayBuffer(globalThis).?.slice();
- @memcpy(slice.ptr, temp.ptr, temp.len);
- return array;
- },
- .into_array => |array| {
- return JSC.JSValue.jsNumberFromInt64(array.len);
- },
- .into_array_and_done => |array| {
- return JSC.JSValue.jsNumberFromInt64(array.len);
- },
- .pending => |pending| {
- var promise = JSC.JSPromise.create(globalThis);
- toPromised(globalThis, promise, pending);
- return promise.asValue(globalThis);
- },
-
- .err => |err| {
- return JSC.JSPromise.rejectedPromise(globalThis, JSValue.c(err.toJS(globalThis.ref()))).asValue(globalThis);
- },
-
- // false == controller.close()
- // undefined == noop, but we probably won't send it
- .done => {
- return JSC.JSValue.jsBoolean(false);
- },
- }
- }
};
-
-pub fn WritableStreamSink(
- comptime Context: type,
- comptime onStart: ?fn (this: Context) void,
- comptime onWrite: fn (this: Context, bytes: []const u8) JSC.Maybe(Blob.SizeType),
- comptime onAbort: ?fn (this: Context) void,
- comptime onClose: ?fn (this: Context) void,
- comptime deinit: ?fn (this: Context) void,
-) type {
- return struct {
- context: Context,
- closed: bool = false,
- deinited: bool = false,
- pending_err: ?JSC.Node.Syscall.Error = null,
- aborted: bool = false,
-
- abort_signaler: ?*anyopaque = null,
- onAbortCallback: ?fn (?*anyopaque) void = null,
-
- close_signaler: ?*anyopaque = null,
- onCloseCallback: ?fn (?*anyopaque) void = null,
-
- pub const This = @This();
-
- pub fn write(this: *This, bytes: []const u8) JSC.Maybe(Blob.SizeType) {
- if (this.pending_err) |err| {
- this.pending_err = null;
- return .{ .err = err };
- }
-
- if (this.closed or this.aborted or this.deinited) {
- return .{ .result = 0 };
- }
- return onWrite(&this.context, bytes);
- }
-
- pub fn start(this: *This) StreamStart {
- return onStart(&this.context);
- }
-
- pub fn abort(this: *This) void {
- if (this.closed or this.deinited or this.aborted) {
- return;
- }
-
- this.aborted = true;
- onAbort(&this.context);
- }
-
- pub fn didAbort(this: *This) void {
- if (this.closed or this.deinited or this.aborted) {
- return;
- }
- this.aborted = true;
-
- if (this.onAbortCallback) |cb| {
- this.onAbortCallback = null;
- cb(this.abort_signaler);
- }
- }
-
- pub fn didClose(this: *This) void {
- if (this.closed or this.deinited or this.aborted) {
- return;
- }
- this.closed = true;
-
- if (this.onCloseCallback) |cb| {
- this.onCloseCallback = null;
- cb(this.close_signaler);
- }
- }
-
- pub fn close(this: *This) void {
- if (this.closed or this.deinited or this.aborted) {
- return;
- }
-
- this.closed = true;
- onClose(this.context);
- }
-
- pub fn deinit(this: *This) void {
- if (this.deinited) {
- return;
- }
- this.deinited = true;
- deinit(this.context);
- }
-
- pub fn getError(this: *This) ?JSC.Node.Syscall.Error {
- if (this.pending_err) |err| {
- this.pending_err = null;
- return err;
- }
-
- return null;
- }
- };
-}
-
-pub fn HTTPServerWritable(comptime ssl: bool) type {
- return struct {
- pub const UWSResponse = uws.NewApp(ssl).Response;
- res: *UWSResponse,
- pending_chunk: []const u8 = "",
- is_listening_for_abort: bool = false,
- wrote: Blob.SizeType = 0,
- callback: anyframe->JSC.Maybe(Blob.SizeType) = undefined,
- writable: Writable,
-
- pub fn onWritable(this: *@This(), available: c_ulong, _: *UWSResponse) callconv(.C) bool {
- const to_write = @minimum(@truncate(Blob.SizeType, available), @truncate(Blob.SizeType, this.pending_chunk.len));
- if (!this.res.write(this.pending_chunk[0..to_write])) {
- return true;
- }
-
- this.pending_chunk = this.pending_chunk[to_write..];
- this.wrote += to_write;
- if (this.pending_chunk.len > 0) {
- this.res.onWritable(*@This(), onWritable, this);
- return true;
- }
-
- var callback = this.callback;
- this.callback = undefined;
- // TODO: clarify what the boolean means
- resume callback;
- bun.default_allocator.destroy(callback.*);
- return false;
- }
-
- pub fn onStart(this: *@This()) void {
- if (this.res.hasResponded()) {
- this.writable.didClose();
- }
- }
- pub fn onWrite(this: *@This(), bytes: []const u8) JSC.Maybe(Blob.SizeType) {
- if (this.writable.aborted) {
- return .{ .result = 0 };
- }
-
- if (this.pending_chunk.len > 0) {
- return JSC.Maybe(Blob.SizeType).retry;
- }
-
- if (this.res.write(bytes)) {
- return .{ .result = @truncate(Blob.SizeType, bytes.len) };
- }
-
- this.pending_chunk = bytes;
- this.writable.pending_err = null;
- suspend {
- if (!this.is_listening_for_abort) {
- this.is_listening_for_abort = true;
- this.res.onAborted(*@This(), onAborted);
- }
-
- this.res.onWritable(*@This(), onWritable, this);
- var frame = bun.default_allocator.create(@TypeOf(@Frame(onWrite))) catch unreachable;
- this.callback = frame;
- frame.* = @frame().*;
- }
- const wrote = this.wrote;
- this.wrote = 0;
- if (this.writable.pending_err) |err| {
- this.writable.pending_err = null;
- return .{ .err = err };
- }
- return .{ .result = wrote };
- }
-
- // client-initiated
- pub fn onAborted(this: *@This(), _: *UWSResponse) void {
- this.writable.didAbort();
- }
- // writer-initiated
- pub fn onAbort(this: *@This()) void {
- this.res.end("", true);
- }
- pub fn onClose(this: *@This()) void {
- this.res.end("", false);
- }
- pub fn deinit(_: *@This()) void {}
-
- pub const Writable = WritableStreamSink(@This(), onStart, onWrite, onAbort, onClose, deinit);
- };
-}
-pub const HTTPSWriter = HTTPServerWritable(true);
-pub const HTTPWriter = HTTPServerWritable(false);
-
-pub fn ReadableStreamSource(
- comptime Context: type,
- comptime name_: []const u8,
- comptime onStart: anytype,
- comptime onPull: anytype,
- comptime onCancel: fn (this: *Context) void,
- comptime deinit: fn (this: *Context) void,
-) type {
- return struct {
- context: Context,
- cancelled: bool = false,
- deinited: bool = false,
- pending_err: ?JSC.Node.Syscall.Error = null,
- close_handler: ?fn (*anyopaque) void = null,
- close_ctx: ?*anyopaque = null,
- close_jsvalue: JSValue = JSValue.zero,
- globalThis: *JSGlobalObject = undefined,
-
- const This = @This();
- const ReadableStreamSourceType = @This();
-
- pub fn pull(this: *This, buf: []u8) StreamResult {
- return onPull(&this.context, buf, JSValue.zero);
- }
-
- pub fn start(
- this: *This,
- ) StreamStart {
- return onStart(&this.context);
- }
-
- pub fn pullFromJS(this: *This, buf: []u8, view: JSValue) StreamResult {
- return onPull(&this.context, buf, view);
- }
-
- pub fn startFromJS(this: *This) StreamStart {
- return onStart(&this.context);
- }
-
- pub fn cancel(this: *This) void {
- if (this.cancelled or this.deinited) {
- return;
- }
-
- this.cancelled = true;
- onCancel(&this.context);
- }
-
- pub fn onClose(this: *This) void {
- if (this.cancelled or this.deinited) {
- return;
- }
-
- if (this.close_handler) |close| {
- this.close_handler = null;
- close(this.close_ctx);
- }
- }
-
- pub fn deinit(this: *This) void {
- if (this.deinited) {
- return;
- }
- this.deinited = true;
- deinit(&this.context);
- }
-
- pub fn getError(this: *This) ?JSC.Node.Syscall.Error {
- if (this.pending_err) |err| {
- this.pending_err = null;
- return err;
- }
-
- return null;
- }
-
- pub fn toJS(this: *ReadableStreamSourceType, globalThis: *JSGlobalObject) JSC.JSValue {
- return ReadableStream.fromNative(globalThis, Context.tag, this);
- }
-
- pub const JSReadableStreamSource = struct {
- pub const shim = JSC.Shimmer(std.mem.span(name_), "JSReadableStreamSource", @This());
- pub const name = std.fmt.comptimePrint("{s}_JSReadableStreamSource", .{std.mem.span(name_)});
-
- pub fn pull(globalThis: *JSGlobalObject, callFrame: *JSC.CallFrame) callconv(.C) JSC.JSValue {
- var this = callFrame.argument(0).asPtr(ReadableStreamSourceType);
- const view = callFrame.argument(1);
- view.ensureStillAlive();
- var buffer = view.asArrayBuffer(globalThis) orelse return JSC.JSValue.jsUndefined();
- return processResult(
- globalThis,
- callFrame,
- this.pullFromJS(buffer.slice(), view),
- );
- }
- pub fn start(globalThis: *JSGlobalObject, callFrame: *JSC.CallFrame) callconv(.C) JSC.JSValue {
- var this = callFrame.argument(0).asPtr(ReadableStreamSourceType);
- switch (this.startFromJS()) {
- .empty => return JSValue.jsNumber(0),
- .chunk_size => |size| return JSValue.jsNumber(size),
- .err => |err| {
- globalThis.vm().throwError(globalThis, err.toJSC(globalThis));
- return JSC.JSValue.jsUndefined();
- },
- }
- }
-
- pub fn processResult(globalThis: *JSGlobalObject, callFrame: *JSC.CallFrame, result: StreamResult) JSC.JSValue {
- switch (result) {
- .err => |err| {
- globalThis.vm().throwError(globalThis, err.toJSC(globalThis));
- return JSValue.jsUndefined();
- },
- .temporary_and_done, .owned_and_done, .into_array_and_done => {
- JSC.C.JSObjectSetPropertyAtIndex(globalThis.ref(), callFrame.argument(2).asObjectRef(), 0, JSValue.jsBoolean(true).asObjectRef(), null);
- return result.toJS(globalThis);
- },
- else => return result.toJS(globalThis),
- }
- }
- pub fn cancel(_: *JSGlobalObject, callFrame: *JSC.CallFrame) callconv(.C) JSC.JSValue {
- var this = callFrame.argument(0).asPtr(ReadableStreamSourceType);
- this.cancel();
- return JSC.JSValue.jsUndefined();
- }
- pub fn setClose(globalThis: *JSGlobalObject, callFrame: *JSC.CallFrame) callconv(.C) JSC.JSValue {
- var this = callFrame.argument(0).asPtr(ReadableStreamSourceType);
- this.close_ctx = this;
- this.close_handler = JSReadableStreamSource.onClose;
- this.globalThis = globalThis;
- this.close_jsvalue = callFrame.argument(1);
- return JSC.JSValue.jsUndefined();
- }
-
- fn onClose(ptr: *anyopaque) void {
- var this = bun.cast(*ReadableStreamSourceType, ptr);
- _ = this.close_jsvalue.call(this.globalThis, &.{});
- // this.closer
- }
-
- pub fn deinit(_: *JSGlobalObject, callFrame: *JSC.CallFrame) callconv(.C) JSC.JSValue {
- var this = callFrame.argument(0).asPtr(ReadableStreamSourceType);
- this.deinit();
- return JSValue.jsUndefined();
- }
-
- pub fn load(globalThis: *JSGlobalObject) callconv(.C) JSC.JSValue {
- if (comptime JSC.is_bindgen) unreachable;
- if (comptime Environment.allow_assert) {
- // this should be cached per globals object
- const OnlyOnce = struct {
- pub threadlocal var last_globals: ?*JSGlobalObject = null;
- };
- if (OnlyOnce.last_globals) |last_globals| {
- std.debug.assert(last_globals != globalThis);
- }
- OnlyOnce.last_globals = globalThis;
- }
- return JSC.JSArray.from(globalThis, &.{
- JSC.NewFunction(globalThis, null, 1, JSReadableStreamSource.pull),
- JSC.NewFunction(globalThis, null, 1, JSReadableStreamSource.start),
- JSC.NewFunction(globalThis, null, 1, JSReadableStreamSource.cancel),
- JSC.NewFunction(globalThis, null, 1, JSReadableStreamSource.setClose),
- JSC.NewFunction(globalThis, null, 1, JSReadableStreamSource.deinit),
- });
- }
-
- pub const Export = shim.exportFunctions(.{
- .@"load" = load,
- });
-
- comptime {
- if (!JSC.is_bindgen) {
- @export(load, .{ .name = Export[0].symbol_name });
- _ = JSReadableStreamSource.pull;
- _ = JSReadableStreamSource.start;
- _ = JSReadableStreamSource.cancel;
- _ = JSReadableStreamSource.setClose;
- _ = JSReadableStreamSource.load;
- }
- }
- };
- };
-}
-
-pub const ByteBlobLoader = struct {
- offset: Blob.SizeType = 0,
- store: *Blob.Store,
- chunk_size: Blob.SizeType = 1024 * 1024 * 2,
- remain: Blob.SizeType = 1024 * 1024 * 2,
- done: bool = false,
-
- pub const tag = ReadableStream.Tag.Blob;
-
- pub fn setup(
- this: *ByteBlobLoader,
- blob: *const Blob,
- user_chunk_size: Blob.SizeType,
- ) void {
- blob.store.?.ref();
- var blobe = blob.*;
- blobe.resolveSize();
- this.* = ByteBlobLoader{
- .offset = blobe.offset,
- .store = blobe.store.?,
- .chunk_size = if (user_chunk_size > 0) @minimum(user_chunk_size, blobe.size) else @minimum(1024 * 1024 * 2, blobe.size),
- .remain = blobe.size,
- .done = false,
- };
- }
-
- pub fn onStart(this: *ByteBlobLoader) StreamStart {
- return .{ .chunk_size = this.chunk_size };
- }
-
- pub fn onPull(this: *ByteBlobLoader, buffer: []u8, array: JSC.JSValue) StreamResult {
- array.ensureStillAlive();
- defer array.ensureStillAlive();
- if (this.done) {
- return .{ .done = {} };
- }
-
- var temporary = this.store.sharedView();
- temporary = temporary[this.offset..];
-
- temporary = temporary[0..@minimum(buffer.len, @minimum(temporary.len, this.remain))];
- if (temporary.len == 0) {
- this.store.deref();
- this.done = true;
- return .{ .done = {} };
- }
-
- const copied = @intCast(Blob.SizeType, temporary.len);
-
- this.remain -|= copied;
- this.offset +|= copied;
- @memcpy(buffer.ptr, temporary.ptr, temporary.len);
- if (this.remain == 0) {
- return .{ .into_array_and_done = .{ .value = array, .len = copied } };
- }
-
- return .{ .into_array = .{ .value = array, .len = copied } };
- }
-
- pub fn onCancel(_: *ByteBlobLoader) void {}
-
- pub fn deinit(this: *ByteBlobLoader) void {
- if (!this.done) {
- this.done = true;
- this.store.deref();
- }
-
- bun.default_allocator.destroy(this);
- }
-
- pub const Source = ReadableStreamSource(@This(), "ByteBlob", onStart, onPull, onCancel, deinit);
-};
-
-pub const FileBlobLoader = struct {
- buf: []u8 = &[_]u8{},
- protected_view: JSC.JSValue = JSC.JSValue.zero,
- fd: JSC.Node.FileDescriptor = 0,
- auto_close: bool = false,
- loop: *JSC.EventLoop = undefined,
- mode: JSC.Node.Mode = 0,
- store: *Blob.Store,
- total_read: Blob.SizeType = 0,
- finalized: bool = false,
- callback: anyframe = undefined,
- pending: StreamResult.Pending = StreamResult.Pending{
- .frame = undefined,
- .used = false,
- .result = .{ .done = {} },
- },
- cancelled: bool = false,
- user_chunk_size: Blob.SizeType = 0,
- scheduled_count: u32 = 0,
- concurrent: Concurrent = Concurrent{},
-
- const FileReader = @This();
-
- const run_on_different_thread_size = bun.huge_allocator_threshold;
-
- pub const tag = ReadableStream.Tag.File;
-
- pub fn setup(this: *FileBlobLoader, store: *Blob.Store, chunk_size: Blob.SizeType) void {
- store.ref();
- this.* = .{
- .loop = JSC.VirtualMachine.vm.eventLoop(),
- .auto_close = store.data.file.pathlike == .path,
- .store = store,
- .user_chunk_size = chunk_size,
- };
- }
-
- pub fn watch(this: *FileReader) void {
- _ = JSC.VirtualMachine.vm.poller.watch(this.fd, .read, this, callback);
- this.scheduled_count += 1;
- }
-
- const Concurrent = struct {
- read: Blob.SizeType = 0,
- task: NetworkThread.Task = .{ .callback = Concurrent.taskCallback },
- completion: AsyncIO.Completion = undefined,
- read_frame: anyframe = undefined,
- chunk_size: Blob.SizeType = 0,
- main_thread_task: JSC.AnyTask = .{ .callback = onJSThread, .ctx = null },
-
- pub fn taskCallback(task: *NetworkThread.Task) void {
- var this = @fieldParentPtr(FileBlobLoader, "concurrent", @fieldParentPtr(Concurrent, "task", task));
- var frame = HTTPClient.getAllocator().create(@Frame(runAsync)) catch unreachable;
- _ = @asyncCall(std.mem.asBytes(frame), undefined, runAsync, .{this});
- }
-
- pub fn onRead(this: *FileBlobLoader, completion: *HTTPClient.NetworkThread.Completion, result: AsyncIO.ReadError!usize) void {
- this.concurrent.read = @truncate(Blob.SizeType, result catch |err| {
- if (@hasField(HTTPClient.NetworkThread.Completion, "result")) {
- this.pending.result = .{
- .err = JSC.Node.Syscall.Error{
- .errno = @intCast(JSC.Node.Syscall.Error.Int, -completion.result),
- .syscall = .read,
- },
- };
- } else {
- this.pending.result = .{
- .err = JSC.Node.Syscall.Error{
- // this is too hacky
- .errno = @truncate(JSC.Node.Syscall.Error.Int, @intCast(u16, @maximum(1, @errorToInt(err)))),
- .syscall = .read,
- },
- };
- }
- this.concurrent.read = 0;
- resume this.concurrent.read_frame;
- return;
- });
-
- resume this.concurrent.read_frame;
- }
-
- pub fn scheduleRead(this: *FileBlobLoader) void {
- if (comptime Environment.isMac) {
- var remaining = this.buf[this.concurrent.read..];
-
- while (remaining.len > 0) {
- const to_read = @minimum(@as(usize, this.concurrent.chunk_size), remaining.len);
- switch (JSC.Node.Syscall.read(this.fd, remaining[0..to_read])) {
- .err => |err| {
- const retry = comptime if (Environment.isLinux)
- std.os.E.WOULDBLOCK
- else
- std.os.E.AGAIN;
-
- switch (err.getErrno()) {
- retry => break,
- else => {},
- }
-
- this.pending.result = .{ .err = err };
- scheduleMainThreadTask(this);
- return;
- },
- .result => |result| {
- this.concurrent.read += @intCast(Blob.SizeType, result);
- remaining = remaining[result..];
-
- if (result == 0) {
- remaining.len = 0;
- break;
- }
- },
- }
- }
-
- if (remaining.len == 0) {
- scheduleMainThreadTask(this);
- return;
- }
- }
-
- AsyncIO.global.read(
- *FileBlobLoader,
- this,
- onRead,
- &this.concurrent.completion,
- this.fd,
- this.buf[this.concurrent.read..],
- null,
- );
-
- suspend {
- var _frame = @frame();
- var this_frame = HTTPClient.getAllocator().create(std.meta.Child(@TypeOf(_frame))) catch unreachable;
- this_frame.* = _frame.*;
- this.concurrent.read_frame = this_frame;
- }
-
- scheduleMainThreadTask(this);
- }
-
- pub fn onJSThread(task_ctx: *anyopaque) void {
- var this: *FileBlobLoader = bun.cast(*FileBlobLoader, task_ctx);
- const protected_view = this.protected_view;
- defer protected_view.unprotect();
- this.protected_view = JSC.JSValue.zero;
-
- if (this.finalized and this.scheduled_count == 0) {
- if (!this.pending.used) {
- resume this.pending.frame;
- }
- this.scheduled_count -= 1;
-
- this.deinit();
-
- return;
- }
-
- if (!this.pending.used and this.pending.result == .err and this.concurrent.read == 0) {
- resume this.pending.frame;
- this.scheduled_count -= 1;
- this.finalize();
- return;
- }
-
- if (this.concurrent.read == 0) {
- this.pending.result = .{ .done = {} };
- resume this.pending.frame;
- this.scheduled_count -= 1;
- this.finalize();
- return;
- }
-
- this.pending.result = this.handleReadChunk(@as(usize, this.concurrent.read));
- resume this.pending.frame;
- this.scheduled_count -= 1;
- if (this.pending.result.isDone()) {
- this.finalize();
- }
- }
-
- pub fn scheduleMainThreadTask(this: *FileBlobLoader) void {
- this.concurrent.main_thread_task.ctx = this;
- this.loop.enqueueTaskConcurrent(JSC.Task.init(&this.concurrent.main_thread_task));
- }
-
- fn runAsync(this: *FileBlobLoader) void {
- this.concurrent.read = 0;
-
- Concurrent.scheduleRead(this);
-
- suspend {
- HTTPClient.getAllocator().destroy(@frame());
- }
- }
- };
-
- pub fn scheduleAsync(this: *FileReader, chunk_size: Blob.SizeType) void {
- this.scheduled_count += 1;
- this.loop.virtual_machine.active_tasks +|= 1;
-
- NetworkThread.init() catch {};
- this.concurrent.chunk_size = chunk_size;
- NetworkThread.global.pool.schedule(.{ .head = &this.concurrent.task, .tail = &this.concurrent.task, .len = 1 });
- }
-
- const default_fifo_chunk_size = 1024;
- const default_file_chunk_size = 1024 * 1024 * 2;
- pub fn onStart(this: *FileBlobLoader) StreamStart {
- var file = &this.store.data.file;
- var file_buf: [std.fs.MAX_PATH_BYTES]u8 = undefined;
- var auto_close = this.auto_close;
- defer this.auto_close = auto_close;
- var fd = if (!auto_close)
- file.pathlike.fd
- else switch (JSC.Node.Syscall.open(file.pathlike.path.sliceZ(&file_buf), std.os.O.RDONLY | std.os.O.NONBLOCK | std.os.O.CLOEXEC, 0)) {
- .result => |_fd| _fd,
- .err => |err| {
- this.deinit();
- return .{ .err = err.withPath(file.pathlike.path.slice()) };
- },
- };
-
- if (!auto_close) {
- // ensure we have non-blocking IO set
- const flags = std.os.fcntl(fd, std.os.F.GETFL, 0) catch return .{ .err = JSC.Node.Syscall.Error.fromCode(std.os.E.BADF, .fcntl) };
-
- // if we do not, clone the descriptor and set non-blocking
- // it is important for us to clone it so we don't cause Weird Things to happen
- if ((flags & std.os.O.NONBLOCK) == 0) {
- auto_close = true;
- fd = @intCast(@TypeOf(fd), std.os.fcntl(fd, std.os.F.DUPFD, 0) catch return .{ .err = JSC.Node.Syscall.Error.fromCode(std.os.E.BADF, .fcntl) });
- _ = std.os.fcntl(fd, std.os.F.SETFL, flags | std.os.O.NONBLOCK) catch return .{ .err = JSC.Node.Syscall.Error.fromCode(std.os.E.BADF, .fcntl) };
- }
- }
-
- const stat: std.os.Stat = switch (JSC.Node.Syscall.fstat(fd)) {
- .result => |result| result,
- .err => |err| {
- if (auto_close) {
- _ = JSC.Node.Syscall.close(fd);
- }
- this.deinit();
- return .{ .err = err.withPath(file.pathlike.path.slice()) };
- },
- };
-
- if (std.os.S.ISDIR(stat.mode)) {
- const err = JSC.Node.Syscall.Error.fromCode(.ISDIR, .fstat);
- if (auto_close) {
- _ = JSC.Node.Syscall.close(fd);
- }
- this.deinit();
- return .{ .err = err };
- }
-
- if (std.os.S.ISSOCK(stat.mode)) {
- const err = JSC.Node.Syscall.Error.fromCode(.INVAL, .fstat);
-
- if (auto_close) {
- _ = JSC.Node.Syscall.close(fd);
- }
- this.deinit();
- return .{ .err = err };
- }
-
- file.seekable = std.os.S.ISREG(stat.mode);
- file.mode = @intCast(JSC.Node.Mode, stat.mode);
- this.mode = file.mode;
-
- if (file.seekable orelse false)
- file.max_size = @intCast(Blob.SizeType, stat.size);
-
- if ((file.seekable orelse false) and file.max_size == 0) {
- if (auto_close) {
- _ = JSC.Node.Syscall.close(fd);
- }
- this.deinit();
- return .{ .empty = {} };
- }
-
- this.fd = fd;
- this.auto_close = auto_close;
-
- const chunk_size = this.calculateChunkSize(std.math.maxInt(usize));
- return .{ .chunk_size = @truncate(Blob.SizeType, chunk_size) };
- }
-
- fn calculateChunkSize(this: *FileBlobLoader, available_to_read: usize) usize {
- const file = &this.store.data.file;
-
- const chunk_size: usize = if (this.user_chunk_size > 0)
- @as(usize, this.user_chunk_size)
- else if (file.seekable orelse false)
- @as(usize, default_file_chunk_size)
- else
- @as(usize, default_fifo_chunk_size);
-
- return if (file.max_size > 0)
- if (available_to_read != std.math.maxInt(usize)) @minimum(chunk_size, available_to_read) else @minimum(@maximum(this.total_read, file.max_size) - this.total_read, chunk_size)
- else
- @minimum(available_to_read, chunk_size);
- }
-
- pub fn onPull(this: *FileBlobLoader, buffer: []u8, view: JSC.JSValue) StreamResult {
- const chunk_size = this.calculateChunkSize(std.math.maxInt(usize));
-
- switch (chunk_size) {
- 0 => {
- std.debug.assert(this.store.data.file.seekable orelse false);
- this.finalize();
- return .{ .done = {} };
- },
- run_on_different_thread_size...std.math.maxInt(@TypeOf(chunk_size)) => {
- this.protected_view = view;
- this.protected_view.protect();
- // should never be reached
- this.pending.result = .{
- .err = JSC.Node.Syscall.Error.todo,
- };
- this.buf = buffer;
-
- this.scheduleAsync(@truncate(Blob.SizeType, chunk_size));
-
- return .{ .pending = &this.pending };
- },
- else => {},
- }
-
- return this.read(buffer, view);
- }
-
- fn maybeAutoClose(this: *FileBlobLoader) void {
- if (this.auto_close) {
- _ = JSC.Node.Syscall.close(this.fd);
- this.auto_close = false;
- }
- }
-
- fn handleReadChunk(this: *FileBlobLoader, result: usize) StreamResult {
- this.total_read += @intCast(Blob.SizeType, result);
- const remaining: Blob.SizeType = if (this.store.data.file.seekable orelse false)
- this.store.data.file.max_size -| this.total_read
- else
- @as(Blob.SizeType, std.math.maxInt(Blob.SizeType));
-
- // this handles:
- // - empty file
- // - stream closed for some reason
- if ((result == 0 and remaining == 0)) {
- this.finalize();
- return .{ .done = {} };
- }
-
- const has_more = remaining > 0;
-
- if (!has_more) {
- return .{ .into_array_and_done = .{ .len = @truncate(Blob.SizeType, result) } };
- }
-
- return .{ .into_array = .{ .len = @truncate(Blob.SizeType, result) } };
- }
-
- pub fn read(
- this: *FileBlobLoader,
- read_buf: []u8,
- view: JSC.JSValue,
- ) StreamResult {
- const rc =
- JSC.Node.Syscall.read(this.fd, read_buf);
-
- switch (rc) {
- .err => |err| {
- const retry = comptime if (Environment.isLinux)
- std.os.E.WOULDBLOCK
- else
- std.os.E.AGAIN;
-
- switch (err.getErrno()) {
- retry => {
- this.protected_view = view;
- this.protected_view.protect();
- this.buf = read_buf;
- this.watch();
- return .{
- .pending = &this.pending,
- };
- },
- else => {},
- }
- const sys = if (this.store.data.file.pathlike == .path and this.store.data.file.pathlike.path.slice().len > 0)
- err.withPath(this.store.data.file.pathlike.path.slice())
- else
- err;
-
- this.finalize();
- return .{ .err = sys };
- },
- .result => |result| {
- return this.handleReadChunk(result);
- },
- }
- }
-
- pub fn callback(task: ?*anyopaque, sizeOrOffset: i64, _: u16) void {
- var this: *FileReader = bun.cast(*FileReader, task.?);
- this.scheduled_count -= 1;
- const protected_view = this.protected_view;
- defer protected_view.unprotect();
- this.protected_view = JSValue.zero;
-
- var available_to_read: usize = std.math.maxInt(usize);
- if (comptime Environment.isMac) {
- if (std.os.S.ISREG(this.mode)) {
- // Returns when the file pointer is not at the end of
- // file. data contains the offset from current position
- // to end of file, and may be negative.
- available_to_read = @intCast(usize, @maximum(sizeOrOffset, 0));
- } else if (std.os.S.ISCHR(this.mode) or std.os.S.ISFIFO(this.mode)) {
- available_to_read = @intCast(usize, @maximum(sizeOrOffset, 0));
- }
- }
- if (this.finalized and this.scheduled_count == 0) {
- if (!this.pending.used) {
- // should never be reached
- this.pending.result = .{
- .err = JSC.Node.Syscall.Error.todo,
- };
- resume this.pending.frame;
- }
- this.deinit();
- return;
- }
- if (this.cancelled)
- return;
-
- if (this.buf.len == 0) {
- return;
- } else {
- this.buf.len = @minimum(this.buf.len, available_to_read);
- }
-
- this.pending.result = this.read(this.buf, this.protected_view);
- resume this.pending.frame;
- }
-
- pub fn finalize(this: *FileBlobLoader) void {
- if (this.finalized)
- return;
- this.finalized = true;
-
- this.maybeAutoClose();
-
- this.store.deref();
- }
-
- pub fn onCancel(this: *FileBlobLoader) void {
- this.cancelled = true;
-
- this.deinit();
- }
-
- pub fn deinit(this: *FileBlobLoader) void {
- this.finalize();
- if (this.scheduled_count == 0 and !this.pending.used) {
- this.destroy();
- }
- }
-
- pub fn destroy(this: *FileBlobLoader) void {
- bun.default_allocator.destroy(this);
- }
-
- pub const Source = ReadableStreamSource(@This(), "FileBlobLoader", onStart, onPull, onCancel, deinit);
-};
-
-// pub const BlobFileLoader = struct {
-// reader: Reader,
-// source: ?*anyopaque = null,
-// store: *Store,
-// pub const Reader = NewFileReader(onRead, onError, onWouldBlock, isCancelled);
-
-// pub fn onRead(this: *BlobFileReader, buf: []u8, len: Blob.SizeType) bool {
-// if (len == 0) {
-// return BlobStore__onRead(this.source, null, 0);
-// }
-
-// return BlobStore__onReadExternal(this.source, buf.ptr, len, buf.ptr, JSC.MarkedArrayBuffer_deallocator);
-// }
-
-// pub fn onError(ctx: *BlobFileReader, err: JSC.SystemError) void {
-// _ = BlobStore__onError(ctx.source, &err, ctx.reader.global);
-// }
-
-// pub fn isCancelled(ctx: *BlobFileReader) bool {
-// return BlobStore__isCancelled(ctx.source);
-// }
-// };
diff --git a/src/javascript/jsc/webcore/streams.zig b/src/javascript/jsc/webcore/streams.zig
new file mode 100644
index 000000000..730513615
--- /dev/null
+++ b/src/javascript/jsc/webcore/streams.zig
@@ -0,0 +1,1321 @@
+const std = @import("std");
+const Api = @import("../../../api/schema.zig").Api;
+const bun = @import("../../../global.zig");
+const RequestContext = @import("../../../http.zig").RequestContext;
+const MimeType = @import("../../../http.zig").MimeType;
+const ZigURL = @import("../../../url.zig").URL;
+const HTTPClient = @import("http");
+const NetworkThread = HTTPClient.NetworkThread;
+const AsyncIO = NetworkThread.AsyncIO;
+const JSC = @import("javascript_core");
+const js = JSC.C;
+
+const Method = @import("../../../http/method.zig").Method;
+const FetchHeaders = JSC.FetchHeaders;
+const ObjectPool = @import("../../../pool.zig").ObjectPool;
+const SystemError = JSC.SystemError;
+const Output = @import("../../../global.zig").Output;
+const MutableString = @import("../../../global.zig").MutableString;
+const strings = @import("../../../global.zig").strings;
+const string = @import("../../../global.zig").string;
+const default_allocator = @import("../../../global.zig").default_allocator;
+const FeatureFlags = @import("../../../global.zig").FeatureFlags;
+const ArrayBuffer = @import("../base.zig").ArrayBuffer;
+const Properties = @import("../base.zig").Properties;
+const NewClass = @import("../base.zig").NewClass;
+const d = @import("../base.zig").d;
+const castObj = @import("../base.zig").castObj;
+const getAllocator = @import("../base.zig").getAllocator;
+const JSPrivateDataPtr = @import("../base.zig").JSPrivateDataPtr;
+const GetJSPrivateData = @import("../base.zig").GetJSPrivateData;
+const Environment = @import("../../../env.zig");
+const ZigString = JSC.ZigString;
+const IdentityContext = @import("../../../identity_context.zig").IdentityContext;
+const JSInternalPromise = JSC.JSInternalPromise;
+const JSPromise = JSC.JSPromise;
+const JSValue = JSC.JSValue;
+const JSError = JSC.JSError;
+const JSGlobalObject = JSC.JSGlobalObject;
+
+const VirtualMachine = @import("../javascript.zig").VirtualMachine;
+const Task = JSC.Task;
+const JSPrinter = @import("../../../js_printer.zig");
+const picohttp = @import("picohttp");
+const StringJoiner = @import("../../../string_joiner.zig");
+const uws = @import("uws");
+const Blob = JSC.WebCore.Blob;
+const Response = JSC.WebCore.Response;
+const Request = JSC.WebCore.Request;
+
+pub const ReadableStream = struct {
+ pub const Tag = enum(i32) {
+ Blob = 1,
+ File = 2,
+ HTTPRequest = 3,
+ HTTPSRequest = 3,
+ };
+
+ extern fn ZigGlobalObject__createNativeReadableStream(*JSGlobalObject, nativePtr: JSValue, nativeType: JSValue) JSValue;
+
+ pub fn fromNative(globalThis: *JSGlobalObject, id: Tag, ptr: *anyopaque) JSC.JSValue {
+ return ZigGlobalObject__createNativeReadableStream(globalThis, JSValue.fromPtr(ptr), JSValue.jsNumber(@enumToInt(id)));
+ }
+ pub fn fromBlob(globalThis: *JSGlobalObject, blob: *const Blob, recommended_chunk_size: Blob.SizeType) JSC.JSValue {
+ if (comptime JSC.is_bindgen)
+ unreachable;
+ var store = blob.store orelse {
+ return ReadableStream.empty(globalThis);
+ };
+ switch (store.data) {
+ .bytes => {
+ var reader = bun.default_allocator.create(ByteBlobLoader.Source) catch unreachable;
+ reader.* = .{
+ .context = undefined,
+ };
+ reader.context.setup(blob, recommended_chunk_size);
+ return reader.toJS(globalThis);
+ },
+ .file => {
+ var reader = bun.default_allocator.create(FileBlobLoader.Source) catch unreachable;
+ reader.* = .{
+ .context = undefined,
+ };
+ reader.context.setup(store, recommended_chunk_size);
+ return reader.toJS(globalThis);
+ },
+ }
+ }
+
+ pub fn empty(globalThis: *JSGlobalObject) JSC.JSValue {
+ if (comptime JSC.is_bindgen)
+ unreachable;
+
+ return ReadableStream__empty(globalThis);
+ }
+
+ extern fn ReadableStream__empty(*JSGlobalObject) JSC.JSValue;
+ extern fn ReadableStream__fromBlob(
+ *JSGlobalObject,
+ store: *anyopaque,
+ offset: usize,
+ length: usize,
+ ) JSC.JSValue;
+ const Base = @import("../../../ast/base.zig");
+ pub const StreamTag = enum(usize) {
+ invalid = 0,
+ _,
+
+ pub fn init(filedes: JSC.Node.FileDescriptor) StreamTag {
+ var bytes = [8]u8{ 1, 0, 0, 0, 0, 0, 0, 0 };
+ const filedes_ = @bitCast([8]u8, @as(usize, @truncate(u56, @intCast(usize, filedes))));
+ bytes[1..8].* = filedes_[0..7].*;
+
+ return @intToEnum(StreamTag, @bitCast(u64, bytes));
+ }
+
+ pub fn fd(this: StreamTag) JSC.Node.FileDescriptor {
+ var bytes = @bitCast([8]u8, @enumToInt(this));
+ if (bytes[0] != 1) {
+ return std.math.maxInt(JSC.Node.FileDescriptor);
+ }
+ var out: u64 = 0;
+ @bitCast([8]u8, out)[0..7].* = bytes[1..8].*;
+ return @intCast(JSC.Node.FileDescriptor, out);
+ }
+ };
+
+ pub fn NewNativeReader(
+ comptime Context: type,
+ comptime onEnqueue: anytype,
+ comptime onEnqueueMany: anytype,
+ comptime onClose: anytype,
+ comptime onError: anytype,
+ comptime name_: []const u8,
+ ) type {
+ return struct {
+ pub const JSReadableStreamReaderNative = struct {
+ pub const shim = JSC.Shimmer(std.mem.span(name_), "JSReadableStreamReaderNative", @This());
+ pub const tag = Context.tag;
+ pub const name = std.fmt.comptimePrint("{s}_JSReadableStreamReaderNative", .{std.mem.span(name_)});
+
+ pub fn enqueue(globalThis: *JSGlobalObject, callframe: *const JSC.CallFrame) callconv(.C) JSC.JSValue {
+ var this = callframe.argument(0).asPtr(*Context);
+ var buffer = callframe.argument(1).asArrayBuffer(globalThis) orelse {
+ globalThis.vm().throwError(globalThis, JSC.toInvalidArguments("Expected TypedArray or ArrayBuffer", .{}, globalThis));
+ return JSC.JSValue.jsUndefined();
+ };
+ return onEnqueue(this, globalThis, buffer.slice(), callframe.argument(1));
+ }
+
+ pub fn enqueueMany(globalThis: *JSGlobalObject, callframe: *const JSC.CallFrame) callconv(.C) JSC.JSValue {
+ var this = callframe.argument(0).asPtr(*Context);
+ return onEnqueueMany(this, globalThis, callframe.argument(1));
+ }
+
+ pub fn close(globalThis: *JSGlobalObject, callframe: *const JSC.CallFrame) callconv(.C) JSC.JSValue {
+ var this = callframe.argument(0).asPtr(*Context);
+ return onClose(this, globalThis, callframe.argument(1));
+ }
+
+ pub fn @"error"(globalThis: *JSGlobalObject, callframe: *const JSC.CallFrame) callconv(.C) JSC.JSValue {
+ var this = callframe.argument(0).asPtr(*Context);
+ return onError(this, globalThis, callframe.argument(1));
+ }
+
+ pub fn load(globalThis: *JSGlobalObject) callconv(.C) JSC.JSValue {
+ if (comptime JSC.is_bindgen) unreachable;
+ if (comptime Environment.allow_assert) {
+ // this should be cached per globals object
+ const OnlyOnce = struct {
+ pub threadlocal var last_globals: ?*JSGlobalObject = null;
+ };
+ if (OnlyOnce.last_globals) |last_globals| {
+ std.debug.assert(last_globals != globalThis);
+ }
+ OnlyOnce.last_globals = globalThis;
+ }
+ return JSC.JSArray.from(globalThis, &.{
+ JSC.NewFunction(globalThis, null, 2, JSReadableStreamReaderNative.enqueue),
+ JSC.NewFunction(globalThis, null, 2, JSReadableStreamReaderNative.enqueueMany),
+ JSC.NewFunction(globalThis, null, 2, JSReadableStreamReaderNative.close),
+ JSC.NewFunction(globalThis, null, 2, JSReadableStreamReaderNative.@"error"),
+ });
+ }
+
+ pub const Export = shim.exportFunctions(.{
+ .@"load" = load,
+ });
+
+ comptime {
+ if (!JSC.is_bindgen) {
+ @export(load, .{ .name = Export[0].symbol_name });
+ _ = JSReadableStreamReaderNative.enqueue;
+ _ = JSReadableStreamReaderNative.enqueueMany;
+ _ = JSReadableStreamReaderNative.close;
+ _ = JSReadableStreamReaderNative.@"error";
+ }
+ }
+ };
+ };
+ }
+};
+
+pub const StreamStart = union(enum) {
+ empty: void,
+ err: JSC.Node.Syscall.Error,
+ chunk_size: Blob.SizeType,
+ ready: void,
+};
+
+pub const StreamResult = union(enum) {
+ owned: bun.ByteList,
+ owned_and_done: bun.ByteList,
+ temporary_and_done: bun.ByteList,
+ temporary: bun.ByteList,
+ into_array: IntoArray,
+ into_array_and_done: IntoArray,
+ pending: *Pending,
+ err: JSC.Node.Syscall.Error,
+ done: void,
+
+ pub const IntoArray = struct {
+ value: JSValue = JSValue.zero,
+ len: Blob.SizeType = std.math.maxInt(Blob.SizeType),
+ };
+
+ pub const Pending = struct {
+ frame: anyframe,
+ result: StreamResult,
+ used: bool = false,
+ };
+
+ pub fn isDone(this: *const StreamResult) bool {
+ return switch (this.*) {
+ .owned_and_done, .temporary_and_done, .into_array_and_done, .done, .err => true,
+ else => false,
+ };
+ }
+
+ fn toPromisedWrap(globalThis: *JSGlobalObject, promise: *JSPromise, pending: *Pending) void {
+ suspend {}
+
+ pending.used = true;
+ const result: StreamResult = pending.result;
+
+ switch (result) {
+ .err => |err| {
+ promise.reject(globalThis, err.toJSC(globalThis));
+ },
+ .done => {
+ promise.resolve(globalThis, JSValue.jsBoolean(false));
+ },
+ else => {
+ promise.resolve(globalThis, result.toJS(globalThis));
+ },
+ }
+ }
+
+ pub fn toPromised(globalThis: *JSGlobalObject, promise: *JSPromise, pending: *Pending) void {
+ var frame = bun.default_allocator.create(@Frame(toPromisedWrap)) catch unreachable;
+ frame.* = async toPromisedWrap(globalThis, promise, pending);
+ pending.frame = frame;
+ }
+
+ pub fn toJS(this: *const StreamResult, globalThis: *JSGlobalObject) JSValue {
+ switch (this.*) {
+ .owned => |list| {
+ return JSC.ArrayBuffer.fromBytes(list.slice(), .Uint8Array).toJS(globalThis.ref(), null);
+ },
+ .owned_and_done => |list| {
+ return JSC.ArrayBuffer.fromBytes(list.slice(), .Uint8Array).toJS(globalThis.ref(), null);
+ },
+ .temporary => |temp| {
+ var array = JSC.JSValue.createUninitializedUint8Array(globalThis, temp.len);
+ var slice = array.asArrayBuffer(globalThis).?.slice();
+ @memcpy(slice.ptr, temp.ptr, temp.len);
+ return array;
+ },
+ .temporary_and_done => |temp| {
+ var array = JSC.JSValue.createUninitializedUint8Array(globalThis, temp.len);
+ var slice = array.asArrayBuffer(globalThis).?.slice();
+ @memcpy(slice.ptr, temp.ptr, temp.len);
+ return array;
+ },
+ .into_array => |array| {
+ return JSC.JSValue.jsNumberFromInt64(array.len);
+ },
+ .into_array_and_done => |array| {
+ return JSC.JSValue.jsNumberFromInt64(array.len);
+ },
+ .pending => |pending| {
+ var promise = JSC.JSPromise.create(globalThis);
+ toPromised(globalThis, promise, pending);
+ return promise.asValue(globalThis);
+ },
+
+ .err => |err| {
+ return JSC.JSPromise.rejectedPromise(globalThis, JSValue.c(err.toJS(globalThis.ref()))).asValue(globalThis);
+ },
+
+ // false == controller.close()
+ // undefined == noop, but we probably won't send it
+ .done => {
+ return JSC.JSValue.jsBoolean(false);
+ },
+ }
+ }
+};
+
+pub fn WritableStreamSink(
+ comptime Context: type,
+ comptime onStart: ?fn (this: Context) void,
+ comptime onWrite: fn (this: Context, bytes: []const u8) JSC.Maybe(Blob.SizeType),
+ comptime onAbort: ?fn (this: Context) void,
+ comptime onClose: ?fn (this: Context) void,
+ comptime deinit: ?fn (this: Context) void,
+) type {
+ return struct {
+ context: Context,
+ closed: bool = false,
+ deinited: bool = false,
+ pending_err: ?JSC.Node.Syscall.Error = null,
+ aborted: bool = false,
+
+ abort_signaler: ?*anyopaque = null,
+ onAbortCallback: ?fn (?*anyopaque) void = null,
+
+ close_signaler: ?*anyopaque = null,
+ onCloseCallback: ?fn (?*anyopaque) void = null,
+
+ pub const This = @This();
+
+ pub fn write(this: *This, bytes: []const u8) JSC.Maybe(Blob.SizeType) {
+ if (this.pending_err) |err| {
+ this.pending_err = null;
+ return .{ .err = err };
+ }
+
+ if (this.closed or this.aborted or this.deinited) {
+ return .{ .result = 0 };
+ }
+ return onWrite(&this.context, bytes);
+ }
+
+ pub fn start(this: *This) StreamStart {
+ return onStart(&this.context);
+ }
+
+ pub fn abort(this: *This) void {
+ if (this.closed or this.deinited or this.aborted) {
+ return;
+ }
+
+ this.aborted = true;
+ onAbort(&this.context);
+ }
+
+ pub fn didAbort(this: *This) void {
+ if (this.closed or this.deinited or this.aborted) {
+ return;
+ }
+ this.aborted = true;
+
+ if (this.onAbortCallback) |cb| {
+ this.onAbortCallback = null;
+ cb(this.abort_signaler);
+ }
+ }
+
+ pub fn didClose(this: *This) void {
+ if (this.closed or this.deinited or this.aborted) {
+ return;
+ }
+ this.closed = true;
+
+ if (this.onCloseCallback) |cb| {
+ this.onCloseCallback = null;
+ cb(this.close_signaler);
+ }
+ }
+
+ pub fn close(this: *This) void {
+ if (this.closed or this.deinited or this.aborted) {
+ return;
+ }
+
+ this.closed = true;
+ onClose(this.context);
+ }
+
+ pub fn deinit(this: *This) void {
+ if (this.deinited) {
+ return;
+ }
+ this.deinited = true;
+ deinit(this.context);
+ }
+
+ pub fn getError(this: *This) ?JSC.Node.Syscall.Error {
+ if (this.pending_err) |err| {
+ this.pending_err = null;
+ return err;
+ }
+
+ return null;
+ }
+ };
+}
+
+pub fn HTTPServerWritable(comptime ssl: bool) type {
+ return struct {
+ pub const UWSResponse = uws.NewApp(ssl).Response;
+ res: *UWSResponse,
+ pending_chunk: []const u8 = "",
+ is_listening_for_abort: bool = false,
+ wrote: Blob.SizeType = 0,
+ callback: anyframe->JSC.Maybe(Blob.SizeType) = undefined,
+ writable: Writable,
+
+ pub fn onWritable(this: *@This(), available: c_ulong, _: *UWSResponse) callconv(.C) bool {
+ const to_write = @minimum(@truncate(Blob.SizeType, available), @truncate(Blob.SizeType, this.pending_chunk.len));
+ if (!this.res.write(this.pending_chunk[0..to_write])) {
+ return true;
+ }
+
+ this.pending_chunk = this.pending_chunk[to_write..];
+ this.wrote += to_write;
+ if (this.pending_chunk.len > 0) {
+ this.res.onWritable(*@This(), onWritable, this);
+ return true;
+ }
+
+ var callback = this.callback;
+ this.callback = undefined;
+ // TODO: clarify what the boolean means
+ resume callback;
+ bun.default_allocator.destroy(callback.*);
+ return false;
+ }
+
+ pub fn onStart(this: *@This()) void {
+ if (this.res.hasResponded()) {
+ this.writable.didClose();
+ }
+ }
+ pub fn onWrite(this: *@This(), bytes: []const u8) JSC.Maybe(Blob.SizeType) {
+ if (this.writable.aborted) {
+ return .{ .result = 0 };
+ }
+
+ if (this.pending_chunk.len > 0) {
+ return JSC.Maybe(Blob.SizeType).retry;
+ }
+
+ if (this.res.write(bytes)) {
+ return .{ .result = @truncate(Blob.SizeType, bytes.len) };
+ }
+
+ this.pending_chunk = bytes;
+ this.writable.pending_err = null;
+ suspend {
+ if (!this.is_listening_for_abort) {
+ this.is_listening_for_abort = true;
+ this.res.onAborted(*@This(), onAborted);
+ }
+
+ this.res.onWritable(*@This(), onWritable, this);
+ var frame = bun.default_allocator.create(@TypeOf(@Frame(onWrite))) catch unreachable;
+ this.callback = frame;
+ frame.* = @frame().*;
+ }
+ const wrote = this.wrote;
+ this.wrote = 0;
+ if (this.writable.pending_err) |err| {
+ this.writable.pending_err = null;
+ return .{ .err = err };
+ }
+ return .{ .result = wrote };
+ }
+
+ // client-initiated
+ pub fn onAborted(this: *@This(), _: *UWSResponse) void {
+ this.writable.didAbort();
+ }
+ // writer-initiated
+ pub fn onAbort(this: *@This()) void {
+ this.res.end("", true);
+ }
+ pub fn onClose(this: *@This()) void {
+ this.res.end("", false);
+ }
+ pub fn deinit(_: *@This()) void {}
+
+ pub const Writable = WritableStreamSink(@This(), onStart, onWrite, onAbort, onClose, deinit);
+ };
+}
+pub const HTTPSWriter = HTTPServerWritable(true);
+pub const HTTPWriter = HTTPServerWritable(false);
+
+pub fn ReadableStreamSource(
+ comptime Context: type,
+ comptime name_: []const u8,
+ comptime onStart: anytype,
+ comptime onPull: anytype,
+ comptime onCancel: fn (this: *Context) void,
+ comptime deinit: fn (this: *Context) void,
+) type {
+ return struct {
+ context: Context,
+ cancelled: bool = false,
+ deinited: bool = false,
+ pending_err: ?JSC.Node.Syscall.Error = null,
+ close_handler: ?fn (*anyopaque) void = null,
+ close_ctx: ?*anyopaque = null,
+ close_jsvalue: JSValue = JSValue.zero,
+ globalThis: *JSGlobalObject = undefined,
+
+ const This = @This();
+ const ReadableStreamSourceType = @This();
+
+ pub fn pull(this: *This, buf: []u8) StreamResult {
+ return onPull(&this.context, buf, JSValue.zero);
+ }
+
+ pub fn start(
+ this: *This,
+ ) StreamStart {
+ return onStart(&this.context);
+ }
+
+ pub fn pullFromJS(this: *This, buf: []u8, view: JSValue) StreamResult {
+ return onPull(&this.context, buf, view);
+ }
+
+ pub fn startFromJS(this: *This) StreamStart {
+ return onStart(&this.context);
+ }
+
+ pub fn cancel(this: *This) void {
+ if (this.cancelled or this.deinited) {
+ return;
+ }
+
+ this.cancelled = true;
+ onCancel(&this.context);
+ }
+
+ pub fn onClose(this: *This) void {
+ if (this.cancelled or this.deinited) {
+ return;
+ }
+
+ if (this.close_handler) |close| {
+ this.close_handler = null;
+ close(this.close_ctx);
+ }
+ }
+
+ pub fn deinit(this: *This) void {
+ if (this.deinited) {
+ return;
+ }
+ this.deinited = true;
+ deinit(&this.context);
+ }
+
+ pub fn getError(this: *This) ?JSC.Node.Syscall.Error {
+ if (this.pending_err) |err| {
+ this.pending_err = null;
+ return err;
+ }
+
+ return null;
+ }
+
+ pub fn toJS(this: *ReadableStreamSourceType, globalThis: *JSGlobalObject) JSC.JSValue {
+ return ReadableStream.fromNative(globalThis, Context.tag, this);
+ }
+
+ pub const JSReadableStreamSource = struct {
+ pub const shim = JSC.Shimmer(std.mem.span(name_), "JSReadableStreamSource", @This());
+ pub const name = std.fmt.comptimePrint("{s}_JSReadableStreamSource", .{std.mem.span(name_)});
+
+ pub fn pull(globalThis: *JSGlobalObject, callFrame: *JSC.CallFrame) callconv(.C) JSC.JSValue {
+ var this = callFrame.argument(0).asPtr(ReadableStreamSourceType);
+ const view = callFrame.argument(1);
+ view.ensureStillAlive();
+ var buffer = view.asArrayBuffer(globalThis) orelse return JSC.JSValue.jsUndefined();
+ return processResult(
+ globalThis,
+ callFrame,
+ this.pullFromJS(buffer.slice(), view),
+ );
+ }
+ pub fn start(globalThis: *JSGlobalObject, callFrame: *JSC.CallFrame) callconv(.C) JSC.JSValue {
+ var this = callFrame.argument(0).asPtr(ReadableStreamSourceType);
+ switch (this.startFromJS()) {
+ .empty => return JSValue.jsNumber(0),
+ .ready => return JSValue.jsNumber(16384),
+ .chunk_size => |size| return JSValue.jsNumber(size),
+ .err => |err| {
+ globalThis.vm().throwError(globalThis, err.toJSC(globalThis));
+ return JSC.JSValue.jsUndefined();
+ },
+ }
+ }
+
+ pub fn processResult(globalThis: *JSGlobalObject, callFrame: *JSC.CallFrame, result: StreamResult) JSC.JSValue {
+ switch (result) {
+ .err => |err| {
+ globalThis.vm().throwError(globalThis, err.toJSC(globalThis));
+ return JSValue.jsUndefined();
+ },
+ .temporary_and_done, .owned_and_done, .into_array_and_done => {
+ JSC.C.JSObjectSetPropertyAtIndex(globalThis.ref(), callFrame.argument(2).asObjectRef(), 0, JSValue.jsBoolean(true).asObjectRef(), null);
+ return result.toJS(globalThis);
+ },
+ else => return result.toJS(globalThis),
+ }
+ }
+ pub fn cancel(_: *JSGlobalObject, callFrame: *JSC.CallFrame) callconv(.C) JSC.JSValue {
+ var this = callFrame.argument(0).asPtr(ReadableStreamSourceType);
+ this.cancel();
+ return JSC.JSValue.jsUndefined();
+ }
+ pub fn setClose(globalThis: *JSGlobalObject, callFrame: *JSC.CallFrame) callconv(.C) JSC.JSValue {
+ var this = callFrame.argument(0).asPtr(ReadableStreamSourceType);
+ this.close_ctx = this;
+ this.close_handler = JSReadableStreamSource.onClose;
+ this.globalThis = globalThis;
+ this.close_jsvalue = callFrame.argument(1);
+ return JSC.JSValue.jsUndefined();
+ }
+
+ fn onClose(ptr: *anyopaque) void {
+ var this = bun.cast(*ReadableStreamSourceType, ptr);
+ _ = this.close_jsvalue.call(this.globalThis, &.{});
+ // this.closer
+ }
+
+ pub fn deinit(_: *JSGlobalObject, callFrame: *JSC.CallFrame) callconv(.C) JSC.JSValue {
+ var this = callFrame.argument(0).asPtr(ReadableStreamSourceType);
+ this.deinit();
+ return JSValue.jsUndefined();
+ }
+
+ pub fn load(globalThis: *JSGlobalObject) callconv(.C) JSC.JSValue {
+ if (comptime JSC.is_bindgen) unreachable;
+ if (comptime Environment.allow_assert) {
+ // this should be cached per globals object
+ const OnlyOnce = struct {
+ pub threadlocal var last_globals: ?*JSGlobalObject = null;
+ };
+ if (OnlyOnce.last_globals) |last_globals| {
+ std.debug.assert(last_globals != globalThis);
+ }
+ OnlyOnce.last_globals = globalThis;
+ }
+ return JSC.JSArray.from(globalThis, &.{
+ JSC.NewFunction(globalThis, null, 1, JSReadableStreamSource.pull),
+ JSC.NewFunction(globalThis, null, 1, JSReadableStreamSource.start),
+ JSC.NewFunction(globalThis, null, 1, JSReadableStreamSource.cancel),
+ JSC.NewFunction(globalThis, null, 1, JSReadableStreamSource.setClose),
+ JSC.NewFunction(globalThis, null, 1, JSReadableStreamSource.deinit),
+ });
+ }
+
+ pub const Export = shim.exportFunctions(.{
+ .@"load" = load,
+ });
+
+ comptime {
+ if (!JSC.is_bindgen) {
+ @export(load, .{ .name = Export[0].symbol_name });
+ _ = JSReadableStreamSource.pull;
+ _ = JSReadableStreamSource.start;
+ _ = JSReadableStreamSource.cancel;
+ _ = JSReadableStreamSource.setClose;
+ _ = JSReadableStreamSource.load;
+ }
+ }
+ };
+ };
+}
+
+pub const ByteBlobLoader = struct {
+ offset: Blob.SizeType = 0,
+ store: *Blob.Store,
+ chunk_size: Blob.SizeType = 1024 * 1024 * 2,
+ remain: Blob.SizeType = 1024 * 1024 * 2,
+ done: bool = false,
+
+ pub const tag = ReadableStream.Tag.Blob;
+
+ pub fn setup(
+ this: *ByteBlobLoader,
+ blob: *const Blob,
+ user_chunk_size: Blob.SizeType,
+ ) void {
+ blob.store.?.ref();
+ var blobe = blob.*;
+ blobe.resolveSize();
+ this.* = ByteBlobLoader{
+ .offset = blobe.offset,
+ .store = blobe.store.?,
+ .chunk_size = if (user_chunk_size > 0) @minimum(user_chunk_size, blobe.size) else @minimum(1024 * 1024 * 2, blobe.size),
+ .remain = blobe.size,
+ .done = false,
+ };
+ }
+
+ pub fn onStart(this: *ByteBlobLoader) StreamStart {
+ return .{ .chunk_size = this.chunk_size };
+ }
+
+ pub fn onPull(this: *ByteBlobLoader, buffer: []u8, array: JSC.JSValue) StreamResult {
+ array.ensureStillAlive();
+ defer array.ensureStillAlive();
+ if (this.done) {
+ return .{ .done = {} };
+ }
+
+ var temporary = this.store.sharedView();
+ temporary = temporary[this.offset..];
+
+ temporary = temporary[0..@minimum(buffer.len, @minimum(temporary.len, this.remain))];
+ if (temporary.len == 0) {
+ this.store.deref();
+ this.done = true;
+ return .{ .done = {} };
+ }
+
+ const copied = @intCast(Blob.SizeType, temporary.len);
+
+ this.remain -|= copied;
+ this.offset +|= copied;
+ @memcpy(buffer.ptr, temporary.ptr, temporary.len);
+ if (this.remain == 0) {
+ return .{ .into_array_and_done = .{ .value = array, .len = copied } };
+ }
+
+ return .{ .into_array = .{ .value = array, .len = copied } };
+ }
+
+ pub fn onCancel(_: *ByteBlobLoader) void {}
+
+ pub fn deinit(this: *ByteBlobLoader) void {
+ if (!this.done) {
+ this.done = true;
+ this.store.deref();
+ }
+
+ bun.default_allocator.destroy(this);
+ }
+
+ pub const Source = ReadableStreamSource(@This(), "ByteBlob", onStart, onPull, onCancel, deinit);
+};
+
+pub fn RequestBodyStreamer(
+ comptime is_ssl: bool,
+) type {
+ return struct {
+ response: *uws.NewApp(is_ssl).Response,
+
+ pub const tag = if (is_ssl)
+ ReadableStream.Tag.HTTPRequest
+ else if (is_ssl)
+ ReadableStream.Tag.HTTPSRequest;
+
+ pub fn setup(
+ this: *ByteBlobLoader,
+ blob: *const Blob,
+ user_chunk_size: Blob.SizeType,
+ ) void {
+ blob.store.?.ref();
+ var blobe = blob.*;
+ blobe.resolveSize();
+ this.* = ByteBlobLoader{
+ .offset = blobe.offset,
+ .store = blobe.store.?,
+ .chunk_size = if (user_chunk_size > 0) @minimum(user_chunk_size, blobe.size) else @minimum(1024 * 1024 * 2, blobe.size),
+ .remain = blobe.size,
+ .done = false,
+ };
+ }
+
+ pub fn onStart(this: *ByteBlobLoader) StreamStart {
+ return .{ .chunk_size = this.chunk_size };
+ }
+
+ pub fn onPull(this: *ByteBlobLoader, buffer: []u8, array: JSC.JSValue) StreamResult {
+ array.ensureStillAlive();
+ defer array.ensureStillAlive();
+ if (this.done) {
+ return .{ .done = {} };
+ }
+
+ var temporary = this.store.sharedView();
+ temporary = temporary[this.offset..];
+
+ temporary = temporary[0..@minimum(buffer.len, @minimum(temporary.len, this.remain))];
+ if (temporary.len == 0) {
+ this.store.deref();
+ this.done = true;
+ return .{ .done = {} };
+ }
+
+ const copied = @intCast(Blob.SizeType, temporary.len);
+
+ this.remain -|= copied;
+ this.offset +|= copied;
+ @memcpy(buffer.ptr, temporary.ptr, temporary.len);
+ if (this.remain == 0) {
+ return .{ .into_array_and_done = .{ .value = array, .len = copied } };
+ }
+
+ return .{ .into_array = .{ .value = array, .len = copied } };
+ }
+
+ pub fn onCancel(_: *ByteBlobLoader) void {}
+
+ pub fn deinit(this: *ByteBlobLoader) void {
+ if (!this.done) {
+ this.done = true;
+ this.store.deref();
+ }
+
+ bun.default_allocator.destroy(this);
+ }
+
+ pub const label = if (is_ssl) "HTTPSRequestBodyStreamer" else "HTTPRequestBodyStreamer";
+ pub const Source = ReadableStreamSource(@This(), label, onStart, onPull, onCancel, deinit);
+ };
+}
+
+pub const FileBlobLoader = struct {
+ buf: []u8 = &[_]u8{},
+ protected_view: JSC.JSValue = JSC.JSValue.zero,
+ fd: JSC.Node.FileDescriptor = 0,
+ auto_close: bool = false,
+ loop: *JSC.EventLoop = undefined,
+ mode: JSC.Node.Mode = 0,
+ store: *Blob.Store,
+ total_read: Blob.SizeType = 0,
+ finalized: bool = false,
+ callback: anyframe = undefined,
+ pending: StreamResult.Pending = StreamResult.Pending{
+ .frame = undefined,
+ .used = false,
+ .result = .{ .done = {} },
+ },
+ cancelled: bool = false,
+ user_chunk_size: Blob.SizeType = 0,
+ scheduled_count: u32 = 0,
+ concurrent: Concurrent = Concurrent{},
+
+ const FileReader = @This();
+
+ const run_on_different_thread_size = bun.huge_allocator_threshold;
+
+ pub const tag = ReadableStream.Tag.File;
+
+ pub fn setup(this: *FileBlobLoader, store: *Blob.Store, chunk_size: Blob.SizeType) void {
+ store.ref();
+ this.* = .{
+ .loop = JSC.VirtualMachine.vm.eventLoop(),
+ .auto_close = store.data.file.pathlike == .path,
+ .store = store,
+ .user_chunk_size = chunk_size,
+ };
+ }
+
+ pub fn watch(this: *FileReader) void {
+ _ = JSC.VirtualMachine.vm.poller.watch(this.fd, .read, this, callback);
+ this.scheduled_count += 1;
+ }
+
+ const Concurrent = struct {
+ read: Blob.SizeType = 0,
+ task: NetworkThread.Task = .{ .callback = Concurrent.taskCallback },
+ completion: AsyncIO.Completion = undefined,
+ read_frame: anyframe = undefined,
+ chunk_size: Blob.SizeType = 0,
+ main_thread_task: JSC.AnyTask = .{ .callback = onJSThread, .ctx = null },
+
+ pub fn taskCallback(task: *NetworkThread.Task) void {
+ var this = @fieldParentPtr(FileBlobLoader, "concurrent", @fieldParentPtr(Concurrent, "task", task));
+ var frame = HTTPClient.getAllocator().create(@Frame(runAsync)) catch unreachable;
+ _ = @asyncCall(std.mem.asBytes(frame), undefined, runAsync, .{this});
+ }
+
+ pub fn onRead(this: *FileBlobLoader, completion: *HTTPClient.NetworkThread.Completion, result: AsyncIO.ReadError!usize) void {
+ this.concurrent.read = @truncate(Blob.SizeType, result catch |err| {
+ if (@hasField(HTTPClient.NetworkThread.Completion, "result")) {
+ this.pending.result = .{
+ .err = JSC.Node.Syscall.Error{
+ .errno = @intCast(JSC.Node.Syscall.Error.Int, -completion.result),
+ .syscall = .read,
+ },
+ };
+ } else {
+ this.pending.result = .{
+ .err = JSC.Node.Syscall.Error{
+ // this is too hacky
+ .errno = @truncate(JSC.Node.Syscall.Error.Int, @intCast(u16, @maximum(1, @errorToInt(err)))),
+ .syscall = .read,
+ },
+ };
+ }
+ this.concurrent.read = 0;
+ resume this.concurrent.read_frame;
+ return;
+ });
+
+ resume this.concurrent.read_frame;
+ }
+
+ pub fn scheduleRead(this: *FileBlobLoader) void {
+ if (comptime Environment.isMac) {
+ var remaining = this.buf[this.concurrent.read..];
+
+ while (remaining.len > 0) {
+ const to_read = @minimum(@as(usize, this.concurrent.chunk_size), remaining.len);
+ switch (JSC.Node.Syscall.read(this.fd, remaining[0..to_read])) {
+ .err => |err| {
+ const retry = comptime if (Environment.isLinux)
+ std.os.E.WOULDBLOCK
+ else
+ std.os.E.AGAIN;
+
+ switch (err.getErrno()) {
+ retry => break,
+ else => {},
+ }
+
+ this.pending.result = .{ .err = err };
+ scheduleMainThreadTask(this);
+ return;
+ },
+ .result => |result| {
+ this.concurrent.read += @intCast(Blob.SizeType, result);
+ remaining = remaining[result..];
+
+ if (result == 0) {
+ remaining.len = 0;
+ break;
+ }
+ },
+ }
+ }
+
+ if (remaining.len == 0) {
+ scheduleMainThreadTask(this);
+ return;
+ }
+ }
+
+ AsyncIO.global.read(
+ *FileBlobLoader,
+ this,
+ onRead,
+ &this.concurrent.completion,
+ this.fd,
+ this.buf[this.concurrent.read..],
+ null,
+ );
+
+ suspend {
+ var _frame = @frame();
+ var this_frame = HTTPClient.getAllocator().create(std.meta.Child(@TypeOf(_frame))) catch unreachable;
+ this_frame.* = _frame.*;
+ this.concurrent.read_frame = this_frame;
+ }
+
+ scheduleMainThreadTask(this);
+ }
+
+ pub fn onJSThread(task_ctx: *anyopaque) void {
+ var this: *FileBlobLoader = bun.cast(*FileBlobLoader, task_ctx);
+ const protected_view = this.protected_view;
+ defer protected_view.unprotect();
+ this.protected_view = JSC.JSValue.zero;
+
+ if (this.finalized and this.scheduled_count == 0) {
+ if (!this.pending.used) {
+ resume this.pending.frame;
+ }
+ this.scheduled_count -= 1;
+
+ this.deinit();
+
+ return;
+ }
+
+ if (!this.pending.used and this.pending.result == .err and this.concurrent.read == 0) {
+ resume this.pending.frame;
+ this.scheduled_count -= 1;
+ this.finalize();
+ return;
+ }
+
+ if (this.concurrent.read == 0) {
+ this.pending.result = .{ .done = {} };
+ resume this.pending.frame;
+ this.scheduled_count -= 1;
+ this.finalize();
+ return;
+ }
+
+ this.pending.result = this.handleReadChunk(@as(usize, this.concurrent.read));
+ resume this.pending.frame;
+ this.scheduled_count -= 1;
+ if (this.pending.result.isDone()) {
+ this.finalize();
+ }
+ }
+
+ pub fn scheduleMainThreadTask(this: *FileBlobLoader) void {
+ this.concurrent.main_thread_task.ctx = this;
+ this.loop.enqueueTaskConcurrent(JSC.Task.init(&this.concurrent.main_thread_task));
+ }
+
+ fn runAsync(this: *FileBlobLoader) void {
+ this.concurrent.read = 0;
+
+ Concurrent.scheduleRead(this);
+
+ suspend {
+ HTTPClient.getAllocator().destroy(@frame());
+ }
+ }
+ };
+
+ pub fn scheduleAsync(this: *FileReader, chunk_size: Blob.SizeType) void {
+ this.scheduled_count += 1;
+ this.loop.virtual_machine.active_tasks +|= 1;
+
+ NetworkThread.init() catch {};
+ this.concurrent.chunk_size = chunk_size;
+ NetworkThread.global.pool.schedule(.{ .head = &this.concurrent.task, .tail = &this.concurrent.task, .len = 1 });
+ }
+
+ const default_fifo_chunk_size = 1024;
+ const default_file_chunk_size = 1024 * 1024 * 2;
+ pub fn onStart(this: *FileBlobLoader) StreamStart {
+ var file = &this.store.data.file;
+ var file_buf: [std.fs.MAX_PATH_BYTES]u8 = undefined;
+ var auto_close = this.auto_close;
+ defer this.auto_close = auto_close;
+ var fd = if (!auto_close)
+ file.pathlike.fd
+ else switch (JSC.Node.Syscall.open(file.pathlike.path.sliceZ(&file_buf), std.os.O.RDONLY | std.os.O.NONBLOCK | std.os.O.CLOEXEC, 0)) {
+ .result => |_fd| _fd,
+ .err => |err| {
+ this.deinit();
+ return .{ .err = err.withPath(file.pathlike.path.slice()) };
+ },
+ };
+
+ if (!auto_close) {
+ // ensure we have non-blocking IO set
+ const flags = std.os.fcntl(fd, std.os.F.GETFL, 0) catch return .{ .err = JSC.Node.Syscall.Error.fromCode(std.os.E.BADF, .fcntl) };
+
+ // if we do not, clone the descriptor and set non-blocking
+ // it is important for us to clone it so we don't cause Weird Things to happen
+ if ((flags & std.os.O.NONBLOCK) == 0) {
+ auto_close = true;
+ fd = @intCast(@TypeOf(fd), std.os.fcntl(fd, std.os.F.DUPFD, 0) catch return .{ .err = JSC.Node.Syscall.Error.fromCode(std.os.E.BADF, .fcntl) });
+ _ = std.os.fcntl(fd, std.os.F.SETFL, flags | std.os.O.NONBLOCK) catch return .{ .err = JSC.Node.Syscall.Error.fromCode(std.os.E.BADF, .fcntl) };
+ }
+ }
+
+ const stat: std.os.Stat = switch (JSC.Node.Syscall.fstat(fd)) {
+ .result => |result| result,
+ .err => |err| {
+ if (auto_close) {
+ _ = JSC.Node.Syscall.close(fd);
+ }
+ this.deinit();
+ return .{ .err = err.withPath(file.pathlike.path.slice()) };
+ },
+ };
+
+ if (std.os.S.ISDIR(stat.mode)) {
+ const err = JSC.Node.Syscall.Error.fromCode(.ISDIR, .fstat);
+ if (auto_close) {
+ _ = JSC.Node.Syscall.close(fd);
+ }
+ this.deinit();
+ return .{ .err = err };
+ }
+
+ if (std.os.S.ISSOCK(stat.mode)) {
+ const err = JSC.Node.Syscall.Error.fromCode(.INVAL, .fstat);
+
+ if (auto_close) {
+ _ = JSC.Node.Syscall.close(fd);
+ }
+ this.deinit();
+ return .{ .err = err };
+ }
+
+ file.seekable = std.os.S.ISREG(stat.mode);
+ file.mode = @intCast(JSC.Node.Mode, stat.mode);
+ this.mode = file.mode;
+
+ if (file.seekable orelse false)
+ file.max_size = @intCast(Blob.SizeType, stat.size);
+
+ if ((file.seekable orelse false) and file.max_size == 0) {
+ if (auto_close) {
+ _ = JSC.Node.Syscall.close(fd);
+ }
+ this.deinit();
+ return .{ .empty = {} };
+ }
+
+ this.fd = fd;
+ this.auto_close = auto_close;
+
+ const chunk_size = this.calculateChunkSize(std.math.maxInt(usize));
+ return .{ .chunk_size = @truncate(Blob.SizeType, chunk_size) };
+ }
+
+ fn calculateChunkSize(this: *FileBlobLoader, available_to_read: usize) usize {
+ const file = &this.store.data.file;
+
+ const chunk_size: usize = if (this.user_chunk_size > 0)
+ @as(usize, this.user_chunk_size)
+ else if (file.seekable orelse false)
+ @as(usize, default_file_chunk_size)
+ else
+ @as(usize, default_fifo_chunk_size);
+
+ return if (file.max_size > 0)
+ if (available_to_read != std.math.maxInt(usize)) @minimum(chunk_size, available_to_read) else @minimum(@maximum(this.total_read, file.max_size) - this.total_read, chunk_size)
+ else
+ @minimum(available_to_read, chunk_size);
+ }
+
+ pub fn onPull(this: *FileBlobLoader, buffer: []u8, view: JSC.JSValue) StreamResult {
+ const chunk_size = this.calculateChunkSize(std.math.maxInt(usize));
+
+ switch (chunk_size) {
+ 0 => {
+ std.debug.assert(this.store.data.file.seekable orelse false);
+ this.finalize();
+ return .{ .done = {} };
+ },
+ run_on_different_thread_size...std.math.maxInt(@TypeOf(chunk_size)) => {
+ this.protected_view = view;
+ this.protected_view.protect();
+ // should never be reached
+ this.pending.result = .{
+ .err = JSC.Node.Syscall.Error.todo,
+ };
+ this.buf = buffer;
+
+ this.scheduleAsync(@truncate(Blob.SizeType, chunk_size));
+
+ return .{ .pending = &this.pending };
+ },
+ else => {},
+ }
+
+ return this.read(buffer, view);
+ }
+
+ fn maybeAutoClose(this: *FileBlobLoader) void {
+ if (this.auto_close) {
+ _ = JSC.Node.Syscall.close(this.fd);
+ this.auto_close = false;
+ }
+ }
+
+ fn handleReadChunk(this: *FileBlobLoader, result: usize) StreamResult {
+ this.total_read += @intCast(Blob.SizeType, result);
+ const remaining: Blob.SizeType = if (this.store.data.file.seekable orelse false)
+ this.store.data.file.max_size -| this.total_read
+ else
+ @as(Blob.SizeType, std.math.maxInt(Blob.SizeType));
+
+ // this handles:
+ // - empty file
+ // - stream closed for some reason
+ if ((result == 0 and remaining == 0)) {
+ this.finalize();
+ return .{ .done = {} };
+ }
+
+ const has_more = remaining > 0;
+
+ if (!has_more) {
+ return .{ .into_array_and_done = .{ .len = @truncate(Blob.SizeType, result) } };
+ }
+
+ return .{ .into_array = .{ .len = @truncate(Blob.SizeType, result) } };
+ }
+
+ pub fn read(
+ this: *FileBlobLoader,
+ read_buf: []u8,
+ view: JSC.JSValue,
+ ) StreamResult {
+ const rc =
+ JSC.Node.Syscall.read(this.fd, read_buf);
+
+ switch (rc) {
+ .err => |err| {
+ const retry = comptime if (Environment.isLinux)
+ std.os.E.WOULDBLOCK
+ else
+ std.os.E.AGAIN;
+
+ switch (err.getErrno()) {
+ retry => {
+ this.protected_view = view;
+ this.protected_view.protect();
+ this.buf = read_buf;
+ this.watch();
+ return .{
+ .pending = &this.pending,
+ };
+ },
+ else => {},
+ }
+ const sys = if (this.store.data.file.pathlike == .path and this.store.data.file.pathlike.path.slice().len > 0)
+ err.withPath(this.store.data.file.pathlike.path.slice())
+ else
+ err;
+
+ this.finalize();
+ return .{ .err = sys };
+ },
+ .result => |result| {
+ return this.handleReadChunk(result);
+ },
+ }
+ }
+
+ pub fn callback(task: ?*anyopaque, sizeOrOffset: i64, _: u16) void {
+ var this: *FileReader = bun.cast(*FileReader, task.?);
+ this.scheduled_count -= 1;
+ const protected_view = this.protected_view;
+ defer protected_view.unprotect();
+ this.protected_view = JSValue.zero;
+
+ var available_to_read: usize = std.math.maxInt(usize);
+ if (comptime Environment.isMac) {
+ if (std.os.S.ISREG(this.mode)) {
+ // Returns when the file pointer is not at the end of
+ // file. data contains the offset from current position
+ // to end of file, and may be negative.
+ available_to_read = @intCast(usize, @maximum(sizeOrOffset, 0));
+ } else if (std.os.S.ISCHR(this.mode) or std.os.S.ISFIFO(this.mode)) {
+ available_to_read = @intCast(usize, @maximum(sizeOrOffset, 0));
+ }
+ }
+ if (this.finalized and this.scheduled_count == 0) {
+ if (!this.pending.used) {
+ // should never be reached
+ this.pending.result = .{
+ .err = JSC.Node.Syscall.Error.todo,
+ };
+ resume this.pending.frame;
+ }
+ this.deinit();
+ return;
+ }
+ if (this.cancelled)
+ return;
+
+ if (this.buf.len == 0) {
+ return;
+ } else {
+ this.buf.len = @minimum(this.buf.len, available_to_read);
+ }
+
+ this.pending.result = this.read(this.buf, this.protected_view);
+ resume this.pending.frame;
+ }
+
+ pub fn finalize(this: *FileBlobLoader) void {
+ if (this.finalized)
+ return;
+ this.finalized = true;
+
+ this.maybeAutoClose();
+
+ this.store.deref();
+ }
+
+ pub fn onCancel(this: *FileBlobLoader) void {
+ this.cancelled = true;
+
+ this.deinit();
+ }
+
+ pub fn deinit(this: *FileBlobLoader) void {
+ this.finalize();
+ if (this.scheduled_count == 0 and !this.pending.used) {
+ this.destroy();
+ }
+ }
+
+ pub fn destroy(this: *FileBlobLoader) void {
+ bun.default_allocator.destroy(this);
+ }
+
+ pub const Source = ReadableStreamSource(@This(), "FileBlobLoader", onStart, onPull, onCancel, deinit);
+};
+
+pub const StreamSource = struct {
+ ptr: ?*anyopaque = null,
+ vtable: VTable,
+
+ pub const VTable = struct {
+ onStart: fn (this: StreamSource) JSC.WebCore.StreamStart,
+ onPull: fn (this: StreamSource) JSC.WebCore.StreamResult,
+ onError: fn (this: StreamSource) void,
+ };
+};