aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Alex Lam S.L <alexlamsl@gmail.com> 2023-02-10 03:04:36 +0200
committerGravatar GitHub <noreply@github.com> 2023-02-09 17:04:36 -0800
commit2abfa8abd2ed620986c7483c3fb8cc1bd1730f4f (patch)
tree0ac0794e7898a6ee6d94a2ce4a19bbc7d01581ec
parentad9d4fb0c41c1fe79e29a0fff18084937b2d8a2d (diff)
downloadbun-2abfa8abd2ed620986c7483c3fb8cc1bd1730f4f.tar.gz
bun-2abfa8abd2ed620986c7483c3fb8cc1bd1730f4f.tar.zst
bun-2abfa8abd2ed620986c7483c3fb8cc1bd1730f4f.zip
[streams] fix byte accounting (#2029)
fixes #1939
-rw-r--r--src/bun.js/webcore/streams.zig56
-rw-r--r--test/bun.js/streams.test.js20
2 files changed, 45 insertions, 31 deletions
diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig
index 8fc6a29f7..868d0840d 100644
--- a/src/bun.js/webcore/streams.zig
+++ b/src/bun.js/webcore/streams.zig
@@ -3919,20 +3919,18 @@ pub const File = struct {
}
fn calculateChunkSize(this: *File, available_to_read: usize) usize {
- const chunk_size: usize = if (this.user_chunk_size > 0)
- @as(usize, this.user_chunk_size)
- else if (this.isSeekable())
- @as(usize, default_file_chunk_size)
- else
- @as(usize, default_fifo_chunk_size);
-
- return if (this.remaining_bytes > 0 and this.isSeekable())
- if (available_to_read != std.math.maxInt(usize))
- @min(chunk_size, available_to_read)
+ const chunk_size: usize = switch (this.user_chunk_size) {
+ 0 => if (this.isSeekable())
+ default_file_chunk_size
else
- @min(this.remaining_bytes -| this.total_read, chunk_size)
+ default_fifo_chunk_size,
+ else => |size| size,
+ };
+
+ return if (available_to_read == std.math.maxInt(usize) and this.remaining_bytes > 0 and this.isSeekable())
+ @min(chunk_size, this.remaining_bytes -| this.total_read)
else
- @min(available_to_read, chunk_size);
+ @min(chunk_size, available_to_read);
}
pub fn start(
@@ -4192,23 +4190,21 @@ pub const File = struct {
pub fn readFromJS(this: *File, buf: []u8, view: JSValue, globalThis: *JSC.JSGlobalObject) StreamResult {
const read_result = this.read(buf);
- if (read_result == .read and read_result.read.len == 0) {
- this.close();
- return .{ .done = {} };
- }
-
- if (read_result == .read) {
- this.remaining_bytes -|= @intCast(Blob.SizeType, read_result.read.len);
- }
- if (read_result == .pending) {
- if (this.scheduled_count == 0) {
- this.buf = buf;
- this.view.set(globalThis, view);
- this.scheduleAsync(@truncate(Blob.SizeType, buf.len), globalThis);
- }
-
- return .{ .pending = &this.pending };
+ switch (read_result) {
+ .read => |slice| if (slice.len == 0) {
+ this.close();
+ return .{ .done = {} };
+ },
+ .pending => {
+ if (this.scheduled_count == 0) {
+ this.buf = buf;
+ this.view.set(globalThis, view);
+ this.scheduleAsync(@truncate(Blob.SizeType, buf.len), globalThis);
+ }
+ return .{ .pending = &this.pending };
+ },
+ else => {},
}
return read_result.toStream(&this.pending, buf, view, false);
@@ -4421,7 +4417,7 @@ pub const FileReader = struct {
switch (this.lazy_readable) {
.blob => |blob| {
defer blob.deref();
- var readable_file: File = .{ .loop = this.globalThis().bunVM().eventLoop() };
+ var readable_file = File{ .loop = this.globalThis().bunVM().eventLoop() };
const result = readable_file.start(&blob.data.file);
if (result != .ready) {
@@ -4432,7 +4428,7 @@ pub const FileReader = struct {
if (std.os.S.ISFIFO(readable_file.mode) or std.os.S.ISCHR(readable_file.mode)) {
this.lazy_readable = .{
.readable = .{
- .FIFO = FIFO{
+ .FIFO = .{
.fd = readable_file.fd,
.auto_close = readable_file.auto_close,
.drained = this.buffered_data.len == 0,
diff --git a/test/bun.js/streams.test.js b/test/bun.js/streams.test.js
index 253e4d2f2..7ae49db40 100644
--- a/test/bun.js/streams.test.js
+++ b/test/bun.js/streams.test.js
@@ -1,8 +1,9 @@
import { file, readableStreamToArrayBuffer, readableStreamToArray, readableStreamToText } from "bun";
import { expect, it, beforeEach, afterEach, describe } from "bun:test";
import { mkfifo } from "mkfifo";
-import { unlinkSync, writeFileSync } from "node:fs";
+import { realpathSync, unlinkSync, writeFileSync } from "node:fs";
import { join } from "node:path";
+import { tmpdir } from "os";
import { gc } from "./gc";
beforeEach(() => gc());
@@ -610,3 +611,20 @@ it("Blob.stream() -> new Response(stream).text()", async () => {
const text = await new Response(stream).text();
expect(text).toBe("abdefgh");
});
+
+it("Bun.file().stream() read text from large file", async () => {
+ const hugely = "HELLO!".repeat(1024 * 1024 * 10);
+ const tmpfile = join(realpathSync(tmpdir()), "bun-streams-test.txt");
+ writeFileSync(tmpfile, hugely);
+ try {
+ const chunks = [];
+ for await (const chunk of Bun.file(tmpfile).stream()) {
+ chunks.push(chunk);
+ }
+ const output = Buffer.concat(chunks).toString();
+ expect(output).toHaveLength(hugely.length);
+ expect(output).toBe(hugely);
+ } finally {
+ unlinkSync(tmpfile);
+ }
+});