aboutsummaryrefslogtreecommitdiff
path: root/packages/bun-vscode/src/jsc.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/bun-vscode/src/jsc.ts')
-rw-r--r--packages/bun-vscode/src/jsc.ts308
1 files changed, 308 insertions, 0 deletions
diff --git a/packages/bun-vscode/src/jsc.ts b/packages/bun-vscode/src/jsc.ts
new file mode 100644
index 000000000..5b8d4ed84
--- /dev/null
+++ b/packages/bun-vscode/src/jsc.ts
@@ -0,0 +1,308 @@
+import { Socket, createConnection } from "node:net";
+import { inspect } from "node:util";
+import type { JSC } from "../types/jsc";
+export type { JSC };
+
+export type JSCClientOptions = {
+ url: string | URL;
+ retry?: boolean;
+ onEvent?: (event: JSC.Event) => void;
+ onRequest?: (request: JSC.Request) => void;
+ onResponse?: (response: JSC.Response) => void;
+ onError?: (error: Error) => void;
+ onClose?: (code: number, reason: string) => void;
+};
+const headerInvalidNumber = 2147483646;
+
+// We use non-printable characters to separate messages in the stream.
+// These should never appear in textual messages.
+
+// These are non-sequential so that code which just counts up from 0 doesn't accidentally parse them as messages.
+// 0x12 0x11 0x13 0x14 as a little-endian 32-bit unsigned integer
+const headerPrefix = "\x14\x13\x11\x12";
+
+// 0x14 0x12 0x13 0x11 as a little-endian 32-bit unsigned integer
+const headerSuffixString = "\x11\x13\x12\x14";
+
+const headerSuffixInt = Buffer.from(headerSuffixString).readInt32LE(0);
+const headerPrefixInt = Buffer.from(headerPrefix).readInt32LE(0);
+
+const messageLengthBuffer = new ArrayBuffer(12);
+const messageLengthDataView = new DataView(messageLengthBuffer);
+messageLengthDataView.setInt32(0, headerPrefixInt, true);
+messageLengthDataView.setInt32(8, headerSuffixInt, true);
+
+function writeJSONMessageToBuffer(message: any) {
+ const asString = JSON.stringify(message);
+ const byteLength = Buffer.byteLength(asString, "utf8");
+ const buffer = Buffer.allocUnsafe(12 + byteLength);
+ buffer.writeInt32LE(headerPrefixInt, 0);
+ buffer.writeInt32LE(byteLength, 4);
+ buffer.writeInt32LE(headerSuffixInt, 8);
+ if (buffer.write(asString, 12, byteLength, "utf8") !== byteLength) {
+ throw new Error("Failed to write message to buffer");
+ }
+
+ return buffer;
+}
+
+let currentMessageLength = 0;
+const DEBUGGING = true;
+function extractMessageLengthAndOffsetFromBytes(buffer: Buffer, offset: number) {
+ const bufferLength = buffer.length;
+ while (offset < bufferLength) {
+ const headerStart = buffer.indexOf(headerPrefix, offset, "binary");
+ if (headerStart === -1) {
+ if (DEBUGGING) {
+ console.error("No header found in buffer of length " + bufferLength + " starting at offset " + offset);
+ }
+ return headerInvalidNumber;
+ }
+
+ // [headerPrefix (4), byteLength (4), headerSuffix (4)]
+ if (bufferLength <= headerStart + 12) {
+ if (DEBUGGING) {
+ console.error(
+ "Not enough bytes for header in buffer of length " + bufferLength + " starting at offset " + offset,
+ );
+ }
+ return headerInvalidNumber;
+ }
+
+ const prefix = buffer.readInt32LE(headerStart);
+ const byteLengthInt = buffer.readInt32LE(headerStart + 4);
+ const suffix = buffer.readInt32LE(headerStart + 8);
+
+ if (prefix !== headerPrefixInt || suffix !== headerSuffixInt) {
+ offset = headerStart + 1;
+ currentMessageLength = 0;
+
+ if (DEBUGGING) {
+ console.error(
+ "Invalid header in buffer of length " + bufferLength + " starting at offset " + offset + ": " + prefix,
+ byteLengthInt,
+ suffix,
+ );
+ }
+ continue;
+ }
+
+ if (byteLengthInt < 0) {
+ if (DEBUGGING) {
+ console.error(
+ "Invalid byteLength in buffer of length " + bufferLength + " starting at offset " + offset + ": " + prefix,
+ byteLengthInt,
+ suffix,
+ );
+ }
+
+ return headerInvalidNumber;
+ }
+
+ if (byteLengthInt === 0) {
+ // Ignore 0-length messages
+ // Shouldn't happen in practice
+ offset = headerStart + 12;
+ currentMessageLength = 0;
+
+ if (DEBUGGING) {
+ console.error(
+ "Ignoring 0-length message in buffer of length " + bufferLength + " starting at offset " + offset,
+ );
+ console.error({
+ buffer: buffer,
+ string: buffer.toString(),
+ });
+ }
+
+ continue;
+ }
+
+ currentMessageLength = byteLengthInt;
+
+ return headerStart + 12;
+ }
+
+ if (DEBUGGING) {
+ if (bufferLength > 0)
+ console.error("Header not found in buffer of length " + bufferLength + " starting at offset " + offset);
+ }
+
+ return headerInvalidNumber;
+}
+
+class StreamingReader {
+ pendingBuffer: Buffer;
+
+ constructor() {
+ this.pendingBuffer = Buffer.alloc(0);
+ }
+
+ *onMessage(chunk: Buffer) {
+ let buffer: Buffer;
+ if (this.pendingBuffer.length > 0) {
+ this.pendingBuffer = buffer = Buffer.concat([this.pendingBuffer, chunk]);
+ } else {
+ this.pendingBuffer = buffer = chunk;
+ }
+
+ currentMessageLength = 0;
+
+ for (
+ let offset = extractMessageLengthAndOffsetFromBytes(buffer, 0);
+ buffer.length > 0 && offset !== headerInvalidNumber;
+ currentMessageLength = 0, offset = extractMessageLengthAndOffsetFromBytes(buffer, 0)
+ ) {
+ const messageLength = currentMessageLength;
+ const start = offset;
+ const end = start + messageLength;
+ offset = end;
+ const messageChunk = buffer.slice(start, end);
+ this.pendingBuffer = buffer = buffer.slice(offset);
+ yield messageChunk.toString();
+ }
+ }
+}
+
+export class JSCClient {
+ #options: JSCClientOptions;
+ #requestId: number;
+ #pendingMessages: Buffer[];
+ #pendingRequests: Map<number, (result: unknown) => void>;
+ #socket: Socket;
+ #ready?: Promise<void>;
+ #reader = new StreamingReader();
+ signal?: AbortSignal;
+
+ constructor(options: JSCClientOptions) {
+ this.#options = options;
+ this.#socket = undefined;
+ this.#requestId = 1;
+
+ this.#pendingMessages = [];
+ this.#pendingRequests = new Map();
+ }
+
+ get ready(): Promise<void> {
+ if (!this.#ready) {
+ this.#ready = this.#connect();
+ }
+ return this.#ready;
+ }
+
+ #connect(): Promise<void> {
+ const { url, retry, onError, onResponse, onEvent, onClose } = this.#options;
+ let [host, port] = typeof url === "string" ? url.split(":") : [url.hostname, url.port];
+ if (port == null) {
+ if (host == null) {
+ host = "localhost";
+ port = "9229";
+ } else {
+ port = "9229";
+ }
+ }
+
+ if (host == null) {
+ host = "localhost";
+ }
+ var resolve,
+ reject,
+ promise = new Promise<void>((r1, r2) => {
+ resolve = r1;
+ reject = r2;
+ }),
+ socket: Socket;
+ let didConnect = false;
+
+ this.#socket = socket = createConnection(
+ {
+ host,
+ port: Number(port),
+ },
+ () => {
+ for (const message of this.#pendingMessages) {
+ this.#send(message);
+ }
+ this.#pendingMessages.length = 0;
+ didConnect = true;
+ resolve();
+ },
+ )
+ .once("error", e => {
+ const error = new Error(`Socket error: ${e?.message || e}`);
+ reject(error);
+ })
+ .on("data", buffer => {
+ for (const message of this.#reader.onMessage(buffer)) {
+ let received: JSC.Event | JSC.Response;
+ try {
+ received = JSON.parse(message);
+ } catch {
+ const error = new Error(`Invalid WebSocket data: ${inspect(message)}`);
+ onError?.(error);
+ return;
+ }
+ console.log({ received });
+ if ("id" in received) {
+ onResponse?.(received);
+ if ("error" in received) {
+ const { message, code = "?" } = received.error;
+ const error = new Error(`${message} [code: ${code}]`);
+ error.code = code;
+ onError?.(error);
+ this.#pendingRequests.get(received.id)?.(error);
+ } else {
+ this.#pendingRequests.get(received.id)?.(received.result);
+ }
+ } else {
+ onEvent?.(received);
+ }
+ }
+ })
+ .on("close", hadError => {
+ if (didConnect) {
+ onClose?.(hadError ? 1 : 0, "Socket closed");
+ }
+ });
+
+ return promise;
+ }
+
+ #send(message: any): void {
+ const socket = this.#socket;
+ const framed = writeJSONMessageToBuffer(message);
+ if (socket && !socket.connecting) {
+ socket.write(framed);
+ } else {
+ this.#pendingMessages.push(framed);
+ }
+ }
+
+ async fetch<T extends keyof JSC.RequestMap>(
+ method: T,
+ params?: JSC.Request<T>["params"],
+ ): Promise<JSC.ResponseMap[T]> {
+ const request: JSC.Request<T> = {
+ id: this.#requestId++,
+ method,
+ params,
+ };
+ this.#options.onRequest?.(request);
+ return new Promise((resolve, reject) => {
+ const done = (result: Error | JSC.ResponseMap[T]) => {
+ this.#pendingRequests.delete(request.id);
+ if (result instanceof Error) {
+ reject(result);
+ } else {
+ resolve(result);
+ }
+ };
+ this.#pendingRequests.set(request.id, done);
+ this.#send(request);
+ });
+ }
+
+ close(): void {
+ if (this.#socket) this.#socket.end();
+ }
+}