diff options
author | 2023-09-05 16:52:57 -0800 | |
---|---|---|
committer | 2023-09-05 17:52:57 -0700 | |
commit | 1bd5b245b8a55353e60a2decad507ef8014be044 (patch) | |
tree | 1a5cd5bcc7d7758bbfd154cf49470c1b0f3dc1bb /src/js | |
parent | acfd028e8f859a0e8139b7adab5d319e326c2373 (diff) | |
download | bun-1bd5b245b8a55353e60a2decad507ef8014be044.tar.gz bun-1bd5b245b8a55353e60a2decad507ef8014be044.tar.zst bun-1bd5b245b8a55353e60a2decad507ef8014be044.zip |
Align `process.nextTick` execution order with Node (#4409)
* Align `process.nextTick` execution order with Node
* some tests
* formatting
* fixups
* fix the test failures
* simplify the logic here
* push it up
---------
Co-authored-by: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com>
Co-authored-by: dave caruso <me@paperdave.net>
Diffstat (limited to 'src/js')
-rw-r--r-- | src/js/builtins/ProcessObjectInternals.ts | 202 | ||||
-rw-r--r-- | src/js/out/WebCoreJSBuiltins.cpp | 8 | ||||
-rw-r--r-- | src/js/out/WebCoreJSBuiltins.h | 11 |
3 files changed, 221 insertions, 0 deletions
diff --git a/src/js/builtins/ProcessObjectInternals.ts b/src/js/builtins/ProcessObjectInternals.ts index e6c04c90f..2bb8648df 100644 --- a/src/js/builtins/ProcessObjectInternals.ts +++ b/src/js/builtins/ProcessObjectInternals.ts @@ -1,4 +1,5 @@ /* + * Copyright Joyent, Inc. and other Node contributors. * Copyright 2023 Codeblog Corp. All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -199,3 +200,204 @@ export function getStdinStream(fd) { return stream; } + +export function initializeNextTickQueue(process, nextTickQueue, drainMicrotasksFn, reportUncaughtExceptionFn) { + var queue; + var process; + var nextTickQueue = nextTickQueue; + var drainMicrotasks = drainMicrotasksFn; + var reportUncaughtException = reportUncaughtExceptionFn; + + function validateFunction(cb) { + if (typeof cb !== "function") { + const err = new TypeError(`The "callback" argument must be of type "function". Received type ${typeof cb}`); + err.code = "ERR_INVALID_ARG_TYPE"; + throw err; + } + } + + var setup; + setup = () => { + queue = (function createQueue() { + // Currently optimal queue size, tested on V8 6.0 - 6.6. Must be power of two. + const kSize = 2048; + const kMask = kSize - 1; + + // The FixedQueue is implemented as a singly-linked list of fixed-size + // circular buffers. It looks something like this: + // + // head tail + // | | + // v v + // +-----------+ <-----\ +-----------+ <------\ +-----------+ + // | [null] | \----- | next | \------- | next | + // +-----------+ +-----------+ +-----------+ + // | item | <-- bottom | item | <-- bottom | [empty] | + // | item | | item | | [empty] | + // | item | | item | | [empty] | + // | item | | item | | [empty] | + // | item | | item | bottom --> | item | + // | item | | item | | item | + // | ... | | ... | | ... | + // | item | | item | | item | + // | item | | item | | item | + // | [empty] | <-- top | item | | item | + // | [empty] | | item | | item | + // | [empty] | | [empty] | <-- top top --> | [empty] | + // +-----------+ +-----------+ +-----------+ + // + // Or, if there is only one circular buffer, it looks something + // like either of these: + // + // head tail head tail + // | | | | + // v v v v + // +-----------+ +-----------+ + // | [null] | | [null] | + // +-----------+ +-----------+ + // | [empty] | | item | + // | [empty] | | item | + // | item | <-- bottom top --> | [empty] | + // | item | | [empty] | + // | [empty] | <-- top bottom --> | item | + // | [empty] | | item | + // +-----------+ +-----------+ + // + // Adding a value means moving `top` forward by one, removing means + // moving `bottom` forward by one. After reaching the end, the queue + // wraps around. + // + // When `top === bottom` the current queue is empty and when + // `top + 1 === bottom` it's full. This wastes a single space of storage + // but allows much quicker checks. + + class FixedCircularBuffer { + constructor() { + this.bottom = 0; + this.top = 0; + this.list = $newArrayWithSize(kSize); + this.next = null; + } + + isEmpty() { + return this.top === this.bottom; + } + + isFull() { + return ((this.top + 1) & kMask) === this.bottom; + } + + push(data) { + this.list[this.top] = data; + this.top = (this.top + 1) & kMask; + } + + shift() { + var { list, bottom } = this; + const nextItem = list[bottom]; + if (nextItem === undefined) return null; + list[bottom] = undefined; + this.bottom = (bottom + 1) & kMask; + return nextItem; + } + } + + class FixedQueue { + constructor() { + this.head = this.tail = new FixedCircularBuffer(); + } + + isEmpty() { + return this.head.isEmpty(); + } + + push(data) { + if (this.head.isFull()) { + // Head is full: Creates a new queue, sets the old queue's `.next` to it, + // and sets it as the new main queue. + this.head = this.head.next = new FixedCircularBuffer(); + } + this.head.push(data); + } + + shift() { + const tail = this.tail; + const next = tail.shift(); + if (tail.isEmpty() && tail.next !== null) { + // If there is another queue, it forms the new tail. + this.tail = tail.next; + tail.next = null; + } + return next; + } + } + + return new FixedQueue(); + })(); + + function processTicksAndRejections() { + var tock; + do { + while ((tock = queue.shift()) !== null) { + var callback = tock.callback; + var args = tock.args; + var frame = tock.frame; + var restore = $getInternalField($asyncContext, 0); + $putInternalField($asyncContext, 0, frame); + try { + if (args === undefined) { + callback(); + } else { + switch (args.length) { + case 1: + callback(args[0]); + break; + case 2: + callback(args[0], args[1]); + break; + case 3: + callback(args[0], args[1], args[2]); + break; + case 4: + callback(args[0], args[1], args[2], args[3]); + break; + default: + callback(...args); + break; + } + } + } catch (e) { + reportUncaughtException(e); + } finally { + $putInternalField($asyncContext, 0, restore); + } + } + + drainMicrotasks(); + } while (!queue.isEmpty()); + } + + $putInternalField(nextTickQueue, 0, 0); + $putInternalField(nextTickQueue, 1, queue); + $putInternalField(nextTickQueue, 2, processTicksAndRejections); + setup = undefined; + }; + + function nextTick(cb, args) { + validateFunction(cb); + if (setup) { + setup(); + process = globalThis.process; + } + if (process._exiting) return; + + queue.push({ + callback: cb, + args: $argumentCount() > 1 ? Array.prototype.slice.$call(arguments, 1) : undefined, + frame: $getInternalField($asyncContext, 0), + }); + $putInternalField(nextTickQueue, 0, 1); + } + + return nextTick; +} diff --git a/src/js/out/WebCoreJSBuiltins.cpp b/src/js/out/WebCoreJSBuiltins.cpp index 1003fd522..d767a3c39 100644 --- a/src/js/out/WebCoreJSBuiltins.cpp +++ b/src/js/out/WebCoreJSBuiltins.cpp @@ -658,6 +658,14 @@ const int s_processObjectInternalsGetStdinStreamCodeLength = 1386; static const JSC::Intrinsic s_processObjectInternalsGetStdinStreamCodeIntrinsic = JSC::NoIntrinsic; const char* const s_processObjectInternalsGetStdinStreamCode = "(function (fd){\"use strict\";var reader,readerRef;function ref(){reader\?\?=@Bun.stdin.stream().getReader(),readerRef\?\?=setInterval(()=>{},1<<30)}function unref(){if(readerRef)clearInterval(readerRef),readerRef=@undefined;if(reader)reader.cancel(),reader=@undefined}const stream=new((@getInternalField(@internalModuleRegistry,44))||(@createInternalModuleById(44))).ReadStream(fd),originalOn=stream.on;stream.on=function(event,listener){if(event===\"readable\")ref();return originalOn.call(this,event,listener)},stream.fd=fd;const originalPause=stream.pause;stream.pause=function(){return unref(),originalPause.call(this)};const originalResume=stream.resume;stream.resume=function(){return ref(),originalResume.call(this)};async function internalRead(stream2){try{var done,value;const read=reader\?.readMany();if(@isPromise(read))({done,value}=await read);else({done,value}=read);if(!done){stream2.push(value[0]);const length=value.length;for(let i=1;i<length;i++)stream2.push(value[i])}else stream2.emit(\"end\"),stream2.pause()}catch(err){stream2.destroy(err)}}return stream._read=function(size){internalRead(this)},stream.on(\"resume\",()=>{ref(),stream._undestroy()}),stream._readableState.reading=!1,stream.on(\"pause\",()=>{process.nextTick(()=>{if(!stream.readableFlowing)stream._readableState.reading=!1})}),stream.on(\"close\",()=>{process.nextTick(()=>{stream.destroy(),unref()})}),stream})\n"; +// initializeNextTickQueue +const JSC::ConstructAbility s_processObjectInternalsInitializeNextTickQueueCodeConstructAbility = JSC::ConstructAbility::CannotConstruct; +const JSC::ConstructorKind s_processObjectInternalsInitializeNextTickQueueCodeConstructorKind = JSC::ConstructorKind::None; +const JSC::ImplementationVisibility s_processObjectInternalsInitializeNextTickQueueCodeImplementationVisibility = JSC::ImplementationVisibility::Public; +const int s_processObjectInternalsInitializeNextTickQueueCodeLength = 2336; +static const JSC::Intrinsic s_processObjectInternalsInitializeNextTickQueueCodeIntrinsic = JSC::NoIntrinsic; +const char* const s_processObjectInternalsInitializeNextTickQueueCode = "(function (process,nextTickQueue,drainMicrotasksFn,reportUncaughtExceptionFn){\"use strict\";var queue,process,nextTickQueue=nextTickQueue,drainMicrotasks=drainMicrotasksFn,reportUncaughtException=reportUncaughtExceptionFn;function validateFunction(cb){if(typeof cb!==\"function\"){const err=@makeTypeError(`The \"callback\" argument must be of type \"function\". Received type ${typeof cb}`);throw err.code=\"ERR_INVALID_ARG_TYPE\",err}}var setup=()=>{queue=function createQueue(){class FixedCircularBuffer{constructor(){this.bottom=0,this.top=0,this.list=@newArrayWithSize(2048),this.next=null}isEmpty(){return this.top===this.bottom}isFull(){return(this.top+1&2047)===this.bottom}push(data){this.list[this.top]=data,this.top=this.top+1&2047}shift(){var{list,bottom}=this;const nextItem=list[bottom];if(nextItem===@undefined)return null;return list[bottom]=@undefined,this.bottom=bottom+1&2047,nextItem}}class FixedQueue{constructor(){this.head=this.tail=new FixedCircularBuffer}isEmpty(){return this.head.isEmpty()}push(data){if(this.head.isFull())this.head=this.head.next=new FixedCircularBuffer;this.head.push(data)}shift(){const tail=this.tail,next=tail.shift();if(tail.isEmpty()&&tail.next!==null)this.tail=tail.next,tail.next=null;return next}}return new FixedQueue}();function processTicksAndRejections(){var tock;do{while((tock=queue.shift())!==null){var{callback,args,frame}=tock,restore=@getInternalField(@asyncContext,0);@putInternalField(@asyncContext,0,frame);try{if(args===@undefined)callback();else switch(args.length){case 1:callback(args[0]);break;case 2:callback(args[0],args[1]);break;case 3:callback(args[0],args[1],args[2]);break;case 4:callback(args[0],args[1],args[2],args[3]);break;default:callback(...args);break}}catch(e){reportUncaughtException(e)}finally{@putInternalField(@asyncContext,0,restore)}}drainMicrotasks()}while(!queue.isEmpty())}@putInternalField(nextTickQueue,0,0),@putInternalField(nextTickQueue,1,queue),@putInternalField(nextTickQueue,2,processTicksAndRejections),setup=@undefined};function nextTick(cb,args){if(validateFunction(cb),setup)setup(),process=globalThis.process;if(process._exiting)return;queue.push({callback:cb,args:@argumentCount()>1\?@Array.prototype.slice.@call(arguments,1):@undefined,frame:@getInternalField(@asyncContext,0)}),@putInternalField(nextTickQueue,0,1)}return nextTick})\n"; + #define DEFINE_BUILTIN_GENERATOR(codeName, functionName, overriddenName, argumentCount) \ JSC::FunctionExecutable* codeName##Generator(JSC::VM& vm) \ {\ diff --git a/src/js/out/WebCoreJSBuiltins.h b/src/js/out/WebCoreJSBuiltins.h index cf28fa82a..4fc91dbd9 100644 --- a/src/js/out/WebCoreJSBuiltins.h +++ b/src/js/out/WebCoreJSBuiltins.h @@ -1179,20 +1179,31 @@ extern const JSC::ConstructAbility s_processObjectInternalsGetStdinStreamCodeCon extern const JSC::ConstructorKind s_processObjectInternalsGetStdinStreamCodeConstructorKind; extern const JSC::ImplementationVisibility s_processObjectInternalsGetStdinStreamCodeImplementationVisibility; +// initializeNextTickQueue +#define WEBCORE_BUILTIN_PROCESSOBJECTINTERNALS_INITIALIZENEXTTICKQUEUE 1 +extern const char* const s_processObjectInternalsInitializeNextTickQueueCode; +extern const int s_processObjectInternalsInitializeNextTickQueueCodeLength; +extern const JSC::ConstructAbility s_processObjectInternalsInitializeNextTickQueueCodeConstructAbility; +extern const JSC::ConstructorKind s_processObjectInternalsInitializeNextTickQueueCodeConstructorKind; +extern const JSC::ImplementationVisibility s_processObjectInternalsInitializeNextTickQueueCodeImplementationVisibility; + #define WEBCORE_FOREACH_PROCESSOBJECTINTERNALS_BUILTIN_DATA(macro) \ macro(binding, processObjectInternalsBinding, 1) \ macro(getStdioWriteStream, processObjectInternalsGetStdioWriteStream, 1) \ macro(getStdinStream, processObjectInternalsGetStdinStream, 1) \ + macro(initializeNextTickQueue, processObjectInternalsInitializeNextTickQueue, 4) \ #define WEBCORE_FOREACH_PROCESSOBJECTINTERNALS_BUILTIN_CODE(macro) \ macro(processObjectInternalsBindingCode, binding, ASCIILiteral(), s_processObjectInternalsBindingCodeLength) \ macro(processObjectInternalsGetStdioWriteStreamCode, getStdioWriteStream, ASCIILiteral(), s_processObjectInternalsGetStdioWriteStreamCodeLength) \ macro(processObjectInternalsGetStdinStreamCode, getStdinStream, ASCIILiteral(), s_processObjectInternalsGetStdinStreamCodeLength) \ + macro(processObjectInternalsInitializeNextTickQueueCode, initializeNextTickQueue, ASCIILiteral(), s_processObjectInternalsInitializeNextTickQueueCodeLength) \ #define WEBCORE_FOREACH_PROCESSOBJECTINTERNALS_BUILTIN_FUNCTION_NAME(macro) \ macro(binding) \ macro(getStdioWriteStream) \ macro(getStdinStream) \ + macro(initializeNextTickQueue) \ #define DECLARE_BUILTIN_GENERATOR(codeName, functionName, overriddenName, argumentCount) \ JSC::FunctionExecutable* codeName##Generator(JSC::VM&); |