aboutsummaryrefslogtreecommitdiff
path: root/src/js/node/http2.ts
diff options
context:
space:
mode:
Diffstat (limited to 'src/js/node/http2.ts')
-rw-r--r--src/js/node/http2.ts484
1 files changed, 482 insertions, 2 deletions
diff --git a/src/js/node/http2.ts b/src/js/node/http2.ts
index f8d5058c2..01003f34b 100644
--- a/src/js/node/http2.ts
+++ b/src/js/node/http2.ts
@@ -2,8 +2,485 @@
// This is a stub! None of this is actually implemented yet.
const { hideFromStack, throwNotImplemented } = require("$shared");
-function connect() {
- throwNotImplemented("node:http2 connect", 887);
+const tls = require("node:tls");
+const net = require("node:net");
+type Socket = typeof net.Socket;
+type TLSSocket = typeof tls.TLSSocket;
+const EventEmitter = require("node:events");
+const { Duplex } = require("node:stream");
+const { H2FrameParser } = $lazy("internal/http2");
+
+
+type NativeHttp2HeaderValue = {
+ name: string;
+ value: string;
+ neverIndex?: boolean;
+};
+
+type Settings = {
+ headerTableSize: number;
+ enablePush: number;
+ maxConcurrentStreams: number;
+ initialWindowSize: number;
+ maxFrameSize: number;
+ maxHeaderListSize: number;
+};
+
+
+class Http2Session extends EventEmitter {
+
+}
+
+const http2 = {
+ sensitiveHeaders: Symbol("bun.http2.sensitiveHeaders"),
+};
+
+class ClientStream extends EventEmitter {
+
+}
+class Http2Stream extends EventEmitter {
+
+}
+
+const bunHTTP2Write = Symbol.for("::bunhttp2write::");
+const bunHTTP2StreamResponded = Symbol.for("::bunhttp2hasResponded::");
+const bunHTTP2StreamReadQueue = Symbol.for("::bunhttp2ReadQueue::");
+
+function reduceToCompatibleHeaders(obj: any, currentValue: any) {
+ let { name, value } = currentValue;
+ if(name === ":status") {
+ value = parseInt(value, 10);
+ }
+ const lastValue = obj[name];
+ if(typeof lastValue === "string" || typeof lastValue === "number") {
+ obj[name] = [obj[name], value];
+ } else if(Array.isArray(lastValue)) {
+ obj[name].push(value);
+ } else {
+ obj[name] = value;
+ }
+ return obj;
+}
+
+class ClientHttp2Stream extends Duplex {
+ #id: number;
+ #session: ClientHttp2Session | null = null;
+ [bunHTTP2StreamReadQueue]: Array<Buffer> = $createFIFO();
+ [bunHTTP2StreamResponded]: boolean = false;
+ constructor(streamId, session) {
+ super();
+ this.#id = streamId;
+ this.#session = session;
+ }
+
+ get session() {
+ return this.#session;
+ }
+
+ respond() {
+ // not implemented yet
+ }
+
+ _destroy(err, callback) {
+
+ callback(err);
+ }
+
+ _final(callback) {
+
+ callback();
+ }
+
+ _read(size) {
+ const queue = this[bunHTTP2StreamReadQueue];
+ let chunk;
+ while ((chunk = queue.peek())) {
+ if (!this.push(chunk)) return;
+ queue.shift();
+ }
+ }
+
+ _write(chunk, encoding, callback) {
+ if (typeof chunk == "string" && encoding !== "ascii") chunk = Buffer.from(chunk, encoding);
+ const session = this.#session;
+ if(session) {
+ session[bunHTTP2Write](this.#id, chunk);
+ if(typeof callback == "function") {
+ callback();
+ }
+ }
+ }
+}
+
+class ClientHttp2Session extends Http2Session{
+ #closed: boolean = false;
+ #queue: Array<Buffer> = [];
+ #connecions: number = 0;
+ #socket: TLSSocket | Socket | null;
+ #parser: typeof H2FrameParser | null;
+ #url: URL;
+ #originSet = new Set<string>();
+ #streams = new Map<number, any>();
+ #isServer: boolean = false;
+ #localSettings: Settings | null = {
+ headerTableSize: 4096,
+ enablePush: 1,
+ maxConcurrentStreams: 100,
+ initialWindowSize: 65535,
+ maxFrameSize: 16384,
+ maxHeaderListSize: 65535,
+ };
+ #pendingSettingsAck: boolean = true;
+ #remoteSettings: Settings | null = null;
+
+ static #Handlers = {
+ binaryType: "buffer",
+ streamStart(self: ClientHttp2Session, streamId: number){
+ self.#connecions++;
+ },
+ streamError(self: ClientHttp2Session, streamId: number, error: number){
+ // var stream = self.#streams.get(streamId);
+ // if(stream) {
+ // stream.state = 2;
+ // stream.error = error;
+ // }
+ },
+ streamEnd(self: ClientHttp2Session, streamId: number){
+ self.#connecions--;
+ var stream = self.#streams.get(streamId);
+ if(stream) {
+ stream.emit("end");
+ }
+ if(self.#connecions === 0 && self.#closed) {
+ self.#socket?.end();
+ self.#parser?.detach();
+ self.#parser = null;
+ }
+ },
+ streamData(self: ClientHttp2Session, streamId: number, data: Buffer){
+ var stream = self.#streams.get(streamId);
+ if(stream) {
+ const queue = stream[bunHTTP2StreamReadQueue];
+
+ if (queue.isEmpty()) {
+ if (stream.push(data)) return;
+ }
+ queue.push(data);
+ }
+ },
+ streamHeaders(self: ClientHttp2Session, streamId: number, headers: Array<NativeHttp2HeaderValue>, flags: number){
+ var stream = self.#streams.get(streamId);
+ if(stream) {
+ if(stream[bunHTTP2StreamResponded]) {
+ stream.emit("trailers", headers.reduce(reduceToCompatibleHeaders), flags);
+ } else {
+ stream[bunHTTP2StreamResponded] = true;
+ stream.emit("response", headers.reduce(reduceToCompatibleHeaders), flags);
+ }
+ }
+ },
+ localSettings(self: ClientHttp2Session, settings: Settings){
+ self.emit("localSettings", settings);
+ self.#localSettings = settings;
+ self.#pendingSettingsAck = false;
+ },
+ remoteSettings(self: ClientHttp2Session, settings: Settings){
+ self.emit("remoteSettings", settings);
+ self.#remoteSettings = settings;
+ },
+ ping(self: ClientHttp2Session, ping: Buffer){
+ self.emit("ping", ping);
+ },
+ error(self: ClientHttp2Session, errorCode: number, lastStreamId: number, opaqueData: Buffer){
+ self.emit("error", new Error("ERROR_HTTP2"));
+ self.#socket?.end();
+ self.#parser?.detach();
+ self.#parser = null;
+ },
+ goaway(self: ClientHttp2Session, errorCode: number, lastStreamId: number, opaqueData: Buffer){
+ self.emit("goaway", errorCode, lastStreamId, opaqueData);
+ self.#socket?.end();
+ self.#parser?.detach();
+ self.#parser = null;
+ },
+ end(self: ClientHttp2Session, errorCode: number, lastStreamId: number, opaqueData: Buffer){
+ self.#socket?.end();
+ self.#parser.detach();
+ self.#parser = null;
+ },
+ write(self: ClientHttp2Session, buffer: Buffer){
+ const socket = self.#socket;
+ if(self.#closed) {
+ //queue
+ self.#queue.push(buffer);
+ } else {
+ // redirect writes to socket
+ socket.write(buffer);
+ }
+ }
+ }
+
+ #onRead(data: Buffer){
+ this.#parser?.read(data);
+ }
+ get originSet() {
+ if(this.encrypted) {
+ return Array.from(this.#originSet);
+ }
+ }
+ get alpnProtocol() {
+
+ const socket = this.#socket;
+ if(!socket) return;
+ return (socket as TLSSocket).alpnProtocol;
+ }
+ #onConnect(){
+ this.#closed = false;
+ const socket = this.#socket as TLSSocket;
+ if (socket.alpnProtocol !== "h2") {
+ socket.end();
+ this.emit("error", new Error("h2 is not supported"));
+ }
+ this.#originSet.add(socket.remoteAddress as string);
+ socket.on("data", this.#onRead.bind(this));
+ // connected!
+ this.emit("connect", this, socket);
+
+ // redirect the queued buffers
+ const queue = this.#queue;
+ while(queue.length) {
+ socket.write(queue.shift());
+ }
+ }
+
+ #onClose(){
+ this.#parser?.detach();
+ this.#parser = null;
+ this.emit("close");
+ this.#socket = null;
+ }
+ #onError(error: Error){
+ this.#parser?.detach();
+ this.#parser = null;
+ this.emit("error", error);
+ }
+ #onTimeout(){
+ this.#parser?.detach();
+ this.#parser = null;
+ this.emit("timeout");
+ }
+
+ get connected() {
+ return this.#socket?.connecting === false;
+ }
+ get destroyed() {
+ return this.#socket === null;
+ }
+ get encrypted() {
+ if(!this.#socket) return;
+
+ return (this.#socket instanceof TLSSocket);
+ }
+ get closed() {
+ return this.#closed;
+ }
+
+ get remoteSettings() {
+ return this.#remoteSettings;
+ }
+
+ get localSettings() {
+ return this.#localSettings;
+ }
+
+ get pendingSettingsAck() {
+ return this.#pendingSettingsAck;
+ }
+
+ get type(){
+ if(this.#isServer) return 0;
+ return 1;
+ }
+ unref() {
+ return this.#socket?.unref();
+ }
+ ref() {
+ return this.#socket?.ref();
+ }
+ setTimeout(msecs, callback) {
+ return this.#socket?.setTimeout(msecs, callback);
+ }
+ ping(payload, callback) {
+ if(typeof callback === "function") {
+ this.once("ping", callback);
+ }
+ payload = payload || Buffer.alloc(8);
+ if(payload.byteLength !== 8) {
+ throw new Error("ERR_HTTP2_PING_PAYLOAD_SIZE");
+ }
+ this.#parser?.ping(payload);
+ return this.#parser && this.#socket ? true : false;
+ }
+ goaway(errorCode, lastStreamId, opaqueData) {
+ return this.#parser?.goaway(errorCode, lastStreamId, opaqueData);
+ }
+ setLocalWindowSize(windowSize) {
+ return this.#parser?.setLocalWindowSize(windowSize);
+ }
+ get socket() {
+ // TODO: Proxy socket
+ return this.#socket;
+ }
+ get state() {
+ return this.#parser?.getCurrentState();
+ }
+
+ settings(settings: Settings, callback) {
+ this.#pendingSettingsAck = true;
+ this.#parser?.settings(settings);
+ if(callback) {
+ const start = Date.now();
+ this.once("localSettings", ()=> {
+ callback(null, this.#localSettings, Date.now() - start);
+ });
+ }
+ }
+
+ constructor(url: string | URL, options?: Settings) {
+ super();
+
+ if(typeof url === "string") {
+ url = new URL(url);
+ }
+ if(!(url instanceof URL)) {
+ throw new Error("ERR_HTTP2: Invalid URL");
+ }
+ this.#isServer = true;
+ this.#url = url;
+ this.#socket = tls.connect({
+ host: url.hostname,
+ port: url.port ? parseInt(url.port, 10) : (url.protocol === "https:" ? 443 : 80),
+ ALPNProtocols: ["h2", "http/1.1"],
+ }, this.#onConnect.bind(this));
+ this.#parser = new H2FrameParser({
+ context: this,
+ settings: options,
+ handlers: ClientHttp2Session.#Handlers
+ });
+ this.#socket.on("close", this.#onClose.bind(this));
+ this.#socket.on("error", this.#onError.bind(this));
+ this.#socket.on("timeout", this.#onTimeout.bind(this));
+ }
+
+ request(headers: any, options?: any) {
+ if(!(headers instanceof Object)) {
+ throw new Error("ERROR_HTTP2: Invalid headers");
+ }
+ const flat_headers: Array<NativeHttp2HeaderValue> = [];
+ let has_scheme = false;
+ let has_authority = false;
+ let method: string | null = null;
+
+ Object.keys(headers).forEach(key => {
+ //@ts-ignore
+ if (key === http2.sensitiveHeaders) {
+ const name = headers[0];
+ const values = headers[key];
+ if(Array.isArray(values) === false) {
+ throw new Error("ERROR_HTTP2: Invalid sensitiveHeaders");
+ }
+ switch(name) {
+ case ":scheme":
+ has_scheme = true;
+ break;
+ case ":authority":
+ has_authority = true;
+ break;
+ case ":method":
+ method = values[1]?.toString();
+ break;
+ };
+
+ if(name === ":scheme") {
+ has_scheme = true;
+ } else if(name === ":authority") {
+ has_authority = true;
+ }
+ for(let i = 1; i < values.length; i++){
+ flat_headers.push({ name: name, value: values[i]?.toString(), neverIndex: true });
+ }
+ } else {
+ switch(key) {
+ case ":scheme":
+ has_scheme = true;
+ break;
+ case ":authority":
+ has_authority = true;
+ break;
+ case ":method":
+ method = headers[key]?.toString() || "GET";
+ break;
+ }
+
+ const value = headers[key];
+ if(Array.isArray(value)) {
+ for(let i = 0; i < value.length; i++){
+ flat_headers.push({ name: key, value: value[i]?.toString(), neverIndex: true });
+ }
+ } else {
+ flat_headers.push({ name: key, value: value?.toString() });
+ }
+ }
+ });
+
+ const url = this.#url;
+ if(!has_scheme) {
+ let protocol: string = options?.protocol || "https";
+ switch(url.protocol) {
+ case "https:":
+ protocol = "https";
+ break
+ case "http:":
+ protocol = "http";
+ break;
+ };
+
+ flat_headers.push({ name: ":scheme", value: protocol });
+ }
+ if(!has_authority) {
+ flat_headers.push({ name: ":authority", value: url.hostname });
+ }
+ if(!method) {
+ method = "GET";
+ flat_headers.push({ name: ":method", value: method });
+ }
+
+ let stream_id: number;
+ if(arguments.length === 1) {
+ stream_id = this.#parser.request(flat_headers);
+ } else {
+ stream_id = this.#parser.request(flat_headers, options);
+ }
+ const req = new ClientHttp2Stream(stream_id, this);
+ this.#streams.set(stream_id, req);
+ return req;
+ }
+ static connect(url: string | URL, options?: Settings) {
+ if(options) {
+ return new ClientHttp2Session(url, options);
+ }
+ return new ClientHttp2Session(url);
+ }
+ [bunHTTP2Write](streamId: number, chunk: Buffer) {
+ this.#parser?.writeStream(streamId, chunk);
+ }
+}
+
+function connect(url: string | URL, options?: Settings) {
+ if(options) {
+ return ClientHttp2Session.connect(url, options);
+ }
+ return ClientHttp2Session.connect(url);
}
const constants = {
NGHTTP2_ERR_FRAME_SIZE_ERROR: -522,
@@ -293,6 +770,7 @@ export default {
Http2ServerRequest,
Http2ServerResponse,
connect,
+ ClientHttp2Session,
};
hideFromStack([
@@ -304,4 +782,6 @@ hideFromStack([
getDefaultSettings,
getPackedSettings,
getUnpackedSettings,
+ ClientHttp2Session,
+ ClientHttp2Stream,
]);