author     2023-02-10 03:04:36 +0200
committer  2023-02-09 17:04:36 -0800
commit     2abfa8abd2ed620986c7483c3fb8cc1bd1730f4f
tree       0ac0794e7898a6ee6d94a2ce4a19bbc7d01581ec
parent     ad9d4fb0c41c1fe79e29a0fff18084937b2d8a2d
[streams] fix byte accounting (#2029)
fixes #1939
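
The patch reworks how File-backed streams account for the bytes they read: calculateChunkSize is restructured around the user-supplied chunk size and the remaining-byte counter, readFromJS switches on the read result instead of re-checking it, and a regression test streams a large file end to end to confirm nothing is dropped. As a rough sketch of the consumer-side check the fix is about (the file path below is illustrative, not taken from the issue), the bytes yielded by Bun.file().stream() should add up to the file's size:

    // Illustrative only: stream a file and compare bytes received with Blob#size.
    const blob = Bun.file("./some-large-file.txt"); // hypothetical path
    let received = 0;
    for await (const chunk of blob.stream()) {
      received += chunk.byteLength; // chunks arrive as Uint8Array views
    }
    console.log(received === blob.size); // true once byte accounting is correct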
-rw-r--r--  src/bun.js/webcore/streams.zig | 56
-rw-r--r--  test/bun.js/streams.test.js    | 20
2 files changed, 45 insertions, 31 deletions
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig
index 8fc6a29f7..868d0840d 100644
--- a/src/bun.js/webcore/streams.zig
+++ b/src/bun.js/webcore/streams.zig
@@ -3919,20 +3919,18 @@ pub const File = struct {
     }

     fn calculateChunkSize(this: *File, available_to_read: usize) usize {
-        const chunk_size: usize = if (this.user_chunk_size > 0)
-            @as(usize, this.user_chunk_size)
-        else if (this.isSeekable())
-            @as(usize, default_file_chunk_size)
-        else
-            @as(usize, default_fifo_chunk_size);
-
-        return if (this.remaining_bytes > 0 and this.isSeekable())
-            if (available_to_read != std.math.maxInt(usize))
-                @min(chunk_size, available_to_read)
+        const chunk_size: usize = switch (this.user_chunk_size) {
+            0 => if (this.isSeekable())
+                default_file_chunk_size
             else
-                @min(this.remaining_bytes -| this.total_read, chunk_size)
+                default_fifo_chunk_size,
+            else => |size| size,
+        };
+
+        return if (available_to_read == std.math.maxInt(usize) and this.remaining_bytes > 0 and this.isSeekable())
+            @min(chunk_size, this.remaining_bytes -| this.total_read)
         else
-            @min(available_to_read, chunk_size);
+            @min(chunk_size, available_to_read);
     }

     pub fn start(
@@ -4192,23 +4190,21 @@ pub const File = struct {

     pub fn readFromJS(this: *File, buf: []u8, view: JSValue, globalThis: *JSC.JSGlobalObject) StreamResult {
         const read_result = this.read(buf);
-        if (read_result == .read and read_result.read.len == 0) {
-            this.close();
-            return .{ .done = {} };
-        }
-
-        if (read_result == .read) {
-            this.remaining_bytes -|= @intCast(Blob.SizeType, read_result.read.len);
-        }
-        if (read_result == .pending) {
-            if (this.scheduled_count == 0) {
-                this.buf = buf;
-                this.view.set(globalThis, view);
-                this.scheduleAsync(@truncate(Blob.SizeType, buf.len), globalThis);
-            }
-
-            return .{ .pending = &this.pending };
+        switch (read_result) {
+            .read => |slice| if (slice.len == 0) {
+                this.close();
+                return .{ .done = {} };
+            },
+            .pending => {
+                if (this.scheduled_count == 0) {
+                    this.buf = buf;
+                    this.view.set(globalThis, view);
+                    this.scheduleAsync(@truncate(Blob.SizeType, buf.len), globalThis);
+                }
+                return .{ .pending = &this.pending };
+            },
+            else => {},
         }

         return read_result.toStream(&this.pending, buf, view, false);
@@ -4421,7 +4417,7 @@ pub const FileReader = struct {
         switch (this.lazy_readable) {
             .blob => |blob| {
                 defer blob.deref();
-                var readable_file: File = .{ .loop = this.globalThis().bunVM().eventLoop() };
+                var readable_file = File{ .loop = this.globalThis().bunVM().eventLoop() };

                 const result = readable_file.start(&blob.data.file);
                 if (result != .ready) {
@@ -4432,7 +4428,7 @@
                 if (std.os.S.ISFIFO(readable_file.mode) or std.os.S.ISCHR(readable_file.mode)) {
                     this.lazy_readable = .{
                         .readable = .{
-                            .FIFO = FIFO{
+                            .FIFO = .{
                                 .fd = readable_file.fd,
                                 .auto_close = readable_file.auto_close,
                                 .drained = this.buffered_data.len == 0,
diff --git a/test/bun.js/streams.test.js b/test/bun.js/streams.test.js
index 253e4d2f2..7ae49db40 100644
--- a/test/bun.js/streams.test.js
+++ b/test/bun.js/streams.test.js
@@ -1,8 +1,9 @@
 import { file, readableStreamToArrayBuffer, readableStreamToArray, readableStreamToText } from "bun";
 import { expect, it, beforeEach, afterEach, describe } from "bun:test";
 import { mkfifo } from "mkfifo";
-import { unlinkSync, writeFileSync } from "node:fs";
+import { realpathSync, unlinkSync, writeFileSync } from "node:fs";
 import { join } from "node:path";
+import { tmpdir } from "os";
 import { gc } from "./gc";

 beforeEach(() => gc());
@@ -610,3 +611,20 @@ it("Blob.stream() -> new Response(stream).text()", async () => {
   const text = await new Response(stream).text();
   expect(text).toBe("abdefgh");
 });
+
+it("Bun.file().stream() read text from large file", async () => {
+  const hugely = "HELLO!".repeat(1024 * 1024 * 10);
+  const tmpfile = join(realpathSync(tmpdir()), "bun-streams-test.txt");
+  writeFileSync(tmpfile, hugely);
+  try {
+    const chunks = [];
+    for await (const chunk of Bun.file(tmpfile).stream()) {
+      chunks.push(chunk);
+    }
+    const output = Buffer.concat(chunks).toString();
+    expect(output).toHaveLength(hugely.length);
+    expect(output).toBe(hugely);
+  } finally {
+    unlinkSync(tmpfile);
+  }
+});
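
The added test writes about 60 MB of ASCII ("HELLO!" repeated 1024 * 1024 * 10 times) to a temp file, streams it back through Bun.file().stream(), and asserts the concatenated output matches what was written. The same end-to-end check could also lean on the stream helpers the test file already imports from "bun"; a hedged variant (path illustrative):

    import { readableStreamToText } from "bun";

    // Collect the whole stream as a string instead of concatenating chunks by hand.
    const text = await readableStreamToText(Bun.file("./some-large-file.txt").stream());
    console.log(text.length); // for ASCII contents this equals the byte count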