diff options
Diffstat (limited to 'src/bun.js/webcore')
-rw-r--r-- | src/bun.js/webcore/response.classes.ts | 28 | ||||
-rw-r--r-- | src/bun.js/webcore/response.zig | 346 | ||||
-rw-r--r-- | src/bun.js/webcore/streams.zig | 443 |
3 files changed, 632 insertions, 185 deletions
diff --git a/src/bun.js/webcore/response.classes.ts b/src/bun.js/webcore/response.classes.ts index 45df255e1..3cf695f74 100644 --- a/src/bun.js/webcore/response.classes.ts +++ b/src/bun.js/webcore/response.classes.ts @@ -104,4 +104,32 @@ export default [ }, }, }), + define({ + name: "Blob", + construct: true, + finalize: true, + JSType: "0b11101110", + klass: {}, + proto: { + text: { fn: "getText" }, + json: { fn: "getJSON" }, + arrayBuffer: { fn: "getArrayBuffer" }, + slice: { fn: "getSlice", length: 2 }, + stream: { fn: "getStream", length: 1 }, + + type: { + getter: "getType", + setter: "setType", + }, + + size: { + getter: "getSize", + }, + + writer: { + fn: "getWriter", + length: 1, + }, + }, + }), ]; diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig index 1028f4aa9..428ef6ab4 100644 --- a/src/bun.js/webcore/response.zig +++ b/src/bun.js/webcore/response.zig @@ -770,7 +770,7 @@ pub const Fetch = struct { } if (options.fastGet(ctx.ptr(), .body)) |body__| { - if (Blob.fromJS(ctx.ptr(), body__, true, false)) |new_blob| { + if (Blob.get(ctx.ptr(), body__, true, false)) |new_blob| { body = new_blob; } else |_| { return JSPromise.rejectedPromiseValue(globalThis, ZigString.init("fetch() received invalid body").toErrorInstance(globalThis)).asRef(); @@ -905,6 +905,8 @@ const PathOrBlob = union(enum) { }; pub const Blob = struct { + pub usingnamespace JSC.Codegen.JSBlob; + size: SizeType = 0, offset: SizeType = 0, /// When set, the blob will be freed on finalization callbacks @@ -1169,13 +1171,16 @@ pub const Blob = struct { clone.allocator = bun.default_allocator; var cloned = bun.default_allocator.create(Blob) catch unreachable; cloned.* = clone; - return JSPromise.resolvedPromiseValue(ctx.ptr(), JSC.JSValue.fromRef(Blob.Class.make(ctx, cloned))).asObjectRef(); + return JSPromise.resolvedPromiseValue(ctx.ptr(), cloned.toJS(ctx)).asObjectRef(); } else if (destination_type == .bytes and source_type == .file) { + var fake_call_frame: [8]JSC.JSValue = undefined; + @memset(@ptrCast([*]u8, &fake_call_frame), 0, @sizeOf(@TypeOf(fake_call_frame))); + const blob_value = + source_blob.getSlice(ctx, @ptrCast(*JSC.CallFrame, &fake_call_frame)); + return JSPromise.resolvedPromiseValue( ctx.ptr(), - JSC.JSValue.fromRef( - source_blob.getSlice(ctx, undefined, undefined, &.{}, null), - ), + blob_value, ).asObjectRef(); } @@ -1320,7 +1325,7 @@ pub const Blob = struct { } } - break :brk Blob.fromJS( + break :brk Blob.get( ctx.ptr(), data, false, @@ -1534,7 +1539,7 @@ pub const Blob = struct { var ptr = bun.default_allocator.create(Blob) catch unreachable; ptr.* = blob; ptr.allocator = bun.default_allocator; - return Blob.Class.make(ctx, ptr); + return ptr.toJS(ctx).asObjectRef(); } pub fn findOrCreateFileFromPath(path_: JSC.Node.PathOrFileDescriptor, globalThis: *JSGlobalObject) Blob { @@ -2831,62 +2836,27 @@ pub const Blob = struct { } }; - pub const Constructor = JSC.NewConstructor( - Blob, - .{ - .constructor = .{ .rfn = constructor }, - }, - .{}, - ); - - pub const Class = NewClass( - Blob, - .{ .name = "Blob" }, - .{ .finalize = finalize, .text = .{ - .rfn = getText_c, - }, .json = .{ - .rfn = getJSON_c, - }, .arrayBuffer = .{ - .rfn = getArrayBuffer_c, - }, .slice = .{ - .rfn = getSlice, - }, .stream = .{ - .rfn = getStream, - } }, - .{ - .@"type" = .{ - .get = getType, - .set = setType, - }, - .@"size" = .{ - .get = getSize, - .ro = true, - }, - }, - ); - pub fn getStream( this: *Blob, - ctx: js.JSContextRef, - _: js.JSObjectRef, - _: js.JSObjectRef, - arguments: []const js.JSValueRef, - exception: js.ExceptionRef, - ) JSC.C.JSValueRef { + globalThis: *JSC.JSGlobalObject, + callframe: *JSC.CallFrame, + ) callconv(.C) JSC.JSValue { var recommended_chunk_size: SizeType = 0; + var arguments_ = callframe.arguments(2); + var arguments = arguments_.ptr[0..arguments_.len]; if (arguments.len > 0) { - if (!JSValue.c(arguments[0]).isNumber() and !JSValue.c(arguments[0]).isUndefinedOrNull()) { - JSC.throwInvalidArguments("chunkSize must be a number", .{}, ctx, exception); - return null; + if (!arguments[0].isNumber() and !arguments[0].isUndefinedOrNull()) { + globalThis.throwInvalidArguments("chunkSize must be a number", .{}); + return JSValue.jsUndefined(); } - recommended_chunk_size = @intCast(SizeType, @maximum(0, @truncate(i52, JSValue.c(arguments[0]).toInt64()))); + recommended_chunk_size = @intCast(SizeType, @maximum(0, @truncate(i52, arguments[0].toInt64()))); } return JSC.WebCore.ReadableStream.fromBlob( - ctx.ptr(), + globalThis, this, recommended_chunk_size, - ).asObjectRef(); + ); } fn promisified( @@ -2909,7 +2879,8 @@ pub const Blob = struct { pub fn getText( this: *Blob, globalThis: *JSC.JSGlobalObject, - ) JSC.JSValue { + _: *JSC.CallFrame, + ) callconv(.C) JSC.JSValue { return promisified(this.toString(globalThis, .clone), globalThis); } @@ -2990,6 +2961,66 @@ pub const Blob = struct { return promisified(this.toArrayBuffer(ctx.ptr(), .clone), ctx.ptr()).asObjectRef(); } + pub fn getWriter( + this: *Blob, + globalThis: *JSC.JSGlobalObject, + callframe: *JSC.CallFrame, + ) callconv(.C) JSC.JSValue { + var arguments_ = callframe.arguments(1); + var arguments = arguments_.ptr[0..arguments_.len]; + + var store = this.store orelse { + globalThis.throwInvalidArguments("Blob is detached", .{}); + return JSValue.jsUndefined(); + }; + + if (store.data != .file) { + globalThis.throwInvalidArguments("Blob is read-only", .{}); + return JSValue.jsUndefined(); + } + + var sink = JSC.WebCore.FileSink.init(globalThis.allocator(), null) catch |err| { + globalThis.throwInvalidArguments("Failed to create FileSink: {s}", .{@errorName(err)}); + return JSValue.jsUndefined(); + }; + + var input_path: JSC.WebCore.PathOrFileDescriptor = undefined; + if (store.data.file.pathlike == .fd) { + input_path = .{ .fd = store.data.file.pathlike.fd }; + } else { + input_path = .{ + .path = ZigString.Slice{ + .ptr = store.data.file.pathlike.path.slice().ptr, + .len = @truncate(u32, store.data.file.pathlike.path.slice().len), + .allocated = false, + .allocator = bun.default_allocator, + }, + }; + } + + var stream_start: JSC.WebCore.StreamStart = .{ + .FileSink = .{ + .input_path = input_path, + }, + }; + + if (arguments.len > 0) { + stream_start = JSC.WebCore.StreamStart.fromJSWithTag(globalThis, arguments[0], .FileSink); + stream_start.FileSink.input_path = input_path; + } + + switch (sink.start(stream_start)) { + .err => |err| { + globalThis.vm().throwError(globalThis, err.toJSC(globalThis)); + sink.finalize(); + return JSC.JSValue.jsUndefined(); + }, + else => {}, + } + + return sink.toJS(globalThis); + } + /// https://w3c.github.io/FileAPI/#slice-method-algo /// The slice() method returns a new Blob object with bytes ranging from the /// optional start parameter up to but not including the optional end @@ -2997,22 +3028,30 @@ pub const Blob = struct { /// contentType parameter. It must act as follows: pub fn getSlice( this: *Blob, - ctx: js.JSContextRef, - _: js.JSObjectRef, - _: js.JSObjectRef, - args: []const js.JSValueRef, - exception: js.ExceptionRef, - ) JSC.C.JSObjectRef { + globalThis: *JSC.JSGlobalObject, + callframe: *JSC.CallFrame, + ) callconv(.C) JSC.JSValue { + var allocator = globalThis.allocator(); + var arguments_ = callframe.arguments(2); + var args = arguments_.ptr[0..arguments_.len]; + if (this.size == 0) { - return constructor(ctx, null, &[_]js.JSValueRef{}, exception); + const empty = Blob.initEmpty(globalThis); + var ptr = allocator.create(Blob) catch { + return JSC.JSValue.jsUndefined(); + }; + ptr.* = empty; + ptr.allocator = allocator; + return ptr.toJS(globalThis); } + // If the optional start parameter is not used as a parameter when making this call, let relativeStart be 0. var relativeStart: i64 = 0; // If the optional end parameter is not used as a parameter when making this call, let relativeEnd be size. var relativeEnd: i64 = @intCast(i64, this.size); - var args_iter = JSC.Node.ArgumentsSlice.from(ctx.bunVM(), args); + var args_iter = JSC.Node.ArgumentsSlice.init(globalThis.bunVM(), args); if (args_iter.nextEat()) |start_| { const start = start_.toInt64(); if (start < 0) { @@ -3039,11 +3078,11 @@ pub const Blob = struct { var content_type: string = ""; if (args_iter.nextEat()) |content_type_| { if (content_type_.isString()) { - var zig_str = content_type_.getZigString(ctx.ptr()); + var zig_str = content_type_.getZigString(globalThis); var slicer = zig_str.toSlice(bun.default_allocator); defer slicer.deinit(); var slice = slicer.slice(); - var content_type_buf = getAllocator(ctx).alloc(u8, slice.len) catch unreachable; + var content_type_buf = allocator.alloc(u8, slice.len) catch unreachable; content_type = strings.copyLowercase(slice, content_type_buf); } } @@ -3058,31 +3097,25 @@ pub const Blob = struct { blob.content_type = content_type; blob.content_type_allocated = content_type.len > 0; - var blob_ = getAllocator(ctx).create(Blob) catch unreachable; + var blob_ = allocator.create(Blob) catch unreachable; blob_.* = blob; - blob_.allocator = getAllocator(ctx); - return Blob.Class.make(ctx, blob_); + blob_.allocator = allocator; + return blob_.toJS(globalThis); } pub fn getType( this: *Blob, - ctx: js.JSContextRef, - _: js.JSObjectRef, - _: js.JSStringRef, - _: js.ExceptionRef, - ) js.JSValueRef { - return ZigString.init(this.content_type).toValue(ctx.ptr()).asObjectRef(); + globalThis: *JSC.JSGlobalObject, + ) callconv(.C) JSValue { + return ZigString.init(this.content_type).toValue(globalThis); } pub fn setType( this: *Blob, - ctx: js.JSContextRef, - _: js.JSObjectRef, - _: js.JSStringRef, - value: js.JSValueRef, - _: js.ExceptionRef, - ) bool { - var zig_str = JSValue.fromRef(value).getZigString(ctx.ptr()); + globalThis: *JSC.JSGlobalObject, + value: JSC.JSValue, + ) callconv(.C) bool { + var zig_str = value.getZigString(globalThis); if (zig_str.is16Bit()) return false; @@ -3093,7 +3126,7 @@ pub const Blob = struct { const prev_content_type = this.content_type; { defer if (this.content_type_allocated) bun.default_allocator.free(prev_content_type); - var content_type_buf = getAllocator(ctx).alloc(u8, slice.len) catch unreachable; + var content_type_buf = globalThis.allocator().alloc(u8, slice.len) catch unreachable; this.content_type = strings.copyLowercase(slice, content_type_buf); } @@ -3101,25 +3134,19 @@ pub const Blob = struct { return true; } - pub fn getSize( - this: *Blob, - _: js.JSContextRef, - _: js.JSObjectRef, - _: js.JSStringRef, - _: js.ExceptionRef, - ) js.JSValueRef { + pub fn getSize(this: *Blob, _: *JSC.JSGlobalObject) callconv(.C) JSValue { if (this.size == Blob.max_size) { this.resolveSize(); if (this.size == Blob.max_size and this.store != null) { - return JSValue.jsNumberFromChar(0).asRef(); + return JSValue.jsNumberFromChar(0); } } if (this.size < std.math.maxInt(i32)) { - return JSValue.jsNumber(this.size).asRef(); + return JSValue.jsNumber(this.size); } - return JSC.JSValue.jsNumberFromUint64(this.size).asRef(); + return JSC.JSValue.jsNumberFromUint64(this.size); } pub fn resolveSize(this: *Blob) void { @@ -3138,40 +3165,42 @@ pub const Blob = struct { } pub fn constructor( - ctx: js.JSContextRef, - _: js.JSObjectRef, - args: []const js.JSValueRef, - exception: js.ExceptionRef, - ) js.JSObjectRef { + globalThis: *JSC.JSGlobalObject, + callframe: *JSC.CallFrame, + ) callconv(.C) ?*Blob { + var allocator = globalThis.allocator(); var blob: Blob = undefined; + var arguments = callframe.arguments(2); + var args = arguments.ptr[0..arguments.len]; + switch (args.len) { 0 => { var empty: []u8 = &[_]u8{}; - blob = Blob.init(empty, getAllocator(ctx), ctx.ptr()); + blob = Blob.init(empty, allocator, globalThis); }, else => { - blob = fromJS(ctx.ptr(), JSValue.fromRef(args[0]), false, true) catch |err| { + blob = get(globalThis, args[0], false, true) catch |err| { if (err == error.InvalidArguments) { - JSC.JSError(getAllocator(ctx), "new Blob() expects an Array", .{}, ctx, exception); + globalThis.throwInvalidArguments("new Blob() expects an Array", .{}); return null; } - JSC.JSError(getAllocator(ctx), "out of memory :(", .{}, ctx, exception); + globalThis.throw("out of memory", .{}); return null; }; if (args.len > 1) { - var options = JSValue.fromRef(args[1]); + var options = args[0]; if (options.isCell()) { // type, the ASCII-encoded string in lower case // representing the media type of the Blob. // Normative conditions for this member are provided // in the § 3.1 Constructors. - if (options.get(ctx.ptr(), "type")) |content_type| { + if (options.get(globalThis, "type")) |content_type| { if (content_type.isString()) { - var content_type_str = content_type.getZigString(ctx.ptr()); + var content_type_str = content_type.getZigString(globalThis); if (!content_type_str.is16Bit()) { var slice = content_type_str.trimmedSlice(); - var content_type_buf = getAllocator(ctx).alloc(u8, slice.len) catch unreachable; + var content_type_buf = allocator.alloc(u8, slice.len) catch unreachable; blob.content_type = strings.copyLowercase(slice, content_type_buf); blob.content_type_allocated = true; } @@ -3186,13 +3215,13 @@ pub const Blob = struct { }, } - var blob_ = getAllocator(ctx).create(Blob) catch unreachable; + var blob_ = allocator.create(Blob) catch unreachable; blob_.* = blob; - blob_.allocator = getAllocator(ctx); - return Blob.Class.make(ctx, blob_); + blob_.allocator = allocator; + return blob_; } - pub fn finalize(this: *Blob) void { + pub fn finalize(this: *Blob) callconv(.C) void { this.deinit(); } @@ -3566,7 +3595,7 @@ pub const Blob = struct { return toArrayBufferWithBytes(this, global, bun.constStrToU8(view_), lifetime); } - pub inline fn fromJS( + pub inline fn get( global: *JSGlobalObject, arg: JSValue, comptime move: bool, @@ -3676,28 +3705,20 @@ pub const Blob = struct { return Blob.init(buf, bun.default_allocator, global); }, - else => |tag| { - if (tag != .DOMWrapper) { - if (JSC.C.JSObjectGetPrivate(top_value.asObjectRef())) |priv| { - var data = JSC.JSPrivateDataPtr.from(priv); - switch (data.tag()) { - .Blob => { - var blob: *Blob = data.as(Blob); - if (comptime move) { - var _blob = blob.*; - _blob.allocator = null; - blob.transfer(); - return _blob; - } else { - return blob.dupe(); - } - }, - - else => return Blob.initEmpty(global), - } + .DOMWrapper => { + if (top_value.as(Blob)) |blob| { + if (comptime move) { + var _blob = blob.*; + _blob.allocator = null; + blob.transfer(); + return _blob; + } else { + return blob.dupe(); } } }, + + else => {}, } } @@ -3778,20 +3799,15 @@ pub const Blob = struct { could_have_non_ascii = true; break; }, - else => { - if (JSC.C.JSObjectGetPrivate(item.asObjectRef())) |priv| { - var data = JSC.JSPrivateDataPtr.from(priv); - switch (data.tag()) { - .Blob => { - var blob: *Blob = data.as(Blob); - could_have_non_ascii = could_have_non_ascii or !(blob.is_all_ascii orelse false); - joiner.append(blob.sharedView(), 0, null); - continue; - }, - else => {}, - } + + .DOMWrapper => { + if (item.as(Blob)) |blob| { + could_have_non_ascii = could_have_non_ascii or !(blob.is_all_ascii orelse false); + joiner.append(blob.sharedView(), 0, null); + continue; } }, + else => {}, } } @@ -3799,7 +3815,12 @@ pub const Blob = struct { } }, - .DOMWrapper => {}, + .DOMWrapper => { + if (current.as(Blob)) |blob| { + could_have_non_ascii = could_have_non_ascii or !(blob.is_all_ascii orelse false); + joiner.append(blob.sharedView(), 0, null); + } + }, JSC.JSValue.JSType.ArrayBuffer, JSC.JSValue.JSType.Int8Array, @@ -3821,28 +3842,13 @@ pub const Blob = struct { }, else => { - outer: { - if (JSC.C.JSObjectGetPrivate(current.asObjectRef())) |priv| { - var data = JSC.JSPrivateDataPtr.from(priv); - switch (data.tag()) { - .Blob => { - var blob: *Blob = data.as(Blob); - could_have_non_ascii = could_have_non_ascii or !(blob.is_all_ascii orelse false); - joiner.append(blob.sharedView(), 0, null); - break :outer; - }, - else => {}, - } - } - - var sliced = current.toSlice(global, bun.default_allocator); - could_have_non_ascii = could_have_non_ascii or sliced.allocated; - joiner.append( - sliced.slice(), - 0, - if (sliced.allocated) sliced.allocator else null, - ); - } + var sliced = current.toSlice(global, bun.default_allocator); + could_have_non_ascii = could_have_non_ascii or sliced.allocated; + joiner.append( + sliced.slice(), + 0, + if (sliced.allocated) sliced.allocator else null, + ); }, } current = stack.popOrNull() orelse break; @@ -4101,13 +4107,13 @@ pub const Body = struct { var ptr = bun.default_allocator.create(Blob) catch unreachable; ptr.* = blob; ptr.allocator = bun.default_allocator; - promise.asPromise().?.resolve(global, JSC.JSValue.fromRef(Blob.Class.make(global.ref(), ptr))); + promise.asPromise().?.resolve(global, ptr.toJS(global)); }, else => { var ptr = bun.default_allocator.create(Blob) catch unreachable; ptr.* = blob; ptr.allocator = bun.default_allocator; - promise.asInternalPromise().?.resolve(global, JSC.JSValue.fromRef(Blob.Class.make(global.ref(), ptr))); + promise.asInternalPromise().?.resolve(global, ptr.toJS(global)); }, } JSC.C.JSValueUnprotect(global.ref(), promise.asObjectRef()); @@ -4300,7 +4306,7 @@ pub const Body = struct { } body.value = .{ - .Blob = Blob.fromJS(globalThis, value, true, false) catch |err| { + .Blob = Blob.get(globalThis, value, true, false) catch |err| { if (err == error.InvalidArguments) { globalThis.throwInvalidArguments("Expected an Array", .{}); return null; @@ -4511,7 +4517,7 @@ pub const Request = struct { } if (urlOrObject.fastGet(globalThis, .body)) |body_| { - if (Blob.fromJS(globalThis, body_, true, false)) |blob| { + if (Blob.get(globalThis, body_, true, false)) |blob| { if (blob.size > 0) { request.body = Body.Value{ .Blob = blob }; } @@ -4536,7 +4542,7 @@ pub const Request = struct { } if (arguments[1].fastGet(globalThis, .body)) |body_| { - if (Blob.fromJS(globalThis, body_, true, false)) |blob| { + if (Blob.get(globalThis, body_, true, false)) |blob| { if (blob.size > 0) { request.body = Body.Value{ .Blob = blob }; } @@ -4693,7 +4699,7 @@ fn BlobInterface(comptime Type: type) type { var ptr = getAllocator(ctx).create(Blob) catch unreachable; ptr.* = blob; blob.allocator = getAllocator(ctx); - return JSC.JSPromise.resolvedPromiseValue(ctx.ptr(), JSValue.fromRef(Blob.Class.make(ctx, ptr))).asObjectRef(); + return JSC.JSPromise.resolvedPromiseValue(ctx.ptr(), ptr.toJS(ctx)).asObjectRef(); } // pub fn getBody( @@ -4775,7 +4781,7 @@ fn NewBlobInterface(comptime Type: type) type { var ptr = getAllocator(globalObject).create(Blob) catch unreachable; ptr.* = blob; blob.allocator = getAllocator(globalObject); - return JSC.JSPromise.resolvedPromiseValue(globalObject, JSValue.fromRef(Blob.Class.make(globalObject, ptr))); + return JSC.JSPromise.resolvedPromiseValue(globalObject, ptr.toJS(globalObject)); } // pub fn getBody( diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index b7af9a8df..5c7d08f4f 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -249,6 +249,13 @@ pub const StreamStart = union(Tag) { as_uint8array: bool, stream: bool, }, + FileSink: struct { + chunk_size: Blob.SizeType = 16384, + input_path: PathOrFileDescriptor, + truncate: bool = true, + close: bool = false, + mode: JSC.Node.Mode = 0, + }, HTTPSResponseSink: void, HTTPResponseSink: void, ready: void, @@ -258,6 +265,7 @@ pub const StreamStart = union(Tag) { err, chunk_size, ArrayBufferSink, + FileSink, HTTPSResponseSink, HTTPResponseSink, ready, @@ -334,6 +342,40 @@ pub const StreamStart = union(Tag) { }; } }, + .FileSink => { + var chunk_size: JSC.WebCore.Blob.SizeType = 0; + + if (value.get(globalThis, "highWaterMark")) |chunkSize| { + chunk_size = @intCast(JSC.WebCore.Blob.SizeType, @maximum(0, @truncate(i51, chunkSize.toInt64()))); + } + + if (value.get(globalThis, "path")) |path| { + return .{ + .FileSink = .{ + .chunk_size = chunk_size, + .input_path = .{ + .path = path.toSlice(globalThis, globalThis.bunVM().allocator), + }, + }, + }; + } else if (value.get(globalThis, "fd")) |fd| { + return .{ + .FileSink = .{ + .chunk_size = chunk_size, + .input_path = .{ + .fd = fd.toInt32(), + }, + }, + }; + } + + return .{ + .FileSink = .{ + .input_path = .{ .fd = std.math.maxInt(JSC.Node.FileDescriptor) }, + .chunk_size = chunk_size, + }, + }; + }, .HTTPSResponseSink, .HTTPResponseSink => { var empty = true; var chunk_size: JSC.WebCore.Blob.SizeType = 2048; @@ -406,7 +448,16 @@ pub const StreamResult = union(Tag) { frame: anyframe, result: Writable, consumed: Blob.SizeType = 0, - used: bool = false, + state: StreamResult.Pending.State = .none, + + pub fn run(this: *Writable.Pending) void { + if (this.state != .pending) { + return; + } + + this.state = .used; + resume this.frame; + } }; pub fn toPromised(globalThis: *JSGlobalObject, promise: *JSPromise, pending: *Writable.Pending) void { @@ -424,7 +475,6 @@ pub const StreamResult = union(Tag) { fn toPromisedWrap(globalThis: *JSGlobalObject, promise: *JSPromise, pending: *Writable.Pending) void { suspend {} - pending.used = true; const result: Writable = pending.result; switch (result) { @@ -472,7 +522,19 @@ pub const StreamResult = union(Tag) { pub const Pending = struct { frame: anyframe, result: StreamResult, - used: bool = false, + state: State = .none, + + pub const State = enum { + none, + pending, + used, + }; + + pub fn run(this: *Pending) void { + if (this.state != .pending) return; + this.state = .used; + resume this.frame; + } }; pub fn isDone(this: *const StreamResult) bool { @@ -485,7 +547,6 @@ pub const StreamResult = union(Tag) { fn toPromisedWrap(globalThis: *JSGlobalObject, promise: *JSPromise, pending: *Pending) void { suspend {} - pending.used = true; const result: StreamResult = pending.result; switch (result) { @@ -846,6 +907,360 @@ pub const Sink = struct { } }; +pub const PathOrFileDescriptor = union(enum) { + path: ZigString.Slice, + fd: JSC.Node.FileDescriptor, +}; + +pub const FileSink = struct { + buffer: bun.ByteList, + allocator: std.mem.Allocator, + done: bool = false, + signal: Signal = .{}, + next: ?Sink = null, + auto_close: bool = false, + auto_truncate: bool = false, + opened_fd: JSC.Node.FileDescriptor = std.math.maxInt(JSC.Node.FileDescriptor), + mode: JSC.Node.Mode = 0, + chunk_size: usize = 0, + pending: StreamResult.Writable.Pending = StreamResult.Writable.Pending{ + .frame = undefined, + .result = .{ .done = {} }, + }, + + scheduled_count: u32 = 0, + written: usize = 0, + head: usize = 0, + requested_end: bool = false, + + pub fn prepare(this: *FileSink, input_path: PathOrFileDescriptor, mode: JSC.Node.Mode) JSC.Node.Maybe(void) { + var file_buf: [std.fs.MAX_PATH_BYTES]u8 = undefined; + const auto_close = this.auto_close; + const fd = if (!auto_close) + input_path.fd + else switch (JSC.Node.Syscall.open(input_path.path.toSliceZ(&file_buf), std.os.O.WRONLY | std.os.O.NONBLOCK | std.os.O.CLOEXEC | std.os.O.CREAT, mode)) { + .result => |_fd| _fd, + .err => |err| return .{ .err = err.withPath(input_path.path.slice()) }, + }; + + const stat: std.os.Stat = switch (JSC.Node.Syscall.fstat(fd)) { + .result => |result| result, + .err => |err| { + if (auto_close) { + _ = JSC.Node.Syscall.close(fd); + } + if (input_path == .path) + return .{ .err = err.withPath(input_path.path.slice()) }; + return .{ .err = err }; + }, + }; + + this.mode = stat.mode; + this.opened_fd = fd; + + return .{ .result = {} }; + } + + pub fn connect(this: *FileSink, signal: Signal) void { + std.debug.assert(this.reader == null); + this.signal = signal; + } + + pub fn start(this: *FileSink, stream_start: StreamStart) JSC.Node.Maybe(void) { + if (this.opened_fd != std.math.maxInt(JSC.Node.FileDescriptor)) { + _ = JSC.Node.Syscall.close(this.opened_fd); + this.opened_fd = std.math.maxInt(JSC.Node.FileDescriptor); + } + + this.done = false; + this.written = 0; + this.auto_close = false; + this.auto_truncate = false; + this.requested_end = false; + + this.buffer.len = 0; + + switch (stream_start) { + .FileSink => |config| { + this.chunk_size = config.chunk_size; + this.auto_close = config.close or config.input_path == .path; + this.auto_truncate = config.truncate; + + defer if (config.input_path == .path) config.input_path.path.deinit(); + + switch (this.prepare(config.input_path, config.mode)) { + .err => |err| { + return .{ .err = err }; + }, + .result => {}, + } + }, + else => {}, + } + + this.signal.start(); + return .{ .result = {} }; + } + + pub fn flush(this: *FileSink) StreamResult.Writable { + std.debug.assert(this.opened_fd != std.math.maxInt(JSC.Node.FileDescriptor)); + + var total: usize = this.written; + const initial = total; + defer this.written = total; + const fd = this.opened_fd; + var remain = this.buffer.slice(); + remain = remain[@minimum(this.head, remain.len)..]; + defer { + std.debug.assert(total - initial == @ptrToInt(remain.ptr) - @ptrToInt(this.buffer.ptr)); + + if (remain.len == 0) { + this.head = 0; + this.buffer.len = 0; + } else { + this.head += total - initial; + } + } + while (remain.len > 0) { + const res = JSC.Node.Syscall.write(fd, remain); + if (res == .err) { + const retry = comptime if (Environment.isLinux) + std.os.E.WOULDBLOCK + else + std.os.E.AGAIN; + + switch (res.err.getErrno()) { + retry => { + this.watch(); + return .{ + .pending = &this.pending, + }; + }, + else => {}, + } + this.pending.result = .{ .err = res.err }; + this.pending.consumed = @truncate(Blob.SizeType, total - initial); + + return .{ .err = res.err }; + } + + remain = remain[res.result..]; + total += res.result; + if (res.result == 0) break; + } + + this.pending.result = .{ + .owned = @truncate(Blob.SizeType, total), + }; + this.pending.consumed = @truncate(Blob.SizeType, total - initial); + if (this.requested_end) { + this.done = true; + if (this.auto_truncate) + std.os.ftruncate(this.opened_fd, total) catch {}; + + if (this.auto_close) { + _ = JSC.Node.Syscall.close(this.opened_fd); + this.opened_fd = std.math.maxInt(JSC.Node.FileDescriptor); + } + } + this.pending.run(); + return .{ .owned = @truncate(Blob.SizeType, total - initial) }; + } + + pub fn flushFromJS(this: *FileSink, globalThis: *JSGlobalObject, _: bool) JSC.Node.Maybe(JSValue) { + const result = this.flush(); + + if (result == .err) { + return .{ .err = result.err }; + } + + return JSC.Node.Maybe(JSValue){ + .result = result.toJS(globalThis), + }; + } + + fn cleanup(this: *FileSink) void { + if (this.opened_fd != std.math.maxInt(JSC.Node.FileDescriptor)) { + _ = JSC.Node.Syscall.close(this.opened_fd); + this.opened_fd = std.math.maxInt(JSC.Node.FileDescriptor); + } + + if (this.buffer.len > 0) { + this.buffer.listManaged(this.allocator).deinit(); + this.buffer = bun.ByteList.init(""); + this.done = true; + this.head = 0; + } + } + + pub fn finalize(this: *FileSink) void { + this.cleanup(); + + this.allocator.destroy(this); + } + + pub fn init(allocator: std.mem.Allocator, next: ?Sink) !*FileSink { + var this = try allocator.create(FileSink); + this.* = FileSink{ + .buffer = bun.ByteList.init(&.{}), + .allocator = allocator, + .next = next, + }; + return this; + } + + pub fn construct( + this: *FileSink, + allocator: std.mem.Allocator, + ) void { + this.* = FileSink{ + .buffer = bun.ByteList.init(&.{}), + .allocator = allocator, + .next = null, + }; + } + + pub fn watch(this: *FileSink) void { + std.debug.assert(this.opened_fd != std.math.maxInt(JSC.Node.FileDescriptor)); + _ = JSC.VirtualMachine.vm.poller.watch(this.opened_fd, .write, FileSink, this); + this.scheduled_count += 1; + } + + pub fn toJS(this: *FileSink, globalThis: *JSGlobalObject) JSValue { + return JSSink.createObject(globalThis, this); + } + + pub fn onPoll(this: *FileSink, _: i64, _: u16) void { + this.scheduled_count -= 1; + this.flush(); + } + + pub fn write(this: *@This(), data: StreamResult) StreamResult.Writable { + const input = data.slice(); + + if (!this.isPending() and this.buffer.len == 0 and input.len >= this.chunk_size) { + var temp = this.buffer; + defer this.buffer = temp; + this.buffer = bun.ByteList.init(input); + const result = this.flush(); + if (this.isPending()) { + _ = temp.write(this.allocator, input) catch { + return .{ .err = Syscall.Error.oom }; + }; + } + + return result; + } + + const len = this.buffer.write(this.allocator, input) catch { + return .{ .err = Syscall.Error.oom }; + }; + + if (!this.isPending() and this.buffer.len >= this.chunk_size) { + return this.flush(); + } + + this.signal.ready(null, null); + return .{ .owned = len }; + } + pub const writeBytes = write; + pub fn writeLatin1(this: *@This(), data: StreamResult) StreamResult.Writable { + const input = data.slice(); + + if (!this.isPending() and this.buffer.len == 0 and input.len >= this.chunk_size and strings.isAllASCII(input)) { + var temp = this.buffer; + defer this.buffer = temp; + this.buffer = bun.ByteList.init(input); + const result = this.flush(); + if (this.isPending()) { + _ = temp.write(this.allocator, input) catch { + return .{ .err = Syscall.Error.oom }; + }; + } + + return result; + } + + const len = this.buffer.writeLatin1(this.allocator, input) catch { + return .{ .err = Syscall.Error.oom }; + }; + + if (!this.isPending() and this.buffer.len >= this.chunk_size) { + return this.flush(); + } + + this.signal.ready(null, null); + return .{ .owned = len }; + } + pub fn writeUTF16(this: *@This(), data: StreamResult) StreamResult.Writable { + if (this.next) |*next| { + return next.writeUTF16(data); + } + const len = this.buffer.writeUTF16(this.allocator, @ptrCast([*]const u16, @alignCast(@alignOf(u16), data.slice().ptr))[0..std.mem.bytesAsSlice(u16, data.slice()).len]) catch { + return .{ .err = Syscall.Error.oom }; + }; + if (!this.isPending() and this.buffer.len >= this.chunk_size) { + return this.flush(); + } + this.signal.ready(null, null); + + return .{ .owned = len }; + } + + fn isPending(this: *const FileSink) bool { + return this.scheduled_count > 0; + } + + pub fn end(this: *FileSink, err: ?Syscall.Error) JSC.Node.Maybe(void) { + if (this.next) |*next| { + return next.end(err); + } + this.requested_end = true; + + const flushy = this.flush(); + + if (flushy == .err) { + return .{ .err = flushy.err }; + } + + if (flushy != .pending) { + this.cleanup(); + } + + this.signal.close(err); + return .{ .result = {} }; + } + + pub fn endFromJS(this: *FileSink, globalThis: *JSGlobalObject) JSC.Node.Maybe(JSValue) { + if (this.done) { + return .{ .result = JSValue.jsNumber(this.written) }; + } + + std.debug.assert(this.next == null); + this.requested_end = true; + + const flushed = this.flush(); + + if (flushed == .err) { + return .{ .err = flushed.err }; + } + + if (flushed != .pending) { + this.cleanup(); + } + + this.signal.close(null); + + return .{ .result = flushed.toJS(globalThis) }; + } + + pub fn sink(this: *FileSink) Sink { + return Sink.init(this); + } + + pub const JSSink = NewJSSink(@This(), "FileSink"); +}; + pub const ArrayBufferSink = struct { bytes: bun.ByteList, allocator: std.mem.Allocator, @@ -2302,7 +2717,7 @@ pub const FileBlobLoader = struct { callback: anyframe = undefined, pending: StreamResult.Pending = StreamResult.Pending{ .frame = undefined, - .used = false, + .state = .none, .result = .{ .done = {} }, }, cancelled: bool = false, @@ -2348,7 +2763,7 @@ pub const FileBlobLoader = struct { pub fn taskCallback(task: *NetworkThread.Task) void { var this = @fieldParentPtr(FileBlobLoader, "concurrent", @fieldParentPtr(Concurrent, "task", task)); - var frame = HTTPClient.getAllocator().create(@Frame(runAsync)) catch unreachable; + var frame = bun.default_allocator.create(@Frame(runAsync)) catch unreachable; _ = @asyncCall(std.mem.asBytes(frame), undefined, runAsync, .{this}); } @@ -2430,7 +2845,7 @@ pub const FileBlobLoader = struct { suspend { var _frame = @frame(); - var this_frame = HTTPClient.getAllocator().create(std.meta.Child(@TypeOf(_frame))) catch unreachable; + var this_frame = bun.default_allocator.create(std.meta.Child(@TypeOf(_frame))) catch unreachable; this_frame.* = _frame.*; this.concurrent.read_frame = this_frame; } @@ -2445,9 +2860,7 @@ pub const FileBlobLoader = struct { this.protected_view = JSC.JSValue.zero; if (this.finalized and this.scheduled_count == 0) { - if (!this.pending.used) { - resume this.pending.frame; - } + this.pending.run(); this.scheduled_count -= 1; this.deinit(); @@ -2455,7 +2868,7 @@ pub const FileBlobLoader = struct { return; } - if (!this.pending.used and this.pending.result == .err and this.concurrent.read == 0) { + if (this.pending.state == .pending and this.pending.result == .err and this.concurrent.read == 0) { resume this.pending.frame; this.scheduled_count -= 1; this.finalize(); @@ -2489,7 +2902,7 @@ pub const FileBlobLoader = struct { Concurrent.scheduleRead(this); suspend { - HTTPClient.getAllocator().destroy(@frame()); + bun.default_allocator.destroy(@frame()); } } }; @@ -2742,7 +3155,7 @@ pub const FileBlobLoader = struct { } } if (this.finalized and this.scheduled_count == 0) { - if (!this.pending.used) { + if (this.pending.state == .pending) { // should never be reached this.pending.result = .{ .err = Syscall.Error.todo, @@ -2762,7 +3175,7 @@ pub const FileBlobLoader = struct { } this.pending.result = this.read(this.buf, this.protected_view); - resume this.pending.frame; + this.pending.run(); } pub fn finalize(this: *FileBlobLoader) void { @@ -2783,7 +3196,7 @@ pub const FileBlobLoader = struct { pub fn deinit(this: *FileBlobLoader) void { this.finalize(); - if (this.scheduled_count == 0 and !this.pending.used) { + if (this.scheduled_count == 0 and this.pending.state == .pending) { this.destroy(); } } |