aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-10-03 01:09:16 -0700
committerGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-10-03 01:09:16 -0700
commit47007621326c5537b9d5d2a11eaa7dc086fc6a82 (patch)
treef86c78f714ec9942b32c8ebf9326843590eeeb30
parent08d606c3d7ff5accd881b016d51291af54164f06 (diff)
downloadbun-47007621326c5537b9d5d2a11eaa7dc086fc6a82.tar.gz
bun-47007621326c5537b9d5d2a11eaa7dc086fc6a82.tar.zst
bun-47007621326c5537b9d5d2a11eaa7dc086fc6a82.zip
`Bun.spawn` start to implement support for buffered input (ArrayBuffer, Response, Request body)
-rw-r--r--src/bun.js/api/bun.classes.ts8
-rw-r--r--src/bun.js/api/bun.zig331
-rw-r--r--src/bun.js/base.zig20
-rw-r--r--src/bun.js/builtins/js/ReadableStreamInternals.js3
-rw-r--r--src/bun.js/event_loop.zig3
-rw-r--r--src/bun.js/webcore/response.zig50
-rw-r--r--src/bun.js/webcore/streams.zig87
7 files changed, 384 insertions, 118 deletions
diff --git a/src/bun.js/api/bun.classes.ts b/src/bun.js/api/bun.classes.ts
index 5a6ae47cc..bbb1df944 100644
--- a/src/bun.js/api/bun.classes.ts
+++ b/src/bun.js/api/bun.classes.ts
@@ -19,6 +19,14 @@ export default [
getter: "getStdout",
cache: true,
},
+ writable: {
+ getter: "getStdin",
+ cache: true,
+ },
+ readable: {
+ getter: "getStdout",
+ cache: true,
+ },
stderr: {
getter: "getStderr",
cache: true,
diff --git a/src/bun.js/api/bun.zig b/src/bun.js/api/bun.zig
index 1171d804e..e1df32bc3 100644
--- a/src/bun.js/api/bun.zig
+++ b/src/bun.js/api/bun.zig
@@ -3514,6 +3514,7 @@ pub const JSZlib = struct {
};
pub const Subprocess = struct {
+ const log = Output.scoped(.Subprocess, true);
pub usingnamespace JSC.Codegen.JSSubprocess;
pid: std.os.pid_t,
@@ -3569,7 +3570,7 @@ pub const Subprocess = struct {
ignore: void,
closed: void,
- pub fn init(stdio: std.meta.Tag(Stdio), fd: i32, globalThis: *JSC.JSGlobalObject) Readable {
+ pub fn init(stdio: Stdio, fd: i32, globalThis: *JSC.JSGlobalObject) Readable {
return switch (stdio) {
.inherit => Readable{ .inherit = {} },
.ignore => Readable{ .ignore = {} },
@@ -3582,7 +3583,8 @@ pub const Subprocess = struct {
out.ptr.File.stored_global_this_ = globalThis;
break :brk Readable{ .pipe = out };
},
- .callback, .fd, .path, .blob => Readable{ .fd = @intCast(JSC.Node.FileDescriptor, fd) },
+ .path, .blob, .fd => Readable{ .fd = @intCast(JSC.Node.FileDescriptor, fd) },
+ else => unreachable,
};
}
@@ -3653,32 +3655,43 @@ pub const Subprocess = struct {
return JSValue.jsUndefined();
}
+ switch (this.tryKill(sig)) {
+ .result => {},
+ .err => |err| {
+ globalThis.throwValue(err.toJSC(globalThis));
+ return JSValue.jsUndefined();
+ },
+ }
+
+ return JSValue.jsUndefined();
+ }
+
+ pub fn tryKill(this: *Subprocess, sig: i32) JSC.Node.Maybe(void) {
if (this.killed) {
- return JSValue.jsUndefined();
+ return .{ .result = {} };
}
if (comptime Environment.isLinux) {
// should this be handled differently?
// this effectively shouldn't happen
if (this.pidfd == std.math.maxInt(std.os.fd_t)) {
- return JSValue.jsUndefined();
+ return .{ .result = {} };
}
// first appeared in Linux 5.1
const rc = std.os.linux.pidfd_send_signal(this.pidfd, @intCast(u8, sig), null, 0);
if (rc != 0) {
- globalThis.throwValue(JSC.Node.Syscall.Error.fromCode(std.os.linux.getErrno(rc), .kill).toJSC(globalThis));
- return JSValue.jsUndefined();
+ return .{ .err = JSC.Node.Syscall.Error.fromCode(std.os.linux.getErrno(rc), .kill) };
}
} else {
const err = std.c.kill(this.pid, sig);
if (err != 0) {
- return JSC.Node.Syscall.Error.fromCode(std.c.getErrno(err), .kill).toJSC(globalThis);
+ return .{ .err = JSC.Node.Syscall.Error.fromCode(std.c.getErrno(err), .kill) };
}
}
- return JSValue.jsUndefined();
+ return .{ .result = {} };
}
pub fn onKill(
@@ -3701,11 +3714,13 @@ pub const Subprocess = struct {
}
if (this.stdout == .pipe) {
- this.stdout.pipe.cancel(this.globalThis);
+ if (this.stdout.pipe.isDisturbed(this.globalThis))
+ this.stdout.pipe.cancel(this.globalThis);
}
if (this.stderr == .pipe) {
- this.stderr.pipe.cancel(this.globalThis);
+ if (this.stderr.pipe.isDisturbed(this.globalThis))
+ this.stderr.pipe.cancel(this.globalThis);
}
this.stdin.close();
@@ -3737,15 +3752,112 @@ pub const Subprocess = struct {
return JSValue.jsBoolean(this.killed);
}
+ pub const BufferedInput = struct {
+ remain: []const u8 = "",
+ fd: JSC.Node.FileDescriptor = std.math.maxInt(JSC.Node.FileDescriptor),
+ poll_ref: JSC.PollRef = .{},
+ written: usize = 0,
+
+ source: union(enum) {
+ blob: JSC.WebCore.AnyBlob,
+ array_buffer: JSC.ArrayBuffer.Strong,
+ },
+
+ pub usingnamespace JSC.WebCore.NewReadyWatcher(BufferedInput, .write, onReady);
+
+ pub fn onReady(this: *BufferedInput, size_or_offset: i64) void {
+ this.write(@intCast(usize, @maximum(size_or_offset, 0)));
+ }
+
+ pub fn write(this: *BufferedInput, _: usize) void {
+ var to_write = this.remain;
+
+ if (to_write.len == 0) {
+ if (this.poll_ref.isActive()) this.unwatch(this.fd);
+ // we are done!
+ this.closeFDIfOpen();
+ return;
+ }
+
+ while (to_write.len > 0) {
+ switch (JSC.Node.Syscall.write(this.fd, to_write)) {
+ .err => |e| {
+ if (e.isRetry()) {
+ log("write({d}) retry", .{
+ to_write.len,
+ });
+
+ this.watch(this.fd);
+ return;
+ }
+
+ // fail
+ log("write({d}) fail: {d}", .{ to_write.len, e.errno });
+ this.deinit();
+ return;
+ },
+
+ .result => |bytes_written| {
+ this.written += bytes_written;
+
+ log(
+ "write({d}) {d}",
+ .{
+ to_write.len,
+ bytes_written,
+ },
+ );
+
+ this.remain = this.remain[@minimum(bytes_written, this.remain.len)..];
+ to_write = to_write[bytes_written..];
+
+ // we are done or it accepts no more input
+ if (this.remain.len == 0 or bytes_written == 0) {
+ this.deinit();
+ return;
+ }
+ },
+ }
+ }
+ }
+
+ fn closeFDIfOpen(this: *BufferedInput) void {
+ if (this.poll_ref.isActive()) this.unwatch(this.fd);
+
+ if (this.fd != std.math.maxInt(JSC.Node.FileDescriptor)) {
+ _ = JSC.Node.Syscall.close(this.fd);
+ this.fd = std.math.maxInt(JSC.Node.FileDescriptor);
+ }
+ }
+
+ pub fn deinit(this: *BufferedInput) void {
+ this.closeFDIfOpen();
+
+ switch (this.source) {
+ .blob => |*blob| {
+ blob.detach();
+ },
+ .array_buffer => |*array_buffer| {
+ array_buffer.deinit();
+ },
+ }
+ }
+ };
+
const Writable = union(enum) {
pipe: *JSC.WebCore.FileSink,
+ pipe_to_readable_stream: struct {
+ pipe: *JSC.WebCore.FileSink,
+ readable_stream: JSC.WebCore.ReadableStream,
+ },
fd: JSC.Node.FileDescriptor,
+ buffered_input: BufferedInput,
inherit: void,
ignore: void,
- pub fn init(stdio: std.meta.Tag(Stdio), fd: i32, globalThis: *JSC.JSGlobalObject) !Writable {
+ pub fn init(stdio: Stdio, fd: i32, globalThis: *JSC.JSGlobalObject) !Writable {
switch (stdio) {
- .path, .pipe, .callback => {
+ .path, .pipe => {
var sink = try globalThis.bunVM().allocator.create(JSC.WebCore.FileSink);
sink.* = .{
.fd = fd,
@@ -3753,9 +3865,33 @@ pub const Subprocess = struct {
.allocator = globalThis.bunVM().allocator,
};
+ if (stdio == .pipe) {
+ if (stdio.pipe) |readable| {
+ return Writable{
+ .pipe_to_readable_stream = .{
+ .pipe = sink,
+ .readable_stream = readable,
+ },
+ };
+ }
+ }
+
return Writable{ .pipe = sink };
},
- .blob, .fd => {
+ .array_buffer, .blob => {
+ var buffered_input: BufferedInput = .{ .fd = fd, .source = undefined };
+ switch (stdio) {
+ .array_buffer => |array_buffer| {
+ buffered_input.source = .{ .array_buffer = array_buffer };
+ },
+ .blob => |blob| {
+ buffered_input.source = .{ .blob = blob };
+ },
+ else => unreachable,
+ }
+ return Writable{ .buffered_input = buffered_input };
+ },
+ .fd => {
return Writable{ .fd = @intCast(JSC.Node.FileDescriptor, fd) };
},
.inherit => {
@@ -3773,6 +3909,8 @@ pub const Subprocess = struct {
.fd => |fd| JSValue.jsNumber(fd),
.ignore => JSValue.jsUndefined(),
.inherit => JSValue.jsUndefined(),
+ .buffered_input => JSValue.jsUndefined(),
+ .pipe_to_readable_stream => this.pipe_to_readable_stream.readable_stream.value,
};
}
@@ -3781,9 +3919,15 @@ pub const Subprocess = struct {
.pipe => |pipe| {
_ = pipe.end(null);
},
+ .pipe_to_readable_stream => |*pipe_to_readable_stream| {
+ _ = pipe_to_readable_stream.pipe.end(null);
+ },
.fd => |fd| {
_ = JSC.Node.Syscall.close(fd);
},
+ .buffered_input => {
+ this.buffered_input.deinit();
+ },
.ignore => {},
.inherit => {},
};
@@ -3841,7 +3985,7 @@ pub const Subprocess = struct {
var stdio = [3]Stdio{
.{ .ignore = .{} },
.{ .inherit = .{} },
- .{ .pipe = .{} },
+ .{ .pipe = null },
};
var PATH = globalThis.bunVM().bundler.env.get("PATH") orelse "";
@@ -4007,19 +4151,19 @@ pub const Subprocess = struct {
-1,
);
- const stdin_pipe = if (stdio[0].isPiped()) os.pipe2(os.O.NONBLOCK) catch |err| {
+ const stdin_pipe = if (stdio[0].isPiped()) os.pipe2(0) catch |err| {
globalThis.throw("failed to create stdin pipe: {s}", .{err});
return JSValue.jsUndefined();
} else undefined;
errdefer if (stdio[0].isPiped()) destroyPipe(stdin_pipe);
- const stdout_pipe = if (stdio[1].isPiped()) os.pipe2(os.O.NONBLOCK) catch |err| {
+ const stdout_pipe = if (stdio[1].isPiped()) os.pipe2(0) catch |err| {
globalThis.throw("failed to create stdout pipe: {s}", .{err});
return JSValue.jsUndefined();
} else undefined;
errdefer if (stdio[1].isPiped()) destroyPipe(stdout_pipe);
- const stderr_pipe = if (stdio[2].isPiped()) os.pipe2(os.O.NONBLOCK) catch |err| {
+ const stderr_pipe = if (stdio[2].isPiped()) os.pipe2(0) catch |err| {
globalThis.throw("failed to create stderr pipe: {s}", .{err});
return JSValue.jsUndefined();
} else undefined;
@@ -4096,6 +4240,10 @@ pub const Subprocess = struct {
}
};
+ // set non-blocking stdin
+ if (stdio[0].isPiped())
+ _ = std.os.fcntl(stdin_pipe[1], std.os.F.SETFL, std.os.O.NONBLOCK) catch 0;
+
var subprocess = globalThis.allocator().create(Subprocess) catch {
globalThis.throw("out of memory", .{});
return JSValue.jsUndefined();
@@ -4105,12 +4253,12 @@ pub const Subprocess = struct {
.globalThis = globalThis,
.pid = pid,
.pidfd = pidfd,
- .stdin = Writable.init(std.meta.activeTag(stdio[std.os.STDIN_FILENO]), stdin_pipe[1], globalThis) catch {
+ .stdin = Writable.init(stdio[std.os.STDIN_FILENO], stdin_pipe[1], globalThis) catch {
globalThis.throw("out of memory", .{});
return JSValue.jsUndefined();
},
- .stdout = Readable.init(std.meta.activeTag(stdio[std.os.STDOUT_FILENO]), stdout_pipe[0], globalThis),
- .stderr = Readable.init(std.meta.activeTag(stdio[std.os.STDERR_FILENO]), stderr_pipe[0], globalThis),
+ .stdout = Readable.init(stdio[std.os.STDOUT_FILENO], stdout_pipe[0], globalThis),
+ .stderr = Readable.init(stdio[std.os.STDERR_FILENO], stderr_pipe[0], globalThis),
};
subprocess.this_jsvalue = subprocess.toJS(globalThis);
@@ -4134,6 +4282,14 @@ pub const Subprocess = struct {
},
}
+ if (subprocess.stdin == .buffered_input) {
+ subprocess.stdin.buffered_input.remain = switch (subprocess.stdin.buffered_input.source) {
+ .blob => subprocess.stdin.buffered_input.source.blob.slice(),
+ .array_buffer => |array_buffer| array_buffer.slice(),
+ };
+ subprocess.stdin.buffered_input.write(0);
+ }
+
return subprocess.this_jsvalue;
}
@@ -4206,13 +4362,13 @@ pub const Subprocess = struct {
ignore: void,
fd: JSC.Node.FileDescriptor,
path: JSC.Node.PathLike,
- blob: JSC.WebCore.Blob,
- pipe: void,
- callback: JSC.JSValue,
+ blob: JSC.WebCore.AnyBlob,
+ pipe: ?JSC.WebCore.ReadableStream,
+ array_buffer: JSC.ArrayBuffer.Strong,
pub fn isPiped(self: Stdio) bool {
return switch (self) {
- .blob, .callback, .pipe => true,
+ .array_buffer, .blob, .pipe => true,
else => false,
};
}
@@ -4225,8 +4381,10 @@ pub const Subprocess = struct {
_: i32,
) !void {
switch (stdio) {
- .blob, .callback, .pipe => {
+ .array_buffer, .blob, .pipe => {
+ std.debug.assert(!(stdio == .blob and stdio.blob.needsToReadFile()));
const idx: usize = if (std_fileno == 0) 0 else 1;
+
try actions.dup2(pipe_fd[idx], std_fileno);
try actions.close(pipe_fd[1 - idx]);
},
@@ -4252,6 +4410,50 @@ pub const Subprocess = struct {
}
};
+ fn extractStdioBlob(
+ globalThis: *JSC.JSGlobalObject,
+ blob: JSC.WebCore.AnyBlob,
+ i: usize,
+ stdio_array: []Stdio,
+ ) bool {
+ if (blob.needsToReadFile()) {
+ if (blob.store()) |store| {
+ if (store.data.file.pathlike == .fd) {
+ if (store.data.file.pathlike.fd == @intCast(JSC.Node.FileDescriptor, i)) {
+ stdio_array[i] = Stdio{ .inherit = {} };
+ } else {
+ switch (@intCast(std.os.fd_t, i)) {
+ std.os.STDIN_FILENO => {
+ if (i == std.os.STDERR_FILENO or i == std.os.STDOUT_FILENO) {
+ globalThis.throwInvalidArguments("stdin cannot be used for stdout or stderr", .{});
+ return false;
+ }
+ },
+
+ std.os.STDOUT_FILENO, std.os.STDERR_FILENO => {
+ if (i == std.os.STDIN_FILENO) {
+ globalThis.throwInvalidArguments("stdout and stderr cannot be used for stdin", .{});
+ return false;
+ }
+ },
+ else => {},
+ }
+
+ stdio_array[i] = Stdio{ .fd = store.data.file.pathlike.fd };
+ }
+
+ return true;
+ }
+
+ stdio_array[i] = .{ .path = store.data.file.pathlike.path };
+ return true;
+ }
+ }
+
+ stdio_array[i] = .{ .blob = blob };
+ return true;
+ }
+
fn extractStdio(
globalThis: *JSC.JSGlobalObject,
i: usize,
@@ -4269,7 +4471,7 @@ pub const Subprocess = struct {
} else if (str.eqlComptime("ignore")) {
stdio_array[i] = Stdio{ .ignore = {} };
} else if (str.eqlComptime("pipe")) {
- stdio_array[i] = Stdio{ .pipe = {} };
+ stdio_array[i] = Stdio{ .pipe = null };
} else {
globalThis.throwInvalidArguments("stdio must be an array of 'inherit', 'ignore', or null", .{});
return false;
@@ -4306,53 +4508,48 @@ pub const Subprocess = struct {
return true;
} else if (value.as(JSC.WebCore.Blob)) |blob| {
- var store = blob.store orelse {
- globalThis.throwInvalidArguments("Blob is detached (in stdio)", .{});
- return false;
- };
-
- if (i == std.os.STDIN_FILENO and store.data == .bytes) {
- stdio_array[i] = .{ .blob = blob.dupe() };
- return true;
- }
-
- if (store.data != .file) {
- globalThis.throwInvalidArguments("Blob is not a file (in stdio)", .{});
- return false;
- }
-
- if (store.data.file.pathlike == .fd) {
- if (store.data.file.pathlike.fd == @intCast(JSC.Node.FileDescriptor, i)) {
- stdio_array[i] = Stdio{ .inherit = {} };
- } else {
- switch (@intCast(std.os.fd_t, i)) {
- std.os.STDIN_FILENO => {
- if (i == std.os.STDERR_FILENO or i == std.os.STDOUT_FILENO) {
- globalThis.throwInvalidArguments("stdin cannot be used for stdout or stderr", .{});
- return false;
- }
- },
+ return extractStdioBlob(globalThis, .{ .Blob = blob.dupe() }, i, stdio_array);
+ } else if (value.as(JSC.WebCore.Request)) |req| {
+ req.getBodyValue().toBlobIfPossible();
+ return extractStdioBlob(globalThis, req.getBodyValue().useAsAnyBlob(), i, stdio_array);
+ } else if (value.as(JSC.WebCore.Response)) |req| {
+ req.getBodyValue().toBlobIfPossible();
+ return extractStdioBlob(globalThis, req.getBodyValue().useAsAnyBlob(), i, stdio_array);
+ } else if (JSC.WebCore.ReadableStream.fromJS(value, globalThis)) |*req| {
+ if (i == std.os.STDIN_FILENO) {
+ if (req.toAnyBlob(globalThis)) |blob| {
+ return extractStdioBlob(globalThis, blob, i, stdio_array);
+ }
- std.os.STDOUT_FILENO, std.os.STDERR_FILENO => {
- if (i == std.os.STDIN_FILENO) {
- globalThis.throwInvalidArguments("stdout and stderr cannot be used for stdin", .{});
- return false;
- }
- },
- else => {},
- }
+ switch (req.ptr) {
+ .File, .Blob => unreachable,
+ .Direct, .JavaScript, .Bytes => {
+ if (req.isLocked(globalThis)) {
+ globalThis.throwInvalidArguments("ReadableStream cannot be locked", .{});
+ return false;
+ }
- stdio_array[i] = Stdio{ .fd = store.data.file.pathlike.fd };
+ stdio_array[i] = .{ .pipe = req.* };
+ return true;
+ },
+ else => {},
}
- return true;
+ globalThis.throwInvalidArguments("Unsupported ReadableStream type", .{});
+ return false;
+ }
+ } else if (value.asArrayBuffer(globalThis)) |array_buffer| {
+ if (array_buffer.slice().len == 0) {
+ globalThis.throwInvalidArguments("ArrayBuffer cannot be empty", .{});
+ return false;
}
- stdio_array[i] = .{ .path = store.data.file.pathlike.path };
- return true;
- } else if (value.isCallable(globalThis.vm())) {
- stdio_array[i] = .{ .callback = value };
- value.ensureStillAlive();
+ stdio_array[i] = .{
+ .array_buffer = JSC.ArrayBuffer.Strong{
+ .array_buffer = array_buffer,
+ .held = JSC.Strong.create(array_buffer.value, globalThis),
+ },
+ };
return true;
}
diff --git a/src/bun.js/base.zig b/src/bun.js/base.zig
index 23b00bd8e..59cad571b 100644
--- a/src/bun.js/base.zig
+++ b/src/bun.js/base.zig
@@ -2384,6 +2384,24 @@ pub const ArrayBuffer = extern struct {
value: JSC.JSValue = JSC.JSValue.zero,
shared: bool = false,
+ pub const Strong = struct {
+ array_buffer: ArrayBuffer,
+ held: JSC.Strong = .{},
+
+ pub fn clear(this: *ArrayBuffer.Strong) void {
+ var ref: *JSC.napi.Ref = this.ref orelse return;
+ ref.set(JSC.JSValue.zero);
+ }
+
+ pub fn slice(this: *const ArrayBuffer.Strong) []u8 {
+ return this.array_buffer.slice();
+ }
+
+ pub fn deinit(this: *ArrayBuffer.Strong) void {
+ this.held.deinit();
+ }
+ };
+
pub const empty = ArrayBuffer{ .offset = 0, .len = 0, .byte_len = 0, .typed_array_type = .Uint8Array, .ptr = undefined };
pub const name = "Bun__ArrayBuffer";
@@ -3902,7 +3920,7 @@ pub const Ref = struct {
pub const PollRef = struct {
status: Status = .inactive,
- const log = Output.scoped(.PollRef, true);
+ const log = Output.scoped(.PollRef, false);
const Status = enum { active, inactive, done };
diff --git a/src/bun.js/builtins/js/ReadableStreamInternals.js b/src/bun.js/builtins/js/ReadableStreamInternals.js
index 067d10366..5321ca922 100644
--- a/src/bun.js/builtins/js/ReadableStreamInternals.js
+++ b/src/bun.js/builtins/js/ReadableStreamInternals.js
@@ -268,6 +268,9 @@ function readableStreamPipeToWritableStream(
) {
"use strict";
+ const isDirectStream = !!@getByIdDirectPrivate(source, "start");
+
+
@assert(@isReadableStream(source));
@assert(@isWritableStream(destination));
@assert(!@isReadableStreamLocked(source));
diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig
index 4c36f1c05..76206de54 100644
--- a/src/bun.js/event_loop.zig
+++ b/src/bun.js/event_loop.zig
@@ -519,7 +519,7 @@ pub const Poller = struct {
const FileBlobLoader = JSC.WebCore.FileBlobLoader;
const FileSink = JSC.WebCore.FileSink;
const Subprocess = JSC.Subprocess;
-
+ const BufferedInput = Subprocess.BufferedInput;
/// epoll only allows one pointer
/// We unfortunately need two pointers: one for a function call and one for the context
/// We use a tagged pointer union and then call the function with the context pointer
@@ -527,6 +527,7 @@ pub const Poller = struct {
FileBlobLoader,
FileSink,
Subprocess,
+ BufferedInput,
});
const Kevent = std.os.Kevent;
const kevent = std.c.kevent;
diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig
index 5d641831f..079fa7f7f 100644
--- a/src/bun.js/webcore/response.zig
+++ b/src/bun.js/webcore/response.zig
@@ -3991,7 +3991,7 @@ pub const AnyBlob = union(enum) {
};
}
- pub fn store(this: *@This()) ?*Blob.Store {
+ pub fn store(this: *const @This()) ?*Blob.Store {
if (this.* == .Blob) {
return this.Blob.store;
}
@@ -4333,50 +4333,14 @@ pub const Body = struct {
}
pub fn toAnyBlobAllowPromise(this: *PendingValue) ?AnyBlob {
- const stream = this.readable orelse return null;
-
- switch (stream.ptr) {
- .Blob => |blobby| {
- var blob = JSC.WebCore.Blob.initWithStore(blobby.store, this.global);
- blob.offset = blobby.offset;
- blob.size = blobby.remain;
- blob.store.?.ref();
- stream.detach(this.global);
- stream.done();
- blobby.deinit();
- this.readable = null;
- return AnyBlob{ .Blob = blob };
- },
- .File => |blobby| {
- var blob = JSC.WebCore.Blob.initWithStore(blobby.store, this.global);
- blobby.store.ref();
-
- // it should be lazy, file shouldn't have opened yet.
- std.debug.assert(!blobby.started);
-
- stream.detach(this.global);
- blobby.deinit();
- stream.done();
- this.readable = null;
- return AnyBlob{ .Blob = blob };
- },
- .Bytes => |bytes| {
-
- // If we've received the complete body by the time this function is called
- // we can avoid streaming it and convert it to a Blob
- if (bytes.has_received_last_chunk) {
- stream.detach(this.global);
- var blob: JSC.WebCore.AnyBlob = undefined;
- blob.from(bytes.buffer);
- bytes.parent().deinit();
- this.readable = null;
- return blob;
- }
+ var stream = if (this.readable != null) &this.readable.? else return null;
- return null;
- },
- else => return null,
+ if (stream.toAnyBlob(this.global)) |blob| {
+ this.readable = null;
+ return blob;
}
+
+ return null;
}
pub fn setPromise(value: *PendingValue, globalThis: *JSC.JSGlobalObject, action: Action) JSValue {
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig
index e6f8b2378..0f0577b1b 100644
--- a/src/bun.js/webcore/streams.zig
+++ b/src/bun.js/webcore/streams.zig
@@ -49,6 +49,7 @@ const Request = JSC.WebCore.Request;
const assert = std.debug.assert;
const Syscall = JSC.Node.Syscall;
+const AnyBlob = JSC.WebCore.AnyBlob;
pub const ReadableStream = struct {
value: JSValue,
ptr: Source,
@@ -57,6 +58,52 @@ pub const ReadableStream = struct {
return this.value;
}
+ pub fn toAnyBlob(
+ stream: *ReadableStream,
+ globalThis: *JSC.JSGlobalObject,
+ ) ?JSC.WebCore.AnyBlob {
+ switch (stream.ptr) {
+ .Blob => |blobby| {
+ var blob = JSC.WebCore.Blob.initWithStore(blobby.store, globalThis);
+ blob.offset = blobby.offset;
+ blob.size = blobby.remain;
+ blob.store.?.ref();
+ stream.detach(globalThis);
+ stream.done();
+ blobby.deinit();
+
+ return AnyBlob{ .Blob = blob };
+ },
+ .File => |blobby| {
+ var blob = JSC.WebCore.Blob.initWithStore(blobby.store, globalThis);
+ blobby.store.ref();
+
+ // it should be lazy, file shouldn't have opened yet.
+ std.debug.assert(!blobby.started);
+
+ stream.detach(globalThis);
+ blobby.deinit();
+ stream.done();
+ return AnyBlob{ .Blob = blob };
+ },
+ .Bytes => |bytes| {
+
+ // If we've received the complete body by the time this function is called
+ // we can avoid streaming it and convert it to a Blob
+ if (bytes.has_received_last_chunk) {
+ stream.detach(globalThis);
+ var blob: JSC.WebCore.AnyBlob = undefined;
+ blob.from(bytes.buffer);
+ bytes.parent().deinit();
+ return blob;
+ }
+
+ return null;
+ },
+ else => return null,
+ }
+ }
+
pub fn done(this: *const ReadableStream) void {
this.value.unprotect();
}
@@ -2973,6 +3020,8 @@ pub const FileBlobLoader = struct {
stored_global_this_: ?*JSC.JSGlobalObject = null,
poll_ref: JSC.PollRef = .{},
+ has_adjusted_pipe_size_on_linux: bool = false,
+
pub usingnamespace NewReadyWatcher(@This(), .read, ready);
pub inline fn globalThis(this: *FileBlobLoader) *JSC.JSGlobalObject {
@@ -3358,14 +3407,40 @@ pub const FileBlobLoader = struct {
// macOS FIONREAD doesn't seem to work here
// but we can get this information from the kqueue callback so we don't need to
- if (len == 0) {
- const FIONREAD = if (Environment.isLinux) std.os.linux.T.FIONREAD else bun.C.FIONREAD;
- const rc: c_int = std.c.ioctl(this.fd, FIONREAD, &len);
- if (rc != 0) {
- len = 0;
+ if (comptime Environment.isLinux) {
+ if (len == 0) {
+ const FIONREAD = if (Environment.isLinux) std.os.linux.T.FIONREAD else bun.C.FIONREAD;
+ const rc: c_int = std.c.ioctl(this.fd, FIONREAD, &len);
+ if (rc != 0) {
+ len = 0;
+ }
+
+ // In Linux versions before 2.6.11, the capacity of a
+ // pipe was the same as the system page size (e.g., 4096
+ // bytes on i386). Since Linux 2.6.11, the pipe
+ // capacity is 16 pages (i.e., 65,536 bytes in a system
+ // with a page size of 4096 bytes). Since Linux 2.6.35,
+ // the default pipe capacity is 16 pages, but the
+ // capacity can be queried and set using the
+ // fcntl(2) F_GETPIPE_SZ and F_SETPIPE_SZ operations.
+ // See fcntl(2) for more information.
+
+ //:# define F_SETPIPE_SZ 1031 /* Set pipe page size array.
+ const F_SETPIPE_SZ = 1031;
+ const F_GETPIPE_SZ = 1032;
+
+ if (!this.has_adjusted_pipe_size_on_linux) {
+ if (len + 1024 > 16 * std.mem.page_size) {
+ this.has_adjusted_pipe_size_on_linux = true;
+ var pipe_len: c_int = 0;
+ _ = std.c.fcntl(this.fd, F_GETPIPE_SZ, &pipe_len);
+ if (pipe_len <= 16 * std.mem.page_size) {
+ _ = std.c.fcntl(this.fd, F_SETPIPE_SZ, 512 * 1024);
+ }
+ }
+ }
}
}
-
if (len > read_buf.len * 10 and read_buf.len < std.mem.page_size) {
// then we need to allocate a buffer
// to read into