aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js
diff options
context:
space:
mode:
authorGravatar Ciro Spaciari <ciro.spaciari@gmail.com> 2023-08-22 00:30:34 -0300
committerGravatar GitHub <noreply@github.com> 2023-08-21 20:30:34 -0700
commit9027484ae1a9eaf9769d79828db48de66450a3fc (patch)
treee6b679132046bbc0ef254a170c0b0ec3030d3fa2 /src/bun.js
parent91eacade975a522f7ad3d62185ff8adace3aad97 (diff)
downloadbun-9027484ae1a9eaf9769d79828db48de66450a3fc.tar.gz
bun-9027484ae1a9eaf9769d79828db48de66450a3fc.tar.zst
bun-9027484ae1a9eaf9769d79828db48de66450a3fc.zip
fetch(stream) add stream support for compressed and uncompressed data (#4127)
* streams non compressed data in 64kb chunks (at least) * fmt * wip remove pause * fix default streaming and buffering * fix atomic lags * fix size * make chunked encoding work again (WIP streaming chunked) * WIP: chunked encoding streaming * fix end of streamings * working streaming + compression * add fixes + tests * fmt + fix proxy * fix oopsies * codegen after merge * fmt + fixes * more fixes * more fixes and logs * avoid double free * check empty before pop * check empty on pop * fix copy to real when complete * remove unnecessary logs * better has_schedule_callback swap, body locked size helper, remove isEmpty from unbounded_queue pop * fix response ref, fix body_size * add deflate support, fix error throw, add more tests * codegen after merge * remove logs, add connection close test * fix macOS build * fix redirect error option * make body_size more clear * support new Reponse(response) * toString DOMWrapper objects properly instead of supporting response in Response constructor * ignore headers with no name, add more tests * oops * handle transform with fetch * add gz image stream test * remove duplicate test * fix missing chunk on macOS under pressure * oops include all OS * some fixes * compare buffers instead of sizes * refactor err.err and protect it
Diffstat (limited to 'src/bun.js')
-rw-r--r--src/bun.js/api/bun/subprocess.zig6
-rw-r--r--src/bun.js/bindings/bindings.cpp2
-rw-r--r--src/bun.js/event_loop.zig3
-rw-r--r--src/bun.js/webcore/blob.zig20
-rw-r--r--src/bun.js/webcore/body.zig18
-rw-r--r--src/bun.js/webcore/response.zig260
-rw-r--r--src/bun.js/webcore/streams.zig89
7 files changed, 355 insertions, 43 deletions
diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig
index 4fb02b8af..b8bf6939c 100644
--- a/src/bun.js/api/bun/subprocess.zig
+++ b/src/bun.js/api/bun/subprocess.zig
@@ -616,7 +616,11 @@ pub const Subprocess = struct {
return;
},
.err => |err| {
- this.status = .{ .err = err };
+ if (err == .Error) {
+ this.status = .{ .err = err.Error };
+ } else {
+ this.status = .{ .err = JSC.Node.Syscall.Error.fromCode(.CANCELED, .read) };
+ }
this.fifo.close();
return;
diff --git a/src/bun.js/bindings/bindings.cpp b/src/bun.js/bindings/bindings.cpp
index 37594288d..badfd3437 100644
--- a/src/bun.js/bindings/bindings.cpp
+++ b/src/bun.js/bindings/bindings.cpp
@@ -1181,7 +1181,7 @@ WebCore::FetchHeaders* WebCore__FetchHeaders__createFromPicoHeaders_(const void*
for (size_t j = 0; j < end; j++) {
PicoHTTPHeader header = pico_headers.ptr[j];
- if (header.value.len == 0)
+ if (header.value.len == 0 || header.name.len == 0)
continue;
StringView nameView = StringView(reinterpret_cast<const char*>(header.name.ptr), header.name.len);
diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig
index 92613d0f0..92874b6a4 100644
--- a/src/bun.js/event_loop.zig
+++ b/src/bun.js/event_loop.zig
@@ -545,8 +545,7 @@ pub const EventLoop = struct {
},
.FetchTasklet => {
var fetch_task: *Fetch.FetchTasklet = task.get(Fetch.FetchTasklet).?;
- fetch_task.onDone();
- fetch_task.deinit();
+ fetch_task.onProgressUpdate();
},
@field(Task.Tag, @typeName(AsyncTransformTask)) => {
var transform_task: *AsyncTransformTask = task.get(AsyncTransformTask).?;
diff --git a/src/bun.js/webcore/blob.zig b/src/bun.js/webcore/blob.zig
index df2e17ce4..c794ab59b 100644
--- a/src/bun.js/webcore/blob.zig
+++ b/src/bun.js/webcore/blob.zig
@@ -3794,6 +3794,10 @@ pub const Blob = struct {
} else {
return build.blob.dupe();
}
+ } else if (current.toSliceClone(global)) |sliced| {
+ if (sliced.allocator.get()) |allocator| {
+ return Blob.initWithAllASCII(bun.constStrToU8(sliced.slice()), allocator, global, false);
+ }
}
},
@@ -3886,6 +3890,14 @@ pub const Blob = struct {
could_have_non_ascii = could_have_non_ascii or !(blob.is_all_ascii orelse false);
joiner.append(blob.sharedView(), 0, null);
continue;
+ } else if (current.toSliceClone(global)) |sliced| {
+ const allocator = sliced.allocator.get();
+ could_have_non_ascii = could_have_non_ascii or allocator != null;
+ joiner.append(
+ sliced.slice(),
+ 0,
+ allocator,
+ );
}
},
else => {},
@@ -3900,6 +3912,14 @@ pub const Blob = struct {
if (current.as(Blob)) |blob| {
could_have_non_ascii = could_have_non_ascii or !(blob.is_all_ascii orelse false);
joiner.append(blob.sharedView(), 0, null);
+ } else if (current.toSliceClone(global)) |sliced| {
+ const allocator = sliced.allocator.get();
+ could_have_non_ascii = could_have_non_ascii or allocator != null;
+ joiner.append(
+ sliced.slice(),
+ 0,
+ allocator,
+ );
}
},
diff --git a/src/bun.js/webcore/body.zig b/src/bun.js/webcore/body.zig
index b59125bc6..fa0ec9b24 100644
--- a/src/bun.js/webcore/body.zig
+++ b/src/bun.js/webcore/body.zig
@@ -210,10 +210,24 @@ pub const Body = struct {
/// used in HTTP server to ignore request bodies unless asked for it
onStartBuffering: ?*const fn (ctx: *anyopaque) void = null,
onStartStreaming: ?*const fn (ctx: *anyopaque) JSC.WebCore.DrainResult = null,
+ size_hint: Blob.SizeType = 0,
deinit: bool = false,
action: Action = Action{ .none = {} },
+ /// For Http Client requests
+ /// when Content-Length is provided this represents the whole size of the request
+ /// If chunked encoded this will represent the total received size (ignoring the chunk headers)
+ /// If the size is unknown will be 0
+ fn sizeHint(this: *const PendingValue) Blob.SizeType {
+ if (this.readable) |readable| {
+ if (readable.ptr == .Bytes) {
+ return readable.ptr.Bytes.size_hint;
+ }
+ }
+ return this.size_hint;
+ }
+
pub fn toAnyBlob(this: *PendingValue) ?AnyBlob {
if (this.promise != null)
return null;
@@ -375,6 +389,7 @@ pub const Body = struct {
.Blob => this.Blob.size,
.InternalBlob => @as(Blob.SizeType, @truncate(this.InternalBlob.sliceConst().len)),
.WTFStringImpl => @as(Blob.SizeType, @truncate(this.WTFStringImpl.utf8ByteLength())),
+ .Locked => this.Locked.sizeHint(),
// .InlineBlob => @truncate(Blob.SizeType, this.InlineBlob.sliceConst().len),
else => 0,
};
@@ -382,9 +397,9 @@ pub const Body = struct {
pub fn fastSize(this: *const Value) Blob.SizeType {
return switch (this.*) {
- .Blob => this.Blob.size,
.InternalBlob => @as(Blob.SizeType, @truncate(this.InternalBlob.sliceConst().len)),
.WTFStringImpl => @as(Blob.SizeType, @truncate(this.WTFStringImpl.byteSlice().len)),
+ .Locked => this.Locked.sizeHint(),
// .InlineBlob => @truncate(Blob.SizeType, this.InlineBlob.sliceConst().len),
else => 0,
};
@@ -394,6 +409,7 @@ pub const Body = struct {
return switch (this.*) {
.InternalBlob => this.InternalBlob.sliceConst().len,
.WTFStringImpl => this.WTFStringImpl.byteSlice().len,
+ .Locked => this.Locked.sizeHint(),
// .InlineBlob => this.InlineBlob.sliceConst().len,
else => 0,
};
diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig
index 4b15efc0e..01ecfad36 100644
--- a/src/bun.js/webcore/response.zig
+++ b/src/bun.js/webcore/response.zig
@@ -44,6 +44,7 @@ const JSPrinter = bun.js_printer;
const picohttp = @import("root").bun.picohttp;
const StringJoiner = @import("../../string_joiner.zig");
const uws = @import("root").bun.uws;
+const Mutex = @import("../../lock.zig").Lock;
const InlineBlob = JSC.WebCore.InlineBlob;
const AnyBlob = JSC.WebCore.AnyBlob;
@@ -123,6 +124,7 @@ pub const Response = struct {
pub fn writeFormat(this: *Response, comptime Formatter: type, formatter: *Formatter, writer: anytype, comptime enable_ansi_colors: bool) !void {
const Writer = @TypeOf(writer);
try writer.print("Response ({}) {{\n", .{bun.fmt.size(this.body.len())});
+
{
formatter.indent += 1;
defer formatter.indent -|= 1;
@@ -618,11 +620,21 @@ pub const Fetch = struct {
javascript_vm: *VirtualMachine = undefined,
global_this: *JSGlobalObject = undefined,
request_body: HTTPRequestBody = undefined,
+ /// buffer being used by AsyncHTTP
response_buffer: MutableString = undefined,
+ /// buffer used to stream response to JS
+ scheduled_response_buffer: MutableString = undefined,
+ /// response strong ref
+ response: JSC.Strong = .{},
request_headers: Headers = Headers{ .allocator = undefined },
promise: JSC.JSPromise.Strong,
concurrent_task: JSC.ConcurrentTask = .{},
poll_ref: JSC.PollRef = .{},
+ /// For Http Client requests
+ /// when Content-Length is provided this represents the whole size of the request
+ /// If chunked encoded this will represent the total received size (ignoring the chunk headers)
+ /// If is not chunked encoded and Content-Length is not provided this will be unknown
+ body_size: HTTPClient.HTTPClientResult.BodySize = .unknown,
/// This is url + proxy memory buffer and is owned by FetchTasklet
/// We always clone url and proxy (if informed)
@@ -630,11 +642,14 @@ pub const Fetch = struct {
signal: ?*JSC.WebCore.AbortSignal = null,
aborted: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
+ has_schedule_callback: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
// must be stored because AbortSignal stores reason weakly
abort_reason: JSValue = JSValue.zero,
// Custom Hostname
hostname: ?[]u8 = null,
+ is_waiting_body: bool = false,
+ mutex: Mutex,
tracker: JSC.AsyncTaskTracker,
@@ -691,6 +706,8 @@ pub const Fetch = struct {
this.result.deinitMetadata();
this.response_buffer.deinit();
+ this.response.deinit();
+ this.scheduled_response_buffer.deinit();
this.request_body.detach();
if (this.abort_reason != .zero)
@@ -707,9 +724,122 @@ pub const Fetch = struct {
this.javascript_vm.allocator.destroy(this);
}
- pub fn onDone(this: *FetchTasklet) void {
+ pub fn onBodyReceived(this: *FetchTasklet) void {
+ const success = this.result.isSuccess();
+ const globalThis = this.global_this;
+ defer {
+ if (!success or !this.result.has_more) {
+ var vm = globalThis.bunVM();
+ this.poll_ref.unref(vm);
+ this.clearData();
+ this.deinit();
+ }
+ }
+
+ if (!success) {
+ const err = this.onReject();
+ err.ensureStillAlive();
+ if (this.response.get()) |response_js| {
+ if (response_js.as(Response)) |response| {
+ const body = response.body;
+ if (body.value == .Locked) {
+ if (body.value.Locked.readable) |readable| {
+ readable.ptr.Bytes.onData(
+ .{
+ .err = .{ .JSValue = err },
+ },
+ bun.default_allocator,
+ );
+ return;
+ }
+ }
+
+ response.body.value.toErrorInstance(err, globalThis);
+ return;
+ }
+ }
+
+ globalThis.throwValue(err);
+ return;
+ }
+
+ if (this.response.get()) |response_js| {
+ if (response_js.as(Response)) |response| {
+ const body = response.body;
+ if (body.value == .Locked) {
+ if (body.value.Locked.readable) |readable| {
+ if (readable.ptr == .Bytes) {
+ readable.ptr.Bytes.size_hint = this.getSizeHint();
+
+ var scheduled_response_buffer = this.scheduled_response_buffer.list;
+
+ const chunk = scheduled_response_buffer.items;
+
+ if (this.result.has_more) {
+ readable.ptr.Bytes.onData(
+ .{
+ .temporary = bun.ByteList.initConst(chunk),
+ },
+ bun.default_allocator,
+ );
+
+ // clean for reuse later
+ this.scheduled_response_buffer.reset();
+ } else {
+ readable.ptr.Bytes.onData(
+ .{
+ .temporary_and_done = bun.ByteList.initConst(chunk),
+ },
+ bun.default_allocator,
+ );
+ }
+
+ return;
+ }
+ } else {
+ response.body.value.Locked.size_hint = this.getSizeHint();
+ }
+ // we will reach here when not streaming
+ if (!this.result.has_more) {
+ var scheduled_response_buffer = this.scheduled_response_buffer.list;
+
+ // done resolve body
+ var old = body.value;
+ var body_value = Body.Value{
+ .InternalBlob = .{
+ .bytes = scheduled_response_buffer.toManaged(bun.default_allocator),
+ },
+ };
+ response.body.value = body_value;
+
+ this.scheduled_response_buffer = .{
+ .allocator = bun.default_allocator,
+ .list = .{
+ .items = &.{},
+ .capacity = 0,
+ },
+ };
+
+ if (old == .Locked) {
+ old.resolve(&response.body.value, this.global_this);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ pub fn onProgressUpdate(this: *FetchTasklet) void {
JSC.markBinding(@src());
+ this.mutex.lock();
+ defer {
+ this.has_schedule_callback.store(false, .Monotonic);
+ this.mutex.unlock();
+ }
+ if (this.is_waiting_body) {
+ return this.onBodyReceived();
+ }
const globalThis = this.global_this;
var ref = this.promise;
@@ -718,13 +848,22 @@ pub const Fetch = struct {
var poll_ref = this.poll_ref;
var vm = globalThis.bunVM();
- defer poll_ref.unref(vm);
if (promise_value.isEmptyOrUndefinedOrNull()) {
+ poll_ref.unref(vm);
this.clearData();
+ this.deinit();
return;
}
+ defer {
+ if (!this.is_waiting_body) {
+ poll_ref.unref(vm);
+ this.clearData();
+ this.deinit();
+ }
+ }
+
const promise = promise_value.asAnyPromise().?;
const tracker = this.tracker;
tracker.willDispatch(globalThis);
@@ -735,7 +874,6 @@ pub const Fetch = struct {
false => this.onReject(),
};
result.ensureStillAlive();
- this.clearData();
promise_value.ensureStillAlive();
@@ -784,25 +922,77 @@ pub const Fetch = struct {
return fetch_error.toErrorInstance(this.global_this);
}
- fn toBodyValue(this: *FetchTasklet) Body.Value {
- var response_buffer = this.response_buffer.list;
- this.response_buffer = .{
- .allocator = default_allocator,
- .list = .{
- .items = &.{},
- .capacity = 0,
- },
+ pub fn onStartStreamingRequestBodyCallback(ctx: *anyopaque) JSC.WebCore.DrainResult {
+ const this = bun.cast(*FetchTasklet, ctx);
+ if (this.http) |http| {
+ http.enableBodyStreaming();
+ }
+ if (this.aborted.load(.Acquire)) {
+ return JSC.WebCore.DrainResult{
+ .aborted = {},
+ };
+ }
+
+ this.mutex.lock();
+ defer this.mutex.unlock();
+ const size_hint = this.getSizeHint();
+
+ var scheduled_response_buffer = this.scheduled_response_buffer.list;
+ // This means we have received part of the body but not the whole thing
+ if (scheduled_response_buffer.items.len > 0) {
+ this.scheduled_response_buffer = .{
+ .allocator = default_allocator,
+ .list = .{
+ .items = &.{},
+ .capacity = 0,
+ },
+ };
+
+ return .{
+ .owned = .{
+ .list = scheduled_response_buffer.toManaged(bun.default_allocator),
+ .size_hint = size_hint,
+ },
+ };
+ }
+
+ return .{
+ .estimated_size = size_hint,
};
+ }
+
+ fn getSizeHint(this: *FetchTasklet) Blob.SizeType {
+ return switch (this.body_size) {
+ .content_length => @truncate(this.body_size.content_length),
+ .total_received => @truncate(this.body_size.total_received),
+ else => 0,
+ };
+ }
- // if (response_buffer.items.len < InlineBlob.available_bytes) {
- // const inline_blob = InlineBlob.init(response_buffer.items);
- // defer response_buffer.deinit(bun.default_allocator);
- // return .{ .InlineBlob = inline_blob };
- // }
+ fn toBodyValue(this: *FetchTasklet) Body.Value {
+ if (this.is_waiting_body) {
+ const response = Body.Value{
+ .Locked = .{
+ .size_hint = this.getSizeHint(),
+ .task = this,
+ .global = this.global_this,
+ .onStartStreaming = FetchTasklet.onStartStreamingRequestBodyCallback,
+ },
+ };
+ return response;
+ }
+ var scheduled_response_buffer = this.scheduled_response_buffer.list;
const response = Body.Value{
.InternalBlob = .{
- .bytes = response_buffer.toManaged(bun.default_allocator),
+ .bytes = scheduled_response_buffer.toManaged(bun.default_allocator),
+ },
+ };
+ this.scheduled_response_buffer = .{
+ .allocator = default_allocator,
+ .list = .{
+ .items = &.{},
+ .capacity = 0,
},
};
@@ -811,6 +1001,7 @@ pub const Fetch = struct {
fn toResponse(this: *FetchTasklet, allocator: std.mem.Allocator) Response {
const http_response = this.result.response;
+ this.is_waiting_body = this.result.has_more;
return Response{
.allocator = allocator,
.url = bun.String.createAtomIfPossible(this.result.href),
@@ -830,7 +1021,10 @@ pub const Fetch = struct {
const allocator = bun.default_allocator;
var response = allocator.create(Response) catch unreachable;
response.* = this.toResponse(allocator);
- return Response.makeMaybePooled(@as(js.JSContextRef, @ptrCast(this.global_this)), response);
+ const response_js = Response.makeMaybePooled(@as(js.JSContextRef, this.global_this), response);
+ response_js.ensureStillAlive();
+ this.response = JSC.Strong.create(response_js, this.global_this);
+ return response_js;
}
pub fn get(
@@ -843,6 +1037,14 @@ pub const Fetch = struct {
var fetch_tasklet = try jsc_vm.allocator.create(FetchTasklet);
fetch_tasklet.* = .{
+ .mutex = Mutex.init(),
+ .scheduled_response_buffer = .{
+ .allocator = bun.default_allocator,
+ .list = .{
+ .items = &.{},
+ .capacity = 0,
+ },
+ },
.response_buffer = MutableString{
.allocator = bun.default_allocator,
.list = .{
@@ -905,6 +1107,8 @@ pub const Fetch = struct {
fetch_tasklet.http.?.client.disable_timeout = fetch_options.disable_timeout;
fetch_tasklet.http.?.client.verbose = fetch_options.verbose;
fetch_tasklet.http.?.client.disable_keepalive = fetch_options.disable_keepalive;
+ // we wanna to return after headers are received
+ fetch_tasklet.http.?.signalHeaderProgress();
if (fetch_tasklet.request_body == .Sendfile) {
std.debug.assert(fetch_options.url.isHTTP());
@@ -973,8 +1177,26 @@ pub const Fetch = struct {
}
pub fn callback(task: *FetchTasklet, result: HTTPClient.HTTPClientResult) void {
- task.response_buffer = result.body.?.*;
+ task.mutex.lock();
+ defer task.mutex.unlock();
task.result = result;
+ task.body_size = result.body_size;
+
+ const success = result.isSuccess();
+ task.response_buffer = result.body.?.*;
+
+ if (success) {
+ _ = task.scheduled_response_buffer.write(task.response_buffer.list.items) catch @panic("OOM");
+ }
+
+ // reset for reuse
+ task.response_buffer.reset();
+
+ if (task.has_schedule_callback.compareAndSwap(false, true, .Acquire, .Monotonic)) |has_schedule_callback| {
+ if (has_schedule_callback) {
+ return;
+ }
+ }
task.javascript_vm.eventLoop().enqueueTaskConcurrent(task.concurrent_task.from(task, .manual_deinit));
}
};
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig
index be6942392..955d10ffb 100644
--- a/src/bun.js/webcore/streams.zig
+++ b/src/bun.js/webcore/streams.zig
@@ -525,9 +525,16 @@ pub const StreamResult = union(Tag) {
into_array: IntoArray,
into_array_and_done: IntoArray,
pending: *Pending,
- err: Syscall.Error,
+
+ err: union(Err) { Error: Syscall.Error, JSValue: JSC.JSValue },
+
done: void,
+ pub const Err = enum {
+ Error,
+ JSValue,
+ };
+
pub const Tag = enum {
owned,
owned_and_done,
@@ -757,7 +764,14 @@ pub const StreamResult = union(Tag) {
promise.asValue(globalThis).unprotect();
switch (result) {
.err => |err| {
- promise.reject(globalThis, err.toJSC(globalThis));
+ if (err == .Error) {
+ promise.reject(globalThis, err.Error.toJSC(globalThis));
+ } else {
+ const js_err = err.JSValue;
+ js_err.ensureStillAlive();
+ js_err.unprotect();
+ promise.reject(globalThis, js_err);
+ }
},
.done => {
promise.resolve(globalThis, JSValue.jsBoolean(false));
@@ -803,7 +817,13 @@ pub const StreamResult = union(Tag) {
},
.err => |err| {
- return JSC.JSPromise.rejectedPromise(globalThis, JSValue.c(err.toJS(globalThis))).asValue(globalThis);
+ if (err == .Error) {
+ return JSC.JSPromise.rejectedPromise(globalThis, JSValue.c(err.Error.toJS(globalThis))).asValue(globalThis);
+ }
+ const js_err = err.JSValue;
+ js_err.ensureStillAlive();
+ js_err.unprotect();
+ return JSC.JSPromise.rejectedPromise(globalThis, js_err).asValue(globalThis);
},
// false == controller.close()
@@ -2380,6 +2400,9 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
return true;
} else {
this.has_backpressure = !this.res.write(buf);
+ if (this.has_backpressure) {
+ this.res.onWritable(*@This(), onWritable, this);
+ }
return true;
}
@@ -2986,7 +3009,14 @@ pub fn ReadableStreamSource(
pub fn processResult(globalThis: *JSGlobalObject, flags: JSValue, result: StreamResult) JSC.JSValue {
switch (result) {
.err => |err| {
- globalThis.vm().throwError(globalThis, err.toJSC(globalThis));
+ if (err == .Error) {
+ globalThis.vm().throwError(globalThis, err.Error.toJSC(globalThis));
+ } else {
+ const js_err = err.JSValue;
+ js_err.ensureStillAlive();
+ js_err.unprotect();
+ globalThis.vm().throwError(globalThis, js_err);
+ }
return JSValue.jsUndefined();
},
.temporary_and_done, .owned_and_done, .into_array_and_done => {
@@ -3301,12 +3331,29 @@ pub const ByteStream = struct {
if (is_really_done) {
this.done = true;
- this.pending.result = .{
- .into_array_and_done = .{
- .value = this.value(),
- .len = @as(Blob.SizeType, @truncate(to_copy.len)),
- },
- };
+
+ if (to_copy.len == 0) {
+ if (stream == .err) {
+ if (stream.err == .Error) {
+ this.pending.result = .{ .err = .{ .Error = stream.err.Error } };
+ }
+ const js_err = stream.err.JSValue;
+ js_err.ensureStillAlive();
+ js_err.protect();
+ this.pending.result = .{ .err = .{ .JSValue = js_err } };
+ } else {
+ this.pending.result = .{
+ .done = {},
+ };
+ }
+ } else {
+ this.pending.result = .{
+ .into_array_and_done = .{
+ .value = this.value(),
+ .len = @as(Blob.SizeType, @truncate(to_copy.len)),
+ },
+ };
+ }
} else {
this.pending.result = .{
.into_array = .{
@@ -3488,7 +3535,7 @@ pub const ReadResult = union(enum) {
pub fn toStreamWithIsDone(this: ReadResult, pending: *StreamResult.Pending, buf: []u8, view: JSValue, close_on_empty: bool, is_done: bool) StreamResult {
return switch (this) {
.pending => .{ .pending = pending },
- .err => .{ .err = this.err },
+ .err => .{ .err = .{ .Error = this.err } },
.done => .{ .done = {} },
.read => |slice| brk: {
const owned = slice.ptr != buf.ptr;
@@ -4064,17 +4111,21 @@ pub const File = struct {
this.concurrent.read = @as(Blob.SizeType, @truncate(result catch |err| {
if (@hasField(HTTPClient.NetworkThread.Completion, "result")) {
this.pending.result = .{
- .err = Syscall.Error{
- .errno = @as(Syscall.Error.Int, @intCast(-completion.result)),
- .syscall = .read,
+ .err = .{
+ .Error = Syscall.Error{
+ .errno = @as(Syscall.Error.Int, @intCast(-completion.result)),
+ .syscall = .read,
+ },
},
};
} else {
this.pending.result = .{
- .err = Syscall.Error{
- // this is too hacky
- .errno = @as(Syscall.Error.Int, @truncate(@as(u16, @intCast(@max(1, @intFromError(err)))))),
- .syscall = .read,
+ .err = .{
+ .Error = Syscall.Error{
+ // this is too hacky
+ .errno = @as(Syscall.Error.Int, @truncate(@as(u16, @intCast(@max(1, @intFromError(err)))))),
+ .syscall = .read,
+ },
},
};
}
@@ -4101,7 +4152,7 @@ pub const File = struct {
else => {},
}
- this.pending.result = .{ .err = err };
+ this.pending.result = .{ .err = .{ .Error = err } };
scheduleMainThreadTask(this);
return;
},