diff options
-rw-r--r-- | integration/bunjs-only-snippets/fetch.js | 23 | ||||
-rw-r--r-- | src/http/async_bio.zig | 306 | ||||
-rw-r--r-- | src/http/async_message.zig | 3 | ||||
-rw-r--r-- | src/javascript/jsc/javascript.zig | 23 | ||||
-rw-r--r-- | types.d.ts | 5 |
5 files changed, 203 insertions, 157 deletions
diff --git a/integration/bunjs-only-snippets/fetch.js b/integration/bunjs-only-snippets/fetch.js index cc83b5af4..bb4345659 100644 --- a/integration/bunjs-only-snippets/fetch.js +++ b/integration/bunjs-only-snippets/fetch.js @@ -1,14 +1,17 @@ import fs from "fs"; -const response = await fetch("http://example.com/"); -const text = await response.text(); +const urls = ["https://example.com", "http://example.com"]; +for (let url of urls) { + const response = await fetch(url); + const text = await response.text(); -if ( - fs.readFileSync( - import.meta.path.substring(0, import.meta.path.lastIndexOf("/")) + - "/fetch.js.txt", - "utf8" - ) !== text -) { - throw new Error("Expected fetch.js.txt to match snapshot"); + if ( + fs.readFileSync( + import.meta.path.substring(0, import.meta.path.lastIndexOf("/")) + + "/fetch.js.txt", + "utf8" + ) !== text + ) { + throw new Error("Expected fetch.js.txt to match snapshot"); + } } diff --git a/src/http/async_bio.zig b/src/http/async_bio.zig index 5e86da949..eadc40b2a 100644 --- a/src/http/async_bio.zig +++ b/src/http/async_bio.zig @@ -8,26 +8,38 @@ const Output = @import("../global.zig").Output; const extremely_verbose = @import("../http_client_async.zig").extremely_verbose; const SOCKET_FLAGS = @import("../http_client_async.zig").SOCKET_FLAGS; const getAllocator = @import("../http_client_async.zig").getAllocator; +const assert = std.debug.assert; +const BufferPool = AsyncMessage.BufferPool; + +const fail = -3; +const connection_closed = -2; +const pending = -1; +const OK = 0; bio: *boring.BIO = undefined, socket_fd: std.os.socket_t = 0, -allocator: std.mem.Allocator, -read_buf_len: usize = 0, +allocator: std.mem.Allocator, read_wait: Wait = Wait.pending, send_wait: Wait = Wait.pending, -recv_completion: AsyncIO.Completion = undefined, -send_completion: AsyncIO.Completion = undefined, -write_buffer: ?*AsyncMessage = null, +recv_buffer: ?*BufferPool.Node = null, +recv_completion: Completion = undefined, + +send_buffer: ?*BufferPool.Node = null, +send_completion: Completion = undefined, -last_send_result: AsyncIO.SendError!usize = 0, +write_error: c_int = 0, +socket_recv_len: c_int = 0, +bio_read_offset: u32 = 0, + +socket_send_error: ?anyerror = null, +socket_recv_error: ?anyerror = null, -last_read_result: AsyncIO.RecvError!usize = 0, next: ?*AsyncBIO = null, -pending_frame: PendingFrame = PendingFrame.init(), +pending_frame: PendingFrame = PendingFrame.init(), pub const PendingFrame = std.fifo.LinearFifo(anyframe, .{ .Static = 8 }); pub inline fn pushPendingFrame(this: *AsyncBIO, frame: anyframe) void { @@ -38,6 +50,12 @@ pub inline fn popPendingFrame(this: *AsyncBIO) ?anyframe { return this.pending_frame.readItem(); } +pub fn nextFrame(this: *AsyncBIO) void { + if (this.pending_frame.readItem()) |frame| { + resume frame; + } +} + var method: ?*boring.BIO_METHOD = null; var head: ?*AsyncBIO = null; @@ -77,11 +95,14 @@ pub fn release(this: *AsyncBIO) void { } this.read_wait = .pending; - this.last_read_result = 0; this.send_wait = .pending; - this.last_read_result = 0; this.pending_frame = PendingFrame.init(); + if (this.recv_buffer) |recv| { + recv.release(); + this.recv_buffer = null; + } + if (this.write_buffer) |write| { write.release(); this.write_buffer = null; @@ -110,79 +131,72 @@ const WaitResult = enum { send, }; -const Sender = struct { - pub fn onSend(this: *AsyncBIO, _: *Completion, result: AsyncIO.SendError!usize) void { - this.last_send_result = result; - this.send_wait = .completed; - this.write_buffer.?.sent += @truncate(u32, result catch 0); - - if (extremely_verbose) { - const read_result = result catch @as(usize, 999); - Output.prettyErrorln("onSend: {d}", .{read_result}); - Output.flush(); - } - - if (this.pending_frame.readItem()) |frame| { - resume frame; - } - } -}; +pub fn doSocketRead(this: *AsyncBIO, _: *Completion, result_: AsyncIO.RecvError!usize) void { + const socket_recv_len = @truncate( + c_int, + result_ catch |err| { + this.socket_recv_error = err; + this.socket_recv_len = fail; + this.onSocketReadComplete(); + return; + }, + ); + this.socket_recv_len += socket_recv_len; -pub fn enqueueSend( - self: *AsyncBIO, -) void { - if (self.write_buffer == null) return; - var to_write = self.write_buffer.?.slice(); - if (to_write.len == 0) { + if (socket_recv_len == 0) { + this.onSocketReadComplete(); return; } - self.last_send_result = 0; + this.read_wait = .pending; + this.scheduleSocketRead(); +} - AsyncIO.global.send( - *AsyncBIO, - self, - Sender.onSend, - &self.send_completion, - self.socket_fd, - to_write, - SOCKET_FLAGS, - ); - self.send_wait = .suspended; - if (extremely_verbose) { - Output.prettyErrorln("enqueueSend: {d}", .{to_write.len}); - Output.flush(); - } +fn onSocketReadComplete(this: *AsyncBIO) void { + assert(this.read_wait == .suspended); + this.handleSocketReadComplete(); + + this.nextFrame(); } -const Reader = struct { - pub fn onRead(this: *AsyncBIO, _: *Completion, result: AsyncIO.RecvError!usize) void { - this.last_read_result = result; - this.read_wait = .completed; - if (extremely_verbose) { - const read_result = result catch @as(usize, 999); - Output.prettyErrorln("onRead: {d}", .{read_result}); - Output.flush(); - } - if (this.pending_frame.readItem()) |frame| { - resume frame; +inline fn readBuf(this: *AsyncBIO) []u8 { + return this.recv_buffer.?.data[this.bio_read_offset..]; +} + +pub fn scheduleSocketRead(this: *AsyncBIO) void { + assert(this.read_wait == .pending); + this.read_wait = .suspended; + + AsyncIO.global.recv(*AsyncBIO, this, this.doSocketRead, &this.recv_completion, this.socket_fd, this.readBuf()); +} + +fn handleSocketReadComplete( + this: *AsyncBIO, +) void { + this.read_wait = .completed; + + if (this.socket_recv_len <= 0) { + if (this.recv_buffer) |buf| { + buf.release(); + this.recv_buffer = null; } } -}; +} -pub fn enqueueRead(self: *AsyncBIO, read_buf: []u8, off: u64) void { - var read_buffer = read_buf[off..]; - if (read_buffer.len == 0) { - return; - } +pub fn onSocketWriteComplete(this: *AsyncBIO, _: *Completion, result: AsyncIO.SendError!usize) void { + assert(this.send_wait == .pending); + this.handleSocketWriteComplete(result); + this.nextFrame(); +} - self.last_read_result = 0; - AsyncIO.global.recv(*AsyncBIO, self, Reader.onRead, &self.recv_completion, self.socket_fd, read_buffer); - self.read_wait = .suspended; - if (extremely_verbose) { - Output.prettyErrorln("enqueuedRead: {d}", .{read_buf.len}); - Output.flush(); - } +pub fn handleSocketWriteComplete(this: *AsyncBIO, result: AsyncIO.SendError!usize) void { + // this.last_socket_recv_len = result; + // this.read_wait = .completed; + // if (extremely_verbose) { + // const socket_recv_len = result catch @as(usize, 999); + // Output.prettyErrorln("onRead: {d}", .{socket_recv_len}); + // Output.flush(); + // } } pub const Bio = struct { @@ -205,90 +219,100 @@ pub const Bio = struct { return 0; } pub fn write(this_bio: *boring.BIO, ptr: [*c]const u8, len: c_int) callconv(.C) c_int { - std.debug.assert(@ptrToInt(ptr) > 0 and len >= 0); - - var buf = ptr[0..@intCast(usize, len)]; - boring.BIO_clear_flags(this_bio, boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY); + if (len < 0) return len; + assert(@ptrToInt(ptr) > 0); + boring.BIO_clear_retry_flags(this_bio); + var this = cast(this_bio); - if (len <= 0) { - return 0; + if (this.socket_send_error != null) { + if (extremely_verbose) { + Output.prettyErrorln("write: {s}", .{@errorName(this.socket_send_error.?)}); + Output.flush(); + } + return -1; } + } + + pub fn read(this_bio: *boring.BIO, ptr: [*c]u8, len_: c_int) callconv(.C) c_int { + if (len_ < 0) return len_; + const len__: u32 = @intCast(u32, len_); + assert(@ptrToInt(ptr) > 0); + boring.BIO_clear_retry_flags(this_bio); var this = cast(this_bio); - if (this.read_wait == .suspended) { - boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY)); + + var socket_recv_len = this.socket_recv_len; + var bio_read_offset = this.bio_read_offset; + defer { + this.bio_read_offset = bio_read_offset; + this.socket_recv_len = socket_recv_len; + } + + if (this.socket_recv_error) |socket_err| { + if (extremely_verbose) Output.prettyErrorln("SSL read error: {s}", .{@errorName(socket_err)}); return -1; } - switch (this.send_wait) { - .pending => { - var write_buffer = this.write_buffer orelse brk: { - this.write_buffer = AsyncMessage.get(getAllocator()); - break :brk this.write_buffer.?; - }; - - _ = write_buffer.writeAll(buf); - boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY)); - - return -1; - }, - .suspended => { - boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY)); - - return -1; - }, - .completed => { - this.send_wait = .pending; - const written = this.last_send_result catch |err| { - Output.prettyErrorln("HTTPS error: {s}", .{@errorName(err)}); - Output.flush(); - boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY)); - return -1; - }; - this.last_send_result = 0; - return @intCast(c_int, written); - }, + // If there is no result available synchronously, report any Write() errors + // that were observed. Otherwise the application may have encountered a socket + // error while writing that would otherwise not be reported until the + // application attempted to write again - which it may never do. See + // https://crbug.com/249848. + if ((this.write_error != OK or this.write_error != pending) and (socket_recv_len == OK or socket_recv_len == pending)) { + return -1; } - unreachable; - } + if (socket_recv_len == 0) { + // Instantiate the read buffer and read from the socket. Although only |len| + // bytes were requested, intentionally read to the full buffer size. The SSL + // layer reads the record header and body in separate reads to avoid + // overreading, but issuing one is more efficient. SSL sockets are not + // reused after shutdown for non-SSL traffic, so overreading is fine. + assert(bio_read_offset == 0); + this.scheduleSocketRead(); + socket_recv_len = pending; + } - pub fn read(this_bio: *boring.BIO, ptr: [*c]u8, len: c_int) callconv(.C) c_int { - std.debug.assert(@ptrToInt(ptr) > 0 and len >= 0); - var this = cast(this_bio); + if (socket_recv_len == pending) { + boring.BIO_set_retry_read(this_bio); + return -1; + } - var buf = ptr[0..@maximum(@intCast(usize, len), this.read_buf_len)]; - - boring.BIO_clear_flags(this_bio, boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY); - - switch (this.read_wait) { - .pending => { - this.enqueueRead(buf, 0); - boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_WRITE | boring.BIO_FLAGS_SHOULD_RETRY)); - return -1; - }, - .suspended => { - boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_WRITE | boring.BIO_FLAGS_SHOULD_RETRY)); - return -1; - }, - .completed => { - this.read_wait = .pending; - const read_len = this.last_read_result catch |err| { - Output.prettyErrorln("HTTPS error: {s}", .{@errorName(err)}); - Output.flush(); - boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_WRITE | boring.BIO_FLAGS_SHOULD_RETRY)); - return -1; - }; - this.last_read_result = 0; - return @intCast(c_int, read_len); - }, + // If the last Read() failed, report the error. + if (socket_recv_len < 0) { + if (extremely_verbose) Output.prettyErrorln("Unexpected ssl error: {d}", .{socket_recv_len}); + return -1; } - unreachable; + + const socket_recv_len_ = @intCast(u32, socket_recv_len); + + // Report the result of the last Read() if non-empty. + if (!(bio_read_offset < socket_recv_len_)) return 0; + const len = @minimum(len__, socket_recv_len_ - bio_read_offset); + var data = @ptrCast([*]const u8, &this.recv_buffer.?.data[bio_read_offset]); + @memcpy(ptr, data, len); + bio_read_offset += len; + + if (bio_read_offset == socket_recv_len_) { + // The read buffer is empty. + bio_read_offset = 0; + socket_recv_len = 0; + + if (this.recv_buffer) |buf| { + buf.release(); + this.recv_buffer = null; + } + } + + return @intCast(c_int, len); } + + // https://chromium.googlesource.com/chromium/src/+/refs/heads/main/net/socket/socket_bio_adapter.cc#376 pub fn ctrl(_: *boring.BIO, cmd: c_int, _: c_long, _: ?*anyopaque) callconv(.C) c_long { return switch (cmd) { - boring.BIO_CTRL_PENDING, boring.BIO_CTRL_WPENDING => 0, - else => 1, + // The SSL stack requires BIOs handle BIO_flush. + boring.BIO_CTRL_FLUSH => 1, + else => 0, }; } }; diff --git a/src/http/async_message.zig b/src/http/async_message.zig index c1c11b109..d68c58cb0 100644 --- a/src/http/async_message.zig +++ b/src/http/async_message.zig @@ -3,7 +3,8 @@ const ObjectPool = @import("../pool.zig").ObjectPool; const AsyncIO = @import("io"); pub const buffer_pool_len = std.math.maxInt(u16) - 64; -pub const BufferPool = ObjectPool([buffer_pool_len]u8, null, false); +pub const BufferPoolBytes = [buffer_pool_len]u8; +pub const BufferPool = ObjectPool(BufferPoolBytes, null, false); const AsyncMessage = @This(); diff --git a/src/javascript/jsc/javascript.zig b/src/javascript/jsc/javascript.zig index 7cdb49004..fcbc532ca 100644 --- a/src/javascript/jsc/javascript.zig +++ b/src/javascript/jsc/javascript.zig @@ -641,6 +641,29 @@ pub const Bun = struct { return ZigString.init(stream.buffer[0..stream.pos]).toValueGC(ctx.ptr()).asObjectRef(); } + // pub fn resolvePath( + // _: void, + // ctx: js.JSContextRef, + // _: js.JSObjectRef, + // _: js.JSObjectRef, + // arguments: []const js.JSValueRef, + // _: js.ExceptionRef, + // ) js.JSValueRef { + // if (arguments.len == 0) return ZigString.Empty.toValue(ctx.ptr()).asObjectRef(); + // var zig_str: ZigString = ZigString.Empty; + // JSValue.toZigString(JSValue.fromRef(arguments[0]), &zig_str, ctx.ptr()); + // var buf: [std.fs.MAX_PATH_BYTES]u8 = undefined; + // var stack = std.heap.stackFallback(32 * @sizeOf(string), VirtualMachine.vm.allocator); + // var allocator = stack.get(); + // var parts = allocator.alloc(string, arguments.len) catch {}; + // defer allocator.free(parts); + + // const to = zig_str.slice(); + // var parts = .{to}; + // const value = ZigString.init(VirtualMachine.vm.bundler.fs.absBuf(&parts, &buf)).toValueGC(ctx.ptr()); + // return value.asObjectRef(); + // } + pub const Class = NewClass( void, .{ diff --git a/types.d.ts b/types.d.ts index 1907de370..e69de29bb 100644 --- a/types.d.ts +++ b/types.d.ts @@ -1,5 +0,0 @@ -interface BunNodeModule extends NodeJS.Module { - requireFirst(...id: string[]): any; -} - -declare var module: BunNodeModule; |