diff options
author | 2023-01-04 02:08:28 +0200 | |
---|---|---|
committer | 2023-01-03 16:08:28 -0800 | |
commit | 237bcdf99f5c8b2431e89bc9fa6300a7d256e7eb (patch) | |
tree | 653da17ab736d063efd69b12418551f76e9c6cfa | |
parent | 0b395ca1dca8432d04a65402b1d666d6b36ce4ae (diff) | |
download | bun-237bcdf99f5c8b2431e89bc9fa6300a7d256e7eb.tar.gz bun-237bcdf99f5c8b2431e89bc9fa6300a7d256e7eb.tar.zst bun-237bcdf99f5c8b2431e89bc9fa6300a7d256e7eb.zip |
[streams] speed up `Readable` in some cases (#1708)
If `encoding` is set, no `Buffer`s would be exposed thus `Uint8Array` can be used directly.
- fix data corruption in `BufferList.concat()`
- fix segfaults in `BufferList.join()`
Diffstat (limited to '')
-rw-r--r-- | src/bun.js/bindings/JSBufferList.cpp | 39 | ||||
-rw-r--r-- | src/bun.js/streams.exports.js | 4 | ||||
-rw-r--r-- | test/bun.js/node-stream-uint8array.test.ts | 111 |
3 files changed, 140 insertions, 14 deletions
diff --git a/src/bun.js/bindings/JSBufferList.cpp b/src/bun.js/bindings/JSBufferList.cpp index 19aeda2d0..a615dfe73 100644 --- a/src/bun.js/bindings/JSBufferList.cpp +++ b/src/bun.js/bindings/JSBufferList.cpp @@ -53,10 +53,17 @@ JSC::JSValue JSBufferList::concat(JSC::VM& vm, JSC::JSGlobalObject* lexicalGloba size_t i = 0; for (auto iter = m_deque.begin(); iter != m_deque.end(); ++iter) { auto array = JSC::jsDynamicCast<JSC::JSUint8Array*>(iter->get()); - if (!array) - continue; - size_t length = array->byteLength(); - uint8Array->setFromTypedArray(lexicalGlobalObject, i, array, 0, length, JSC::CopyType::Unobservable); + if (UNLIKELY(!array)) { + return throwTypeError(lexicalGlobalObject, throwScope, "concat can only be called when all buffers are Uint8Array"_s); + } + const size_t length = array->byteLength(); + if (UNLIKELY(i + length > n)) { + return throwRangeError(lexicalGlobalObject, throwScope, "specified size too small to fit all buffers"_s); + } + if (UNLIKELY(!uint8Array->setFromTypedArray(lexicalGlobalObject, i, array, 0, length, JSC::CopyType::Unobservable))) { + return throwOutOfMemoryError(lexicalGlobalObject, throwScope); + } + i += length; } RELEASE_AND_RETURN(throwScope, uint8Array); @@ -68,16 +75,18 @@ JSC::JSValue JSBufferList::join(JSC::VM& vm, JSC::JSGlobalObject* lexicalGlobalO if (length() == 0) { RELEASE_AND_RETURN(throwScope, JSC::jsEmptyString(vm)); } - bool needSeq = false; + const bool needSeq = seq->length() != 0; + const auto end = m_deque.end(); JSRopeString::RopeBuilder<RecordOverflow> ropeBuilder(vm); - for (auto iter = m_deque.begin(); iter != m_deque.end(); ++iter) { - auto str = JSC::jsCast<JSC::JSString*>(iter->get()); + for (auto iter = m_deque.begin(); ;) { + auto str = iter->get().toString(lexicalGlobalObject); + if (!ropeBuilder.append(str)) + return throwOutOfMemoryError(lexicalGlobalObject, throwScope); + if (++iter == end) + break; if (needSeq) if (!ropeBuilder.append(seq)) return throwOutOfMemoryError(lexicalGlobalObject, throwScope); - if (!ropeBuilder.append(str)) - return throwOutOfMemoryError(lexicalGlobalObject, throwScope); - needSeq = seq->length() != 0; } RELEASE_AND_RETURN(throwScope, ropeBuilder.release()); } @@ -142,11 +151,13 @@ JSC::JSValue JSBufferList::_getBuffer(JSC::VM& vm, JSC::JSGlobalObject* lexicalG for (auto iter = m_deque.begin(); iter != m_deque.end() && n > 0; ++iter) { JSC::JSUint8Array* array = JSC::jsDynamicCast<JSC::JSUint8Array*>(iter->get()); if (UNLIKELY(!array)) { - return throwOutOfMemoryError(lexicalGlobalObject, throwScope, "_getBuffer can only be called when all buffers are Uint8Array"_s); + return throwTypeError(lexicalGlobalObject, throwScope, "_getBuffer can only be called when all buffers are Uint8Array"_s); } size_t length = array->byteLength(); if (length > n) { - uint8Array->setFromTypedArray(lexicalGlobalObject, offset, array, 0, n, JSC::CopyType::Unobservable); + if (UNLIKELY(!uint8Array->setFromTypedArray(lexicalGlobalObject, offset, array, 0, n, JSC::CopyType::Unobservable))) { + return throwOutOfMemoryError(lexicalGlobalObject, throwScope); + } // create a new array of size length - n. // is there a faster way to do this? auto arrayBuffer = JSC::ArrayBuffer::tryCreateUninitialized(length - n, 1); @@ -160,7 +171,9 @@ JSC::JSValue JSBufferList::_getBuffer(JSC::VM& vm, JSC::JSGlobalObject* lexicalG memcpy(newArray->typedVector(), array->typedVector() + n, length - n); iter->set(vm, this, newArray); } else { - uint8Array->setFromTypedArray(lexicalGlobalObject, offset, array, 0, length, JSC::CopyType::Unobservable); + if (UNLIKELY(!uint8Array->setFromTypedArray(lexicalGlobalObject, offset, array, 0, length, JSC::CopyType::Unobservable))) { + return throwOutOfMemoryError(lexicalGlobalObject, throwScope); + } m_deque.removeFirst(); } n -= static_cast<int32_t>(length); diff --git a/src/bun.js/streams.exports.js b/src/bun.js/streams.exports.js index 988666388..5df244950 100644 --- a/src/bun.js/streams.exports.js +++ b/src/bun.js/streams.exports.js @@ -2949,7 +2949,9 @@ var require_readable = __commonJS({ } else if (chunk instanceof Buffer) { encoding = ""; } else if (Stream._isUint8Array(chunk)) { - chunk = Stream._uint8ArrayToBuffer(chunk); + if (addToFront || !state.decoder) { + chunk = Stream._uint8ArrayToBuffer(chunk); + } encoding = ""; } else if (chunk != null) { err = new ERR_INVALID_ARG_TYPE( diff --git a/test/bun.js/node-stream-uint8array.test.ts b/test/bun.js/node-stream-uint8array.test.ts new file mode 100644 index 000000000..4bd1c4bcf --- /dev/null +++ b/test/bun.js/node-stream-uint8array.test.ts @@ -0,0 +1,111 @@ +import { beforeEach, describe, expect, it } from "bun:test"; +import { Readable, Writable } from "stream"; + +const ABC = new Uint8Array([0x41, 0x42, 0x43]); +const DEF = new Uint8Array([0x44, 0x45, 0x46]); +const GHI = new Uint8Array([0x47, 0x48, 0x49]); + +describe("Writable", () => { + let called; + + function logCall(fn, id) { + return function() { + called[id] = (called[id] || 0) + 1; + return fn.apply(this, arguments); + }; + } + + beforeEach(() => { + called = []; + }); + + it("should perform simple operations", () => { + let n = 0; + const writable = new Writable({ + write: logCall((chunk, encoding, cb) => { + expect(chunk instanceof Buffer).toBe(true); + if (n++ === 0) { + expect(String(chunk)).toBe("ABC"); + } else { + expect(String(chunk)).toBe("DEF"); + } + + cb(); + }, 0), + }); + + writable.write(ABC); + writable.end(DEF); + expect(called).toEqual([ 2 ]); + }); + + it("should pass in Uint8Array in object mode", () => { + const writable = new Writable({ + objectMode: true, + write: logCall((chunk, encoding, cb) => { + expect(chunk instanceof Buffer).toBe(false); + expect(chunk instanceof Uint8Array).toBe(true); + expect(chunk).toStrictEqual(ABC); + expect(encoding).toBe("utf8"); + cb(); + }, 0), + }); + + writable.end(ABC); + expect(called).toEqual([ 1 ]); + }); + + it("should handle multiple writes carried out via writev()", () => { + let callback; + + const writable = new Writable({ + write: logCall((chunk, encoding, cb) => { + expect(chunk instanceof Buffer).toBe(true); + expect(encoding).toBe("buffer"); + expect(String(chunk)).toBe("ABC"); + callback = cb; + }, 0), + writev: logCall((chunks, cb) => { + expect(chunks.length).toBe(2); + expect(chunks[0].encoding).toBe("buffer"); + expect(chunks[1].encoding).toBe("buffer"); + expect(chunks[0].chunk + chunks[1].chunk).toBe("DEFGHI"); + }, 1), + }); + + writable.write(ABC); + writable.write(DEF); + writable.end(GHI); + callback(); + expect(called).toEqual([ 1, 1 ]); + }); +}); + +describe("Readable", () => { + it("should perform simple operations", () => { + const readable = new Readable({ + read() {} + }); + + readable.push(DEF); + readable.unshift(ABC); + + const buf = readable.read(); + expect(buf instanceof Buffer).toBe(true); + expect([ ...buf ]).toEqual([ ...ABC, ...DEF ]); + }); + + it("should work with setEncoding()", () => { + const readable = new Readable({ + read() {} + }); + + readable.setEncoding("utf8"); + + readable.push(DEF); + readable.unshift(ABC); + + const out = readable.read(); + expect(out).toBe("ABCDEF"); + }); +}); |