diff options
author | 2023-07-28 16:44:28 -0700 | |
---|---|---|
committer | 2023-07-28 16:44:28 -0700 | |
commit | d614fdfaac13346d71ecf24712abaefe8224687d (patch) | |
tree | 2fce0943c663db984755df123d96d788984cad22 /src/bun.js/bindings/ScriptExecutionContext.cpp | |
parent | 0a4e476a7c08005e242ed48f3f27895e55deacc9 (diff) | |
download | bun-d614fdfaac13346d71ecf24712abaefe8224687d.tar.gz bun-d614fdfaac13346d71ecf24712abaefe8224687d.tar.zst bun-d614fdfaac13346d71ecf24712abaefe8224687d.zip |
`MessageChannel` and `MessagePort` (#3860)
* copy and format
* copy
* copy
* cleanup
* some tests
* spellcheck
* add types
* don't lock getting contextId
* array buffer test
Diffstat (limited to 'src/bun.js/bindings/ScriptExecutionContext.cpp')
-rw-r--r-- | src/bun.js/bindings/ScriptExecutionContext.cpp | 137 |
1 files changed, 137 insertions, 0 deletions
diff --git a/src/bun.js/bindings/ScriptExecutionContext.cpp b/src/bun.js/bindings/ScriptExecutionContext.cpp index 47908b385..d9adbeb98 100644 --- a/src/bun.js/bindings/ScriptExecutionContext.cpp +++ b/src/bun.js/bindings/ScriptExecutionContext.cpp @@ -1,6 +1,7 @@ #include "root.h" #include "headers.h" #include "ScriptExecutionContext.h" +#include "MessagePort.h" #include "webcore/WebSocket.h" #include "libusockets.h" @@ -70,6 +71,23 @@ void ScriptExecutionContext::unrefEventLoop() Bun__eventLoop__incrementRefConcurrently(WebCore::clientData(vm())->bunVM, -1); } +ScriptExecutionContext::~ScriptExecutionContext() +{ + checkConsistency(); + + { + Locker locker { allScriptExecutionContextsMapLock }; + ASSERT_WITH_MESSAGE(!allScriptExecutionContextsMap().contains(m_identifier), "A ScriptExecutionContext subclass instance implementing postTask should have already removed itself from the map"); + } + + auto postMessageCompletionHandlers = WTFMove(m_processMessageWithMessagePortsSoonHandlers); + for (auto& completionHandler : postMessageCompletionHandlers) + completionHandler(); + + while (auto* destructionObserver = m_destructionObservers.takeAny()) + destructionObserver->contextDestroyed(); +} + bool ScriptExecutionContext::postTaskTo(ScriptExecutionContextIdentifier identifier, Function<void(ScriptExecutionContext&)>&& task) { Locker locker { allScriptExecutionContextsMapLock }; @@ -82,6 +100,125 @@ bool ScriptExecutionContext::postTaskTo(ScriptExecutionContextIdentifier identif return true; } +void ScriptExecutionContext::didCreateDestructionObserver(ContextDestructionObserver& observer) +{ + ASSERT(!m_inScriptExecutionContextDestructor); + m_destructionObservers.add(&observer); +} + +void ScriptExecutionContext::willDestroyDestructionObserver(ContextDestructionObserver& observer) +{ + m_destructionObservers.remove(&observer); +} + +extern "C" void* Bun__getVM(); + +bool ScriptExecutionContext::isContextThread() +{ + auto clientData = WebCore::clientData(vm()); + return clientData->bunVM == Bun__getVM(); +} + +bool ScriptExecutionContext::ensureOnContextThread(ScriptExecutionContextIdentifier identifier, Function<void(ScriptExecutionContext&)>&& task) +{ + ScriptExecutionContext* context = nullptr; + { + Locker locker { allScriptExecutionContextsMapLock }; + context = allScriptExecutionContextsMap().get(identifier); + + if (!context) + return false; + + if (!context->isContextThread()) { + context->postTaskConcurrently(WTFMove(task)); + return true; + } + } + + task(*context); + return true; +} + +bool ScriptExecutionContext::ensureOnMainThread(Function<void(ScriptExecutionContext&)>&& task) +{ + Locker locker { allScriptExecutionContextsMapLock }; + auto* context = allScriptExecutionContextsMap().get(1); + + if (!context) { + return false; + } + + context->postTaskConcurrently(WTFMove(task)); + return true; +} + +void ScriptExecutionContext::processMessageWithMessagePortsSoon(CompletionHandler<void()>&& completionHandler) +{ + ASSERT(isContextThread()); + m_processMessageWithMessagePortsSoonHandlers.append(WTFMove(completionHandler)); + + if (m_willProcessMessageWithMessagePortsSoon) { + return; + } + + m_willProcessMessageWithMessagePortsSoon = true; + + postTask([](ScriptExecutionContext& context) { + context.dispatchMessagePortEvents(); + }); +} + +void ScriptExecutionContext::dispatchMessagePortEvents() +{ + ASSERT(isContextThread()); + checkConsistency(); + + ASSERT(m_willprocessMessageWithMessagePortsSoon); + m_willProcessMessageWithMessagePortsSoon = false; + + auto completionHandlers = std::exchange(m_processMessageWithMessagePortsSoonHandlers, Vector<CompletionHandler<void()>> {}); + + // Make a frozen copy of the ports so we can iterate while new ones might be added or destroyed. + for (auto* messagePort : copyToVector(m_messagePorts)) { + // The port may be destroyed, and another one created at the same address, + // but this is harmless. The worst that can happen as a result is that + // dispatchMessages() will be called needlessly. + if (m_messagePorts.contains(messagePort) && messagePort->started()) + messagePort->dispatchMessages(); + } + + for (auto& completionHandler : completionHandlers) + completionHandler(); +} + +void ScriptExecutionContext::checkConsistency() const +{ + for (auto* messagePort : m_messagePorts) + ASSERT(messagePort->scriptExecutionContext() == this); + + for (auto* destructionObserver : m_destructionObservers) + ASSERT(destructionObserver->scriptExecutionContext() == this); + + // for (auto* activeDOMObject : m_activeDOMObjects) { + // ASSERT(activeDOMObject->scriptExecutionContext() == this); + // activeDOMObject->assertSuspendIfNeededWasCalled(); + // } +} + +void ScriptExecutionContext::createdMessagePort(MessagePort& messagePort) +{ + ASSERT(isContextThread()); + + m_messagePorts.add(&messagePort); +} + +void ScriptExecutionContext::destroyedMessagePort(MessagePort& messagePort) +{ + ASSERT(isContextThread()); + + m_messagePorts.remove(&messagePort); +} + us_socket_context_t* ScriptExecutionContext::webSocketContextNoSSL() { if (!m_client_websockets_ctx) { |