diff options
Diffstat (limited to 'src/js/node/http2.ts')
-rw-r--r-- | src/js/node/http2.ts | 484 |
1 files changed, 482 insertions, 2 deletions
diff --git a/src/js/node/http2.ts b/src/js/node/http2.ts index f8d5058c2..01003f34b 100644 --- a/src/js/node/http2.ts +++ b/src/js/node/http2.ts @@ -2,8 +2,485 @@ // This is a stub! None of this is actually implemented yet. const { hideFromStack, throwNotImplemented } = require("$shared"); -function connect() { - throwNotImplemented("node:http2 connect", 887); +const tls = require("node:tls"); +const net = require("node:net"); +type Socket = typeof net.Socket; +type TLSSocket = typeof tls.TLSSocket; +const EventEmitter = require("node:events"); +const { Duplex } = require("node:stream"); +const { H2FrameParser } = $lazy("internal/http2"); + + +type NativeHttp2HeaderValue = { + name: string; + value: string; + neverIndex?: boolean; +}; + +type Settings = { + headerTableSize: number; + enablePush: number; + maxConcurrentStreams: number; + initialWindowSize: number; + maxFrameSize: number; + maxHeaderListSize: number; +}; + + +class Http2Session extends EventEmitter { + +} + +const http2 = { + sensitiveHeaders: Symbol("bun.http2.sensitiveHeaders"), +}; + +class ClientStream extends EventEmitter { + +} +class Http2Stream extends EventEmitter { + +} + +const bunHTTP2Write = Symbol.for("::bunhttp2write::"); +const bunHTTP2StreamResponded = Symbol.for("::bunhttp2hasResponded::"); +const bunHTTP2StreamReadQueue = Symbol.for("::bunhttp2ReadQueue::"); + +function reduceToCompatibleHeaders(obj: any, currentValue: any) { + let { name, value } = currentValue; + if(name === ":status") { + value = parseInt(value, 10); + } + const lastValue = obj[name]; + if(typeof lastValue === "string" || typeof lastValue === "number") { + obj[name] = [obj[name], value]; + } else if(Array.isArray(lastValue)) { + obj[name].push(value); + } else { + obj[name] = value; + } + return obj; +} + +class ClientHttp2Stream extends Duplex { + #id: number; + #session: ClientHttp2Session | null = null; + [bunHTTP2StreamReadQueue]: Array<Buffer> = $createFIFO(); + [bunHTTP2StreamResponded]: boolean = false; + constructor(streamId, session) { + super(); + this.#id = streamId; + this.#session = session; + } + + get session() { + return this.#session; + } + + respond() { + // not implemented yet + } + + _destroy(err, callback) { + + callback(err); + } + + _final(callback) { + + callback(); + } + + _read(size) { + const queue = this[bunHTTP2StreamReadQueue]; + let chunk; + while ((chunk = queue.peek())) { + if (!this.push(chunk)) return; + queue.shift(); + } + } + + _write(chunk, encoding, callback) { + if (typeof chunk == "string" && encoding !== "ascii") chunk = Buffer.from(chunk, encoding); + const session = this.#session; + if(session) { + session[bunHTTP2Write](this.#id, chunk); + if(typeof callback == "function") { + callback(); + } + } + } +} + +class ClientHttp2Session extends Http2Session{ + #closed: boolean = false; + #queue: Array<Buffer> = []; + #connecions: number = 0; + #socket: TLSSocket | Socket | null; + #parser: typeof H2FrameParser | null; + #url: URL; + #originSet = new Set<string>(); + #streams = new Map<number, any>(); + #isServer: boolean = false; + #localSettings: Settings | null = { + headerTableSize: 4096, + enablePush: 1, + maxConcurrentStreams: 100, + initialWindowSize: 65535, + maxFrameSize: 16384, + maxHeaderListSize: 65535, + }; + #pendingSettingsAck: boolean = true; + #remoteSettings: Settings | null = null; + + static #Handlers = { + binaryType: "buffer", + streamStart(self: ClientHttp2Session, streamId: number){ + self.#connecions++; + }, + streamError(self: ClientHttp2Session, streamId: number, error: number){ + // var stream = self.#streams.get(streamId); + // if(stream) { + // stream.state = 2; + // stream.error = error; + // } + }, + streamEnd(self: ClientHttp2Session, streamId: number){ + self.#connecions--; + var stream = self.#streams.get(streamId); + if(stream) { + stream.emit("end"); + } + if(self.#connecions === 0 && self.#closed) { + self.#socket?.end(); + self.#parser?.detach(); + self.#parser = null; + } + }, + streamData(self: ClientHttp2Session, streamId: number, data: Buffer){ + var stream = self.#streams.get(streamId); + if(stream) { + const queue = stream[bunHTTP2StreamReadQueue]; + + if (queue.isEmpty()) { + if (stream.push(data)) return; + } + queue.push(data); + } + }, + streamHeaders(self: ClientHttp2Session, streamId: number, headers: Array<NativeHttp2HeaderValue>, flags: number){ + var stream = self.#streams.get(streamId); + if(stream) { + if(stream[bunHTTP2StreamResponded]) { + stream.emit("trailers", headers.reduce(reduceToCompatibleHeaders), flags); + } else { + stream[bunHTTP2StreamResponded] = true; + stream.emit("response", headers.reduce(reduceToCompatibleHeaders), flags); + } + } + }, + localSettings(self: ClientHttp2Session, settings: Settings){ + self.emit("localSettings", settings); + self.#localSettings = settings; + self.#pendingSettingsAck = false; + }, + remoteSettings(self: ClientHttp2Session, settings: Settings){ + self.emit("remoteSettings", settings); + self.#remoteSettings = settings; + }, + ping(self: ClientHttp2Session, ping: Buffer){ + self.emit("ping", ping); + }, + error(self: ClientHttp2Session, errorCode: number, lastStreamId: number, opaqueData: Buffer){ + self.emit("error", new Error("ERROR_HTTP2")); + self.#socket?.end(); + self.#parser?.detach(); + self.#parser = null; + }, + goaway(self: ClientHttp2Session, errorCode: number, lastStreamId: number, opaqueData: Buffer){ + self.emit("goaway", errorCode, lastStreamId, opaqueData); + self.#socket?.end(); + self.#parser?.detach(); + self.#parser = null; + }, + end(self: ClientHttp2Session, errorCode: number, lastStreamId: number, opaqueData: Buffer){ + self.#socket?.end(); + self.#parser.detach(); + self.#parser = null; + }, + write(self: ClientHttp2Session, buffer: Buffer){ + const socket = self.#socket; + if(self.#closed) { + //queue + self.#queue.push(buffer); + } else { + // redirect writes to socket + socket.write(buffer); + } + } + } + + #onRead(data: Buffer){ + this.#parser?.read(data); + } + get originSet() { + if(this.encrypted) { + return Array.from(this.#originSet); + } + } + get alpnProtocol() { + + const socket = this.#socket; + if(!socket) return; + return (socket as TLSSocket).alpnProtocol; + } + #onConnect(){ + this.#closed = false; + const socket = this.#socket as TLSSocket; + if (socket.alpnProtocol !== "h2") { + socket.end(); + this.emit("error", new Error("h2 is not supported")); + } + this.#originSet.add(socket.remoteAddress as string); + socket.on("data", this.#onRead.bind(this)); + // connected! + this.emit("connect", this, socket); + + // redirect the queued buffers + const queue = this.#queue; + while(queue.length) { + socket.write(queue.shift()); + } + } + + #onClose(){ + this.#parser?.detach(); + this.#parser = null; + this.emit("close"); + this.#socket = null; + } + #onError(error: Error){ + this.#parser?.detach(); + this.#parser = null; + this.emit("error", error); + } + #onTimeout(){ + this.#parser?.detach(); + this.#parser = null; + this.emit("timeout"); + } + + get connected() { + return this.#socket?.connecting === false; + } + get destroyed() { + return this.#socket === null; + } + get encrypted() { + if(!this.#socket) return; + + return (this.#socket instanceof TLSSocket); + } + get closed() { + return this.#closed; + } + + get remoteSettings() { + return this.#remoteSettings; + } + + get localSettings() { + return this.#localSettings; + } + + get pendingSettingsAck() { + return this.#pendingSettingsAck; + } + + get type(){ + if(this.#isServer) return 0; + return 1; + } + unref() { + return this.#socket?.unref(); + } + ref() { + return this.#socket?.ref(); + } + setTimeout(msecs, callback) { + return this.#socket?.setTimeout(msecs, callback); + } + ping(payload, callback) { + if(typeof callback === "function") { + this.once("ping", callback); + } + payload = payload || Buffer.alloc(8); + if(payload.byteLength !== 8) { + throw new Error("ERR_HTTP2_PING_PAYLOAD_SIZE"); + } + this.#parser?.ping(payload); + return this.#parser && this.#socket ? true : false; + } + goaway(errorCode, lastStreamId, opaqueData) { + return this.#parser?.goaway(errorCode, lastStreamId, opaqueData); + } + setLocalWindowSize(windowSize) { + return this.#parser?.setLocalWindowSize(windowSize); + } + get socket() { + // TODO: Proxy socket + return this.#socket; + } + get state() { + return this.#parser?.getCurrentState(); + } + + settings(settings: Settings, callback) { + this.#pendingSettingsAck = true; + this.#parser?.settings(settings); + if(callback) { + const start = Date.now(); + this.once("localSettings", ()=> { + callback(null, this.#localSettings, Date.now() - start); + }); + } + } + + constructor(url: string | URL, options?: Settings) { + super(); + + if(typeof url === "string") { + url = new URL(url); + } + if(!(url instanceof URL)) { + throw new Error("ERR_HTTP2: Invalid URL"); + } + this.#isServer = true; + this.#url = url; + this.#socket = tls.connect({ + host: url.hostname, + port: url.port ? parseInt(url.port, 10) : (url.protocol === "https:" ? 443 : 80), + ALPNProtocols: ["h2", "http/1.1"], + }, this.#onConnect.bind(this)); + this.#parser = new H2FrameParser({ + context: this, + settings: options, + handlers: ClientHttp2Session.#Handlers + }); + this.#socket.on("close", this.#onClose.bind(this)); + this.#socket.on("error", this.#onError.bind(this)); + this.#socket.on("timeout", this.#onTimeout.bind(this)); + } + + request(headers: any, options?: any) { + if(!(headers instanceof Object)) { + throw new Error("ERROR_HTTP2: Invalid headers"); + } + const flat_headers: Array<NativeHttp2HeaderValue> = []; + let has_scheme = false; + let has_authority = false; + let method: string | null = null; + + Object.keys(headers).forEach(key => { + //@ts-ignore + if (key === http2.sensitiveHeaders) { + const name = headers[0]; + const values = headers[key]; + if(Array.isArray(values) === false) { + throw new Error("ERROR_HTTP2: Invalid sensitiveHeaders"); + } + switch(name) { + case ":scheme": + has_scheme = true; + break; + case ":authority": + has_authority = true; + break; + case ":method": + method = values[1]?.toString(); + break; + }; + + if(name === ":scheme") { + has_scheme = true; + } else if(name === ":authority") { + has_authority = true; + } + for(let i = 1; i < values.length; i++){ + flat_headers.push({ name: name, value: values[i]?.toString(), neverIndex: true }); + } + } else { + switch(key) { + case ":scheme": + has_scheme = true; + break; + case ":authority": + has_authority = true; + break; + case ":method": + method = headers[key]?.toString() || "GET"; + break; + } + + const value = headers[key]; + if(Array.isArray(value)) { + for(let i = 0; i < value.length; i++){ + flat_headers.push({ name: key, value: value[i]?.toString(), neverIndex: true }); + } + } else { + flat_headers.push({ name: key, value: value?.toString() }); + } + } + }); + + const url = this.#url; + if(!has_scheme) { + let protocol: string = options?.protocol || "https"; + switch(url.protocol) { + case "https:": + protocol = "https"; + break + case "http:": + protocol = "http"; + break; + }; + + flat_headers.push({ name: ":scheme", value: protocol }); + } + if(!has_authority) { + flat_headers.push({ name: ":authority", value: url.hostname }); + } + if(!method) { + method = "GET"; + flat_headers.push({ name: ":method", value: method }); + } + + let stream_id: number; + if(arguments.length === 1) { + stream_id = this.#parser.request(flat_headers); + } else { + stream_id = this.#parser.request(flat_headers, options); + } + const req = new ClientHttp2Stream(stream_id, this); + this.#streams.set(stream_id, req); + return req; + } + static connect(url: string | URL, options?: Settings) { + if(options) { + return new ClientHttp2Session(url, options); + } + return new ClientHttp2Session(url); + } + [bunHTTP2Write](streamId: number, chunk: Buffer) { + this.#parser?.writeStream(streamId, chunk); + } +} + +function connect(url: string | URL, options?: Settings) { + if(options) { + return ClientHttp2Session.connect(url, options); + } + return ClientHttp2Session.connect(url); } const constants = { NGHTTP2_ERR_FRAME_SIZE_ERROR: -522, @@ -293,6 +770,7 @@ export default { Http2ServerRequest, Http2ServerResponse, connect, + ClientHttp2Session, }; hideFromStack([ @@ -304,4 +782,6 @@ hideFromStack([ getDefaultSettings, getPackedSettings, getUnpackedSettings, + ClientHttp2Session, + ClientHttp2Stream, ]); |