diff options
author | 2022-06-02 03:00:45 -0700 | |
---|---|---|
committer | 2022-06-02 03:00:45 -0700 | |
commit | e5eabc0658d2133603596ec17a6e7c956c5fe28c (patch) | |
tree | 8e50a0bfa0ca9eba4145191720bb7d412bf8d26f /src/javascript/jsc/webcore/response.zig | |
parent | 121c2960de87c53cc6bdd5f92fab627a74d21a2b (diff) | |
download | bun-e5eabc0658d2133603596ec17a6e7c956c5fe28c.tar.gz bun-e5eabc0658d2133603596ec17a6e7c956c5fe28c.tar.zst bun-e5eabc0658d2133603596ec17a6e7c956c5fe28c.zip |
Faster ReadableStream
Diffstat (limited to 'src/javascript/jsc/webcore/response.zig')
-rw-r--r-- | src/javascript/jsc/webcore/response.zig | 329 |
1 files changed, 114 insertions, 215 deletions
diff --git a/src/javascript/jsc/webcore/response.zig b/src/javascript/jsc/webcore/response.zig index 137ce35b8..3e42c174a 100644 --- a/src/javascript/jsc/webcore/response.zig +++ b/src/javascript/jsc/webcore/response.zig @@ -1557,7 +1557,7 @@ pub const Blob = struct { is_all_ascii: ?bool = null, allocator: std.mem.Allocator, - // 8 MB ought to be enough + // 2 MB ought to be enough pub const max_chunk_size = 1024 * 1024 * 2; pub export fn BlobStore__ref(ptr: *anyopaque) void { @@ -4988,6 +4988,12 @@ pub const FetchEvent = struct { } }; +pub const StreamStart = union(enum) { + empty: void, + err: JSC.Node.Syscall.Error, + chunk_size: Blob.SizeType, +}; + pub const StreamResult = union(enum) { owned: bun.ByteList, owned_and_done: bun.ByteList, @@ -5000,8 +5006,8 @@ pub const StreamResult = union(enum) { done: void, pub const IntoArray = struct { - value: JSValue, - len: usize = std.math.maxInt(usize), + value: JSValue = JSValue.zero, + len: Blob.SizeType = std.math.maxInt(Blob.SizeType), }; pub const Pending = struct { @@ -5010,6 +5016,13 @@ pub const StreamResult = union(enum) { used: bool = false, }; + pub fn isDone(this: *const StreamResult) bool { + return switch (this.*) { + .owned_and_done, .temporary_and_done, .into_array_and_done, .done, .err => true, + else => false, + }; + } + fn toPromisedWrap(globalThis: *JSGlobalObject, promise: *JSPromise, pending: *Pending) void { suspend {} @@ -5038,10 +5051,10 @@ pub const StreamResult = union(enum) { pub fn toJS(this: *const StreamResult, globalThis: *JSGlobalObject) JSValue { switch (this.*) { .owned => |list| { - return JSC.ArrayBuffer.fromBytes(list.slice(), .Uint8Array).toJSAutoAllocator(globalThis.ref(), null); + return JSC.ArrayBuffer.fromBytes(list.slice(), .Uint8Array).toJS(globalThis.ref(), null); }, .owned_and_done => |list| { - return JSC.ArrayBuffer.fromBytes(list.slice(), .Uint8Array).toJSAutoAllocator(globalThis.ref(), null); + return JSC.ArrayBuffer.fromBytes(list.slice(), .Uint8Array).toJS(globalThis.ref(), null); }, .temporary => |temp| { var array = JSC.JSValue.createUninitializedUint8Array(globalThis, temp.len); @@ -5056,10 +5069,10 @@ pub const StreamResult = union(enum) { return array; }, .into_array => |array| { - return array.value; + return JSC.JSValue.jsNumberFromInt64(array.len); }, .into_array_and_done => |array| { - return array.value; + return JSC.JSValue.jsNumberFromInt64(array.len); }, .pending => |pending| { var promise = JSC.JSPromise.create(globalThis); @@ -5115,7 +5128,7 @@ pub fn WritableStreamSink( return onWrite(&this.context, bytes); } - pub fn start(this: *This) StreamResult { + pub fn start(this: *This) StreamStart { return onStart(&this.context); } @@ -5291,21 +5304,21 @@ pub fn ReadableStreamSource( const This = @This(); const ReadableStreamSourceType = @This(); - pub fn pull(this: *This) StreamResult { - return onPull(&this.context); + pub fn pull(this: *This, buf: []u8) StreamResult { + return onPull(&this.context, buf, JSValue.zero); } pub fn start( this: *This, - ) StreamResult { + ) StreamStart { return onStart(&this.context); } - pub fn pullFromJS(this: *This) StreamResult { - return onPull(&this.context); + pub fn pullFromJS(this: *This, buf: []u8, view: JSValue) StreamResult { + return onPull(&this.context, buf, view); } - pub fn startFromJS(this: *This) StreamResult { + pub fn startFromJS(this: *This) StreamStart { return onStart(&this.context); } @@ -5356,19 +5369,24 @@ pub fn ReadableStreamSource( pub fn pull(globalThis: *JSGlobalObject, callFrame: *JSC.CallFrame) callconv(.C) JSC.JSValue { var this = callFrame.argument(0).asPtr(ReadableStreamSourceType); + const view = callFrame.argument(1); + view.ensureStillAlive(); + var buffer = view.asArrayBuffer(globalThis) orelse return JSC.JSValue.jsUndefined(); return processResult( globalThis, callFrame, - this.pullFromJS(), + this.pullFromJS(buffer.slice(), view), ); } pub fn start(globalThis: *JSGlobalObject, callFrame: *JSC.CallFrame) callconv(.C) JSC.JSValue { var this = callFrame.argument(0).asPtr(ReadableStreamSourceType); - return processResult( - globalThis, - callFrame, - this.startFromJS(), - ); + switch (this.startFromJS()) { + .empty => return JSValue.jsNumber(0), + .chunk_size => |size| return JSValue.jsNumber(size), + .err => |err| { + return err.toJSC(globalThis); + }, + } } pub fn processResult(globalThis: *JSGlobalObject, callFrame: *JSC.CallFrame, result: StreamResult) JSC.JSValue { @@ -5475,30 +5493,35 @@ pub const ByteBlobLoader = struct { }; } - pub fn onStart(this: *ByteBlobLoader) StreamResult { - return this.onPull(); + pub fn onStart(this: *ByteBlobLoader) StreamStart { + return .{ .chunk_size = this.chunk_size }; } - pub fn onPull(this: *ByteBlobLoader) StreamResult { + pub fn onPull(this: *ByteBlobLoader, buffer: []u8, array: JSC.JSValue) StreamResult { if (this.done) { return .{ .done = {} }; } var temporary = this.store.sharedView(); temporary = temporary[this.offset..]; - temporary = temporary[0..@minimum(this.chunk_size, @minimum(temporary.len, this.remain))]; + + temporary = temporary[0..@minimum(buffer.len, @minimum(temporary.len, this.remain))]; if (temporary.len == 0) { this.store.deref(); this.done = true; return .{ .done = {} }; } - this.remain -|= @intCast(Blob.SizeType, temporary.len); - this.offset +|= @intCast(Blob.SizeType, temporary.len); + const copied = @intCast(Blob.SizeType, temporary.len); - return .{ - .temporary = bun.ByteList.init(temporary), - }; + this.remain -|= copied; + this.offset +|= copied; + @memcpy(temporary.ptr, buffer.ptr, temporary.len); + if (this.remain == 0) { + return .{ .into_array_and_done = .{ .value = array, .len = copied } }; + } + + return .{ .into_array = .{ .value = array, .len = copied } }; } pub fn onCancel(_: *ByteBlobLoader) void {} @@ -5517,7 +5540,7 @@ pub const ByteBlobLoader = struct { pub const FileBlobLoader = struct { buf: []u8 = &[_]u8{}, - uint8array: JSC.JSValue = JSC.JSValue.zero, + protected_view: JSC.JSValue = JSC.JSValue.zero, fd: JSC.Node.FileDescriptor = 0, auto_close: bool = false, loop: *JSC.EventLoop = undefined, @@ -5538,7 +5561,7 @@ pub const FileBlobLoader = struct { const FileReader = @This(); - const run_on_different_thread_size = 1024 * 512; + const run_on_different_thread_size = bun.huge_allocator_threshold; pub const tag = ReadableStream.Tag.File; @@ -5635,40 +5658,23 @@ pub const FileBlobLoader = struct { scheduleMainThreadTask(this); return; } + } - AsyncIO.global.read( - *FileBlobLoader, - this, - onRead, - &this.concurrent.completion, - this.fd, - this.buf[this.concurrent.read..], - null, - ); - - suspend { - var _frame = @frame(); - var this_frame = HTTPClient.getAllocator().create(std.meta.Child(@TypeOf(_frame))) catch unreachable; - this_frame.* = _frame.*; - this.concurrent.read_frame = this_frame; - } - } else { - AsyncIO.global.read( - *FileBlobLoader, - this, - onRead, - &this.concurrent.completion, - this.fd, - this.buf[this.concurrent.read..], - null, - ); + AsyncIO.global.read( + *FileBlobLoader, + this, + onRead, + &this.concurrent.completion, + this.fd, + this.buf[this.concurrent.read..], + null, + ); - suspend { - var _frame = @frame(); - var this_frame = HTTPClient.getAllocator().create(std.meta.Child(@TypeOf(_frame))) catch unreachable; - this_frame.* = _frame.*; - this.concurrent.read_frame = this_frame; - } + suspend { + var _frame = @frame(); + var this_frame = HTTPClient.getAllocator().create(std.meta.Child(@TypeOf(_frame))) catch unreachable; + this_frame.* = _frame.*; + this.concurrent.read_frame = this_frame; } scheduleMainThreadTask(this); @@ -5676,12 +5682,16 @@ pub const FileBlobLoader = struct { pub fn onJSThread(task_ctx: *anyopaque) void { var this: *FileBlobLoader = bun.cast(*FileBlobLoader, task_ctx); + const protected_view = this.protected_view; + defer protected_view.unprotect(); + this.protected_view = JSC.JSValue.zero; if (this.finalized and this.scheduled_count == 0) { if (!this.pending.used) { resume this.pending.frame; } this.scheduled_count -= 1; + this.deinit(); return; @@ -5702,9 +5712,12 @@ pub const FileBlobLoader = struct { return; } - this.pending.result = this.handleReadChunk(this.buf, @as(usize, this.concurrent.read)); + this.pending.result = this.handleReadChunk(@as(usize, this.concurrent.read)); resume this.pending.frame; this.scheduled_count -= 1; + if (this.pending.result.isDone()) { + this.finalize(); + } } pub fn scheduleMainThreadTask(this: *FileBlobLoader) void { @@ -5714,14 +5727,6 @@ pub const FileBlobLoader = struct { fn runAsync(this: *FileBlobLoader) void { this.concurrent.read = 0; - this.buf = bun.auto_allocator.alloc(u8, this.concurrent.chunk_size) catch { - this.pending.result = .{ .err = JSC.Node.Syscall.Error.oom }; - this.concurrent.read = 0; - scheduleMainThreadTask(this); - - return; - }; - _ = bun.C.posix_madvise(this.buf.ptr, this.buf.len, 2); Concurrent.scheduleRead(this); @@ -5742,7 +5747,7 @@ pub const FileBlobLoader = struct { const default_fifo_chunk_size = 1024; const default_file_chunk_size = 1024 * 1024 * 2; - pub fn onStart(this: *FileBlobLoader) StreamResult { + pub fn onStart(this: *FileBlobLoader) StreamStart { var file = &this.store.data.file; var file_buf: [std.fs.MAX_PATH_BYTES]u8 = undefined; var auto_close = this.auto_close; @@ -5800,15 +5805,6 @@ pub const FileBlobLoader = struct { return .{ .err = err }; } - // if (comptime Environment.isMac) { - // if (std.os.S.ISSOCK(stat.mode)) { - // // darwin doesn't support os.MSG.NOSIGNAL, - // // but instead a socket option to avoid SIGPIPE. - // const _bytes = &std.mem.toBytes(@as(c_int, 1)); - // _ = std.os.darwin.setsockopt(fd, std.os.SOL.SOCKET, std.os.SO.NOSIGPIPE, _bytes, @intCast(std.os.socklen_t, _bytes.len)); - // } - // } - file.seekable = std.os.S.ISREG(stat.mode); file.mode = @intCast(JSC.Node.Mode, stat.mode); this.mode = file.mode; @@ -5821,45 +5817,14 @@ pub const FileBlobLoader = struct { _ = JSC.Node.Syscall.close(fd); } this.deinit(); - return .{ .done = {} }; + return .{ .empty = {} }; } this.fd = fd; this.auto_close = auto_close; const chunk_size = this.calculateChunkSize(std.math.maxInt(usize)); - - if (chunk_size >= run_on_different_thread_size) { - // should never be reached - this.pending.result = .{ - .err = JSC.Node.Syscall.Error.todo, - }; - - this.scheduleAsync(@truncate(Blob.SizeType, chunk_size)); - - return .{ .pending = &this.pending }; - } - - if (this.allocateBuffer(chunk_size)) |err| { - this.deinit(); - return .{ .err = err }; - } - - return this.read(this.buf); - } - - // Disabled because it's not fully implemented - // Maybe we should allocate as an ArrayBuffer instead of a Uint8Array? - fn shouldUseJSCHeap(this: *const FileBlobLoader, chunk_size: Blob.SizeType) bool { - _ = this; - _ = chunk_size; - return false; - // const file = &this.store.data.file; - - // if (!(file.seekable orelse false)) - // return false; - - // return file.max_size - this.total_read >= chunk_size; + return .{ .chunk_size = @truncate(Blob.SizeType, chunk_size) }; } fn calculateChunkSize(this: *FileBlobLoader, available_to_read: usize) usize { @@ -5878,55 +5843,32 @@ pub const FileBlobLoader = struct { @minimum(available_to_read, chunk_size); } - fn allocateBuffer(this: *FileBlobLoader, chunk_size: usize) ?JSC.Node.Syscall.Error { - var file = &this.store.data.file; - - if (this.shouldUseJSCHeap(@intCast(Blob.SizeType, chunk_size))) { - this.uint8array = JSValue.createUninitializedUint8Array(this.loop.global, chunk_size); - this.uint8array.ensureStillAlive(); - this.buf = this.uint8array.asArrayBuffer(this.loop.global).?.slice(); - } else { - this.buf = bun.auto_allocator.alloc( - u8, - chunk_size, - ) catch { - this.maybeAutoClose(); - return JSC.Node.Syscall.Error.oom.withPath(if (file.pathlike.path.slice().len > 0) file.pathlike.path.slice() else ""); - }; - } - - return null; - } - - pub fn onPull(this: *FileBlobLoader) StreamResult { - if (this.buf.len == 0) { - const chunk_size = this.calculateChunkSize(std.math.maxInt(usize)); + pub fn onPull(this: *FileBlobLoader, buffer: []u8, view: JSC.JSValue) StreamResult { + const chunk_size = this.calculateChunkSize(std.math.maxInt(usize)); - switch (chunk_size) { - 0 => { - std.debug.assert(this.store.data.file.seekable orelse false); - this.finalize(); - return .{ .done = {} }; - }, - run_on_different_thread_size...std.math.maxInt(@TypeOf(chunk_size)) => { - // should never be reached - this.pending.result = .{ - .err = JSC.Node.Syscall.Error.todo, - }; + switch (chunk_size) { + 0 => { + std.debug.assert(this.store.data.file.seekable orelse false); + this.finalize(); + return .{ .done = {} }; + }, + run_on_different_thread_size...std.math.maxInt(@TypeOf(chunk_size)) => { + this.protected_view = view; + this.protected_view.protect(); + // should never be reached + this.pending.result = .{ + .err = JSC.Node.Syscall.Error.todo, + }; + this.buf = buffer; - this.scheduleAsync(@truncate(Blob.SizeType, chunk_size)); + this.scheduleAsync(@truncate(Blob.SizeType, chunk_size)); - return .{ .pending = &this.pending }; - }, - else => { - if (this.allocateBuffer(chunk_size)) |err| { - return .{ .err = err }; - } - }, - } + return .{ .pending = &this.pending }; + }, + else => {}, } - return this.read(this.buf); + return this.read(buffer, view); } fn maybeAutoClose(this: *FileBlobLoader) void { @@ -5936,7 +5878,7 @@ pub const FileBlobLoader = struct { } } - fn handleReadChunk(this: *FileBlobLoader, read_buf: []u8, result: usize) StreamResult { + fn handleReadChunk(this: *FileBlobLoader, result: usize) StreamResult { this.total_read += @intCast(Blob.SizeType, result); const remaining: Blob.SizeType = if (this.store.data.file.seekable orelse false) this.store.data.file.max_size -| this.total_read @@ -5954,52 +5896,16 @@ pub const FileBlobLoader = struct { const has_more = remaining > 0; if (!has_more) { - this.uint8array.ensureStillAlive(); - - defer { - this.uint8array.ensureStillAlive(); - this.buf = &.{}; - this.uint8array = JSValue.zero; - this.finalize(); - } - - if (this.uint8array.isEmpty()) { - return .{ - .owned_and_done = bun.ByteList.init(read_buf[0..result]), - }; - } else { - return .{ - .into_array_and_done = .{ - .value = this.uint8array, - .len = result, - }, - }; - } + return .{ .into_array_and_done = .{ .len = @truncate(Blob.SizeType, result) } }; } - if (this.uint8array.isEmpty()) { - defer this.buf = &.{}; - return .{ - .owned = bun.ByteList.init(read_buf[0..result]), - }; - } else { - defer { - this.buf = &.{}; - this.uint8array = JSValue.zero; - } - - return .{ - .into_array = .{ - .value = this.uint8array, - .len = result, - }, - }; - } + return .{ .into_array = .{ .len = @truncate(Blob.SizeType, result) } }; } pub fn read( this: *FileBlobLoader, read_buf: []u8, + view: JSC.JSValue, ) StreamResult { const rc = JSC.Node.Syscall.read(this.fd, read_buf); @@ -6013,6 +5919,9 @@ pub const FileBlobLoader = struct { switch (err.getErrno()) { retry => { + this.protected_view = view; + this.protected_view.protect(); + this.buf = read_buf; this.watch(); return .{ .pending = &this.pending, @@ -6029,7 +5938,7 @@ pub const FileBlobLoader = struct { return .{ .err = sys }; }, .result => |result| { - return this.handleReadChunk(read_buf, result); + return this.handleReadChunk(result); }, } } @@ -6037,6 +5946,9 @@ pub const FileBlobLoader = struct { pub fn callback(task: ?*anyopaque, sizeOrOffset: i64, _: u16) void { var this: *FileReader = bun.cast(*FileReader, task.?); this.scheduled_count -= 1; + const protected_view = this.protected_view; + defer protected_view.unprotect(); + this.protected_view = JSValue.zero; var available_to_read: usize = std.math.maxInt(usize); if (comptime Environment.isMac) { @@ -6064,16 +5976,12 @@ pub const FileBlobLoader = struct { return; if (this.buf.len == 0) { - if (this.allocateBuffer(available_to_read)) |err| { - this.pending.result = .{ .err = err }; - resume this.pending.frame; - return; - } + return; } else { this.buf.len = @minimum(this.buf.len, available_to_read); } - this.pending.result = this.read(this.buf); + this.pending.result = this.read(this.buf, this.protected_view); resume this.pending.frame; } @@ -6081,15 +5989,6 @@ pub const FileBlobLoader = struct { if (this.finalized) return; this.finalized = true; - if (this.buf.len > 0) { - if (this.uint8array.isEmpty()) { - bun.default_allocator.free(this.buf); - this.buf = &.{}; - } else { - this.buf = &.{}; - this.uint8array = JSValue.zero; - } - } this.maybeAutoClose(); |