aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/webcore/streams.zig
diff options
context:
space:
mode:
Diffstat (limited to 'src/bun.js/webcore/streams.zig')
-rw-r--r--src/bun.js/webcore/streams.zig89
1 files changed, 70 insertions, 19 deletions
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig
index be6942392..955d10ffb 100644
--- a/src/bun.js/webcore/streams.zig
+++ b/src/bun.js/webcore/streams.zig
@@ -525,9 +525,16 @@ pub const StreamResult = union(Tag) {
into_array: IntoArray,
into_array_and_done: IntoArray,
pending: *Pending,
- err: Syscall.Error,
+
+ err: union(Err) { Error: Syscall.Error, JSValue: JSC.JSValue },
+
done: void,
+ pub const Err = enum {
+ Error,
+ JSValue,
+ };
+
pub const Tag = enum {
owned,
owned_and_done,
@@ -757,7 +764,14 @@ pub const StreamResult = union(Tag) {
promise.asValue(globalThis).unprotect();
switch (result) {
.err => |err| {
- promise.reject(globalThis, err.toJSC(globalThis));
+ if (err == .Error) {
+ promise.reject(globalThis, err.Error.toJSC(globalThis));
+ } else {
+ const js_err = err.JSValue;
+ js_err.ensureStillAlive();
+ js_err.unprotect();
+ promise.reject(globalThis, js_err);
+ }
},
.done => {
promise.resolve(globalThis, JSValue.jsBoolean(false));
@@ -803,7 +817,13 @@ pub const StreamResult = union(Tag) {
},
.err => |err| {
- return JSC.JSPromise.rejectedPromise(globalThis, JSValue.c(err.toJS(globalThis))).asValue(globalThis);
+ if (err == .Error) {
+ return JSC.JSPromise.rejectedPromise(globalThis, JSValue.c(err.Error.toJS(globalThis))).asValue(globalThis);
+ }
+ const js_err = err.JSValue;
+ js_err.ensureStillAlive();
+ js_err.unprotect();
+ return JSC.JSPromise.rejectedPromise(globalThis, js_err).asValue(globalThis);
},
// false == controller.close()
@@ -2380,6 +2400,9 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
return true;
} else {
this.has_backpressure = !this.res.write(buf);
+ if (this.has_backpressure) {
+ this.res.onWritable(*@This(), onWritable, this);
+ }
return true;
}
@@ -2986,7 +3009,14 @@ pub fn ReadableStreamSource(
pub fn processResult(globalThis: *JSGlobalObject, flags: JSValue, result: StreamResult) JSC.JSValue {
switch (result) {
.err => |err| {
- globalThis.vm().throwError(globalThis, err.toJSC(globalThis));
+ if (err == .Error) {
+ globalThis.vm().throwError(globalThis, err.Error.toJSC(globalThis));
+ } else {
+ const js_err = err.JSValue;
+ js_err.ensureStillAlive();
+ js_err.unprotect();
+ globalThis.vm().throwError(globalThis, js_err);
+ }
return JSValue.jsUndefined();
},
.temporary_and_done, .owned_and_done, .into_array_and_done => {
@@ -3301,12 +3331,29 @@ pub const ByteStream = struct {
if (is_really_done) {
this.done = true;
- this.pending.result = .{
- .into_array_and_done = .{
- .value = this.value(),
- .len = @as(Blob.SizeType, @truncate(to_copy.len)),
- },
- };
+
+ if (to_copy.len == 0) {
+ if (stream == .err) {
+ if (stream.err == .Error) {
+ this.pending.result = .{ .err = .{ .Error = stream.err.Error } };
+ }
+ const js_err = stream.err.JSValue;
+ js_err.ensureStillAlive();
+ js_err.protect();
+ this.pending.result = .{ .err = .{ .JSValue = js_err } };
+ } else {
+ this.pending.result = .{
+ .done = {},
+ };
+ }
+ } else {
+ this.pending.result = .{
+ .into_array_and_done = .{
+ .value = this.value(),
+ .len = @as(Blob.SizeType, @truncate(to_copy.len)),
+ },
+ };
+ }
} else {
this.pending.result = .{
.into_array = .{
@@ -3488,7 +3535,7 @@ pub const ReadResult = union(enum) {
pub fn toStreamWithIsDone(this: ReadResult, pending: *StreamResult.Pending, buf: []u8, view: JSValue, close_on_empty: bool, is_done: bool) StreamResult {
return switch (this) {
.pending => .{ .pending = pending },
- .err => .{ .err = this.err },
+ .err => .{ .err = .{ .Error = this.err } },
.done => .{ .done = {} },
.read => |slice| brk: {
const owned = slice.ptr != buf.ptr;
@@ -4064,17 +4111,21 @@ pub const File = struct {
this.concurrent.read = @as(Blob.SizeType, @truncate(result catch |err| {
if (@hasField(HTTPClient.NetworkThread.Completion, "result")) {
this.pending.result = .{
- .err = Syscall.Error{
- .errno = @as(Syscall.Error.Int, @intCast(-completion.result)),
- .syscall = .read,
+ .err = .{
+ .Error = Syscall.Error{
+ .errno = @as(Syscall.Error.Int, @intCast(-completion.result)),
+ .syscall = .read,
+ },
},
};
} else {
this.pending.result = .{
- .err = Syscall.Error{
- // this is too hacky
- .errno = @as(Syscall.Error.Int, @truncate(@as(u16, @intCast(@max(1, @intFromError(err)))))),
- .syscall = .read,
+ .err = .{
+ .Error = Syscall.Error{
+ // this is too hacky
+ .errno = @as(Syscall.Error.Int, @truncate(@as(u16, @intCast(@max(1, @intFromError(err)))))),
+ .syscall = .read,
+ },
},
};
}
@@ -4101,7 +4152,7 @@ pub const File = struct {
else => {},
}
- this.pending.result = .{ .err = err };
+ this.pending.result = .{ .err = .{ .Error = err } };
scheduleMainThreadTask(this);
return;
},