about | summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Alex Lam S.L <alexlamsl@gmail.com> 2023-01-04 02:08:28 +0200
committer: GitHub <noreply@github.com> 2023-01-03 16:08:28 -0800
commit: 237bcdf99f5c8b2431e89bc9fa6300a7d256e7eb (patch)
tree: 653da17ab736d063efd69b12418551f76e9c6cfa
parent: 0b395ca1dca8432d04a65402b1d666d6b36ce4ae (diff)
download: bun-237bcdf99f5c8b2431e89bc9fa6300a7d256e7eb.tar.gz
download: bun-237bcdf99f5c8b2431e89bc9fa6300a7d256e7eb.tar.zst
download: bun-237bcdf99f5c8b2431e89bc9fa6300a7d256e7eb.zip
[streams] speed up `Readable` in some cases (#1708)
If `encoding` is set, no `Buffer`s are exposed, so `Uint8Array` can be used directly.
- fix data corruption in `BufferList.concat()`
- fix segfaults in `BufferList.join()`
Diffstat (limited to '')
-rw-r--r-- src/bun.js/bindings/JSBufferList.cpp | 39
-rw-r--r-- src/bun.js/streams.exports.js | 4
-rw-r--r-- test/bun.js/node-stream-uint8array.test.ts | 111
3 files changed, 140 insertions, 14 deletions
diff --git a/src/bun.js/bindings/JSBufferList.cpp b/src/bun.js/bindings/JSBufferList.cpp
index 19aeda2d0..a615dfe73 100644
--- a/src/bun.js/bindings/JSBufferList.cpp
+++ b/src/bun.js/bindings/JSBufferList.cpp
@@ -53,10 +53,17 @@ JSC::JSValue JSBufferList::concat(JSC::VM& vm, JSC::JSGlobalObject* lexicalGloba
size_t i = 0;
for (auto iter = m_deque.begin(); iter != m_deque.end(); ++iter) {
auto array = JSC::jsDynamicCast<JSC::JSUint8Array*>(iter->get());
- if (!array)
- continue;
- size_t length = array->byteLength();
- uint8Array->setFromTypedArray(lexicalGlobalObject, i, array, 0, length, JSC::CopyType::Unobservable);
+ if (UNLIKELY(!array)) {
+ return throwTypeError(lexicalGlobalObject, throwScope, "concat can only be called when all buffers are Uint8Array"_s);
+ }
+ const size_t length = array->byteLength();
+ if (UNLIKELY(i + length > n)) {
+ return throwRangeError(lexicalGlobalObject, throwScope, "specified size too small to fit all buffers"_s);
+ }
+ if (UNLIKELY(!uint8Array->setFromTypedArray(lexicalGlobalObject, i, array, 0, length, JSC::CopyType::Unobservable))) {
+ return throwOutOfMemoryError(lexicalGlobalObject, throwScope);
+ }
+ i += length;
}
RELEASE_AND_RETURN(throwScope, uint8Array);
@@ -68,16 +75,18 @@ JSC::JSValue JSBufferList::join(JSC::VM& vm, JSC::JSGlobalObject* lexicalGlobalO
if (length() == 0) {
RELEASE_AND_RETURN(throwScope, JSC::jsEmptyString(vm));
}
- bool needSeq = false;
+ const bool needSeq = seq->length() != 0;
+ const auto end = m_deque.end();
JSRopeString::RopeBuilder<RecordOverflow> ropeBuilder(vm);
- for (auto iter = m_deque.begin(); iter != m_deque.end(); ++iter) {
- auto str = JSC::jsCast<JSC::JSString*>(iter->get());
+ for (auto iter = m_deque.begin(); ;) {
+ auto str = iter->get().toString(lexicalGlobalObject);
+ if (!ropeBuilder.append(str))
+ return throwOutOfMemoryError(lexicalGlobalObject, throwScope);
+ if (++iter == end)
+ break;
if (needSeq)
if (!ropeBuilder.append(seq))
return throwOutOfMemoryError(lexicalGlobalObject, throwScope);
- if (!ropeBuilder.append(str))
- return throwOutOfMemoryError(lexicalGlobalObject, throwScope);
- needSeq = seq->length() != 0;
}
RELEASE_AND_RETURN(throwScope, ropeBuilder.release());
}
@@ -142,11 +151,13 @@ JSC::JSValue JSBufferList::_getBuffer(JSC::VM& vm, JSC::JSGlobalObject* lexicalG
for (auto iter = m_deque.begin(); iter != m_deque.end() && n > 0; ++iter) {
JSC::JSUint8Array* array = JSC::jsDynamicCast<JSC::JSUint8Array*>(iter->get());
if (UNLIKELY(!array)) {
- return throwOutOfMemoryError(lexicalGlobalObject, throwScope, "_getBuffer can only be called when all buffers are Uint8Array"_s);
+ return throwTypeError(lexicalGlobalObject, throwScope, "_getBuffer can only be called when all buffers are Uint8Array"_s);
}
size_t length = array->byteLength();
if (length > n) {
- uint8Array->setFromTypedArray(lexicalGlobalObject, offset, array, 0, n, JSC::CopyType::Unobservable);
+ if (UNLIKELY(!uint8Array->setFromTypedArray(lexicalGlobalObject, offset, array, 0, n, JSC::CopyType::Unobservable))) {
+ return throwOutOfMemoryError(lexicalGlobalObject, throwScope);
+ }
// create a new array of size length - n.
// is there a faster way to do this?
auto arrayBuffer = JSC::ArrayBuffer::tryCreateUninitialized(length - n, 1);
@@ -160,7 +171,9 @@ JSC::JSValue JSBufferList::_getBuffer(JSC::VM& vm, JSC::JSGlobalObject* lexicalG
memcpy(newArray->typedVector(), array->typedVector() + n, length - n);
iter->set(vm, this, newArray);
} else {
- uint8Array->setFromTypedArray(lexicalGlobalObject, offset, array, 0, length, JSC::CopyType::Unobservable);
+ if (UNLIKELY(!uint8Array->setFromTypedArray(lexicalGlobalObject, offset, array, 0, length, JSC::CopyType::Unobservable))) {
+ return throwOutOfMemoryError(lexicalGlobalObject, throwScope);
+ }
m_deque.removeFirst();
}
n -= static_cast<int32_t>(length);
diff --git a/src/bun.js/streams.exports.js b/src/bun.js/streams.exports.js
index 988666388..5df244950 100644
--- a/src/bun.js/streams.exports.js
+++ b/src/bun.js/streams.exports.js
@@ -2949,7 +2949,9 @@ var require_readable = __commonJS({
} else if (chunk instanceof Buffer) {
encoding = "";
} else if (Stream._isUint8Array(chunk)) {
- chunk = Stream._uint8ArrayToBuffer(chunk);
+ if (addToFront || !state.decoder) {
+ chunk = Stream._uint8ArrayToBuffer(chunk);
+ }
encoding = "";
} else if (chunk != null) {
err = new ERR_INVALID_ARG_TYPE(
diff --git a/test/bun.js/node-stream-uint8array.test.ts b/test/bun.js/node-stream-uint8array.test.ts
new file mode 100644
index 000000000..4bd1c4bcf
--- /dev/null
+++ b/test/bun.js/node-stream-uint8array.test.ts
@@ -0,0 +1,111 @@
+import { beforeEach, describe, expect, it } from "bun:test";
+import { Readable, Writable } from "stream";
+
+const ABC = new Uint8Array([0x41, 0x42, 0x43]);
+const DEF = new Uint8Array([0x44, 0x45, 0x46]);
+const GHI = new Uint8Array([0x47, 0x48, 0x49]);
+
+describe("Writable", () => {
+ let called;
+
+ function logCall(fn, id) {
+ return function() {
+ called[id] = (called[id] || 0) + 1;
+ return fn.apply(this, arguments);
+ };
+ }
+
+ beforeEach(() => {
+ called = [];
+ });
+
+ it("should perform simple operations", () => {
+ let n = 0;
+ const writable = new Writable({
+ write: logCall((chunk, encoding, cb) => {
+ expect(chunk instanceof Buffer).toBe(true);
+ if (n++ === 0) {
+ expect(String(chunk)).toBe("ABC");
+ } else {
+ expect(String(chunk)).toBe("DEF");
+ }
+
+ cb();
+ }, 0),
+ });
+
+ writable.write(ABC);
+ writable.end(DEF);
+ expect(called).toEqual([ 2 ]);
+ });
+
+ it("should pass in Uint8Array in object mode", () => {
+ const writable = new Writable({
+ objectMode: true,
+ write: logCall((chunk, encoding, cb) => {
+ expect(chunk instanceof Buffer).toBe(false);
+ expect(chunk instanceof Uint8Array).toBe(true);
+ expect(chunk).toStrictEqual(ABC);
+ expect(encoding).toBe("utf8");
+ cb();
+ }, 0),
+ });
+
+ writable.end(ABC);
+ expect(called).toEqual([ 1 ]);
+ });
+
+ it("should handle multiple writes carried out via writev()", () => {
+ let callback;
+
+ const writable = new Writable({
+ write: logCall((chunk, encoding, cb) => {
+ expect(chunk instanceof Buffer).toBe(true);
+ expect(encoding).toBe("buffer");
+ expect(String(chunk)).toBe("ABC");
+ callback = cb;
+ }, 0),
+ writev: logCall((chunks, cb) => {
+ expect(chunks.length).toBe(2);
+ expect(chunks[0].encoding).toBe("buffer");
+ expect(chunks[1].encoding).toBe("buffer");
+ expect(chunks[0].chunk + chunks[1].chunk).toBe("DEFGHI");
+ }, 1),
+ });
+
+ writable.write(ABC);
+ writable.write(DEF);
+ writable.end(GHI);
+ callback();
+ expect(called).toEqual([ 1, 1 ]);
+ });
+});
+
+describe("Readable", () => {
+ it("should perform simple operations", () => {
+ const readable = new Readable({
+ read() {}
+ });
+
+ readable.push(DEF);
+ readable.unshift(ABC);
+
+ const buf = readable.read();
+ expect(buf instanceof Buffer).toBe(true);
+ expect([ ...buf ]).toEqual([ ...ABC, ...DEF ]);
+ });
+
+ it("should work with setEncoding()", () => {
+ const readable = new Readable({
+ read() {}
+ });
+
+ readable.setEncoding("utf8");
+
+ readable.push(DEF);
+ readable.unshift(ABC);
+
+ const out = readable.read();
+ expect(out).toBe("ABCDEF");
+ });
+});