diff --git a/src/ipc/libmultiprocess/CMakeLists.txt b/src/ipc/libmultiprocess/CMakeLists.txt index a36023b1..ea91f98f 100644 --- a/src/ipc/libmultiprocess/CMakeLists.txt +++ b/src/ipc/libmultiprocess/CMakeLists.txt @@ -90,13 +90,6 @@ if(MP_ENABLE_CLANG_TIDY) message(FATAL_ERROR "MP_ENABLE_CLANG_TIDY is ON but clang-tidy is not found.") endif() set(CMAKE_CXX_CLANG_TIDY "${CLANG_TIDY_EXECUTABLE}") - - # Workaround for nix from https://gitlab.kitware.com/cmake/cmake/-/issues/20912#note_793338 - # Nix injects header paths via $NIX_CFLAGS_COMPILE; CMake tags these as - # CMAKE_CXX_IMPLICIT_INCLUDE_DIRECTORIES and omits them from the compile - # database, so clang-tidy, which ignores $NIX_CFLAGS_COMPILE, can't find capnp - # headers. Setting them as standard passes them to clang-tidy. - set(CMAKE_CXX_STANDARD_INCLUDE_DIRECTORIES ${CMAKE_CXX_IMPLICIT_INCLUDE_DIRECTORIES}) endif() option(MP_ENABLE_IWYU "Run include-what-you-use with the compiler." OFF) @@ -111,6 +104,15 @@ if(MP_ENABLE_IWYU) endif() endif() +if(MP_ENABLE_CLANG_TIDY OR MP_ENABLE_IWYU) + # Workaround for nix from https://gitlab.kitware.com/cmake/cmake/-/issues/20912#note_793338 + # Nix injects header paths via $NIX_CFLAGS_COMPILE; CMake tags these as + # CMAKE_CXX_IMPLICIT_INCLUDE_DIRECTORIES and omits them from the compile + # database, so clang-tidy, which ignores $NIX_CFLAGS_COMPILE, can't find capnp + # headers. Setting them as standard passes them to clang-tidy. + set(CMAKE_CXX_STANDARD_INCLUDE_DIRECTORIES ${CMAKE_CXX_IMPLICIT_INCLUDE_DIRECTORIES}) +endif() + include("cmake/compat_config.cmake") include("cmake/pthread_checks.cmake") include(GNUInstallDirs) @@ -137,6 +139,10 @@ configure_file(include/mp/config.h.in "${CMAKE_CURRENT_BINARY_DIR}/include/mp/co # Generated C++ Capn'Proto schema files capnp_generate_cpp(MP_PROXY_SRCS MP_PROXY_HDRS include/mp/proxy.capnp) set_source_files_properties("${MP_PROXY_SRCS}" PROPERTIES SKIP_LINTING TRUE) # Ignored before cmake 3.27 +# Build-graph node for generated headers. This lets targets that include +# the headers order themselves after generation without depending on the +# library target that also uses them. +add_custom_target(mp_headers DEPENDS ${MP_PROXY_HDRS}) # util library add_library(mputil OBJECT src/mp/util.cpp) diff --git a/src/ipc/libmultiprocess/cmake/TargetCapnpSources.cmake b/src/ipc/libmultiprocess/cmake/TargetCapnpSources.cmake index abcca70f..f04a9657 100644 --- a/src/ipc/libmultiprocess/cmake/TargetCapnpSources.cmake +++ b/src/ipc/libmultiprocess/cmake/TargetCapnpSources.cmake @@ -27,10 +27,8 @@ Arguments: #include - The specified include_prefix should be ${CMAKE_SOURCE_DIR} or a - subdirectory of it to include files relative to the project root. It can - be ${CMAKE_CURRENT_SOURCE_DIR} to include files relative to the current - source directory. + Pass ${CMAKE_CURRENT_SOURCE_DIR} or a subdirectory of it to include files + relative to the current source directory (the typical usage). Additional Unnamed Arguments: @@ -43,12 +41,19 @@ Optional Keyword Arguments: IMPORT_PATHS: Specifies additional directories to search for imported `.capnp` files. + ONLY_CAPNP: If specified, only the Cap'n Proto serialization files + (`.capnp.h`, `.capnp.c++`) are generated and compiled. The mpgen proxy + files (`.capnp.proxy-client.c++`, `.capnp.proxy-server.c++`, + `.capnp.proxy-types.c++`, etc.) are skipped. Useful when you need + Cap'n Proto serialization without the multiprocess RPC proxy + infrastructure. + Example: # Assuming `my_library` is a target and `lib/` contains `.capnp` schema # files with imports from `include/`. - target_capnp_sources(my_library "${CMAKE_SOURCE_DIR}" + target_capnp_sources(my_library "${CMAKE_CURRENT_SOURCE_DIR}" lib/schema1.capnp lib/schema2.capnp - IMPORT_PATHS ${CMAKE_SOURCE_DIR}/include) + IMPORT_PATHS ${CMAKE_CURRENT_SOURCE_DIR}/include) #]=] @@ -98,8 +103,8 @@ function(target_capnp_sources target include_prefix) # Translate include_prefix from a source path to a binary path and add it as a # target include directory. - set(build_include_prefix ${CMAKE_BINARY_DIR}) - file(RELATIVE_PATH relative_path ${CMAKE_SOURCE_DIR} ${include_prefix}) + set(build_include_prefix ${CMAKE_CURRENT_BINARY_DIR}) + file(RELATIVE_PATH relative_path ${CMAKE_CURRENT_SOURCE_DIR} ${include_prefix}) if(relative_path) string(APPEND build_include_prefix "/" "${relative_path}") endif() diff --git a/src/ipc/libmultiprocess/doc/design.md b/src/ipc/libmultiprocess/doc/design.md index 113cafc4..b8bc5fd0 100644 --- a/src/ipc/libmultiprocess/doc/design.md +++ b/src/ipc/libmultiprocess/doc/design.md @@ -15,7 +15,7 @@ Libmultiprocess acts as a pure wrapper or layer over the underlying protocol. Cl ## Core Architecture -The `ProxyClient` and `ProxyServer` generated classes are not directly exposed to the user, as described in [usage.md](usage.md). Instead, they wrap C++ interfaces and appear to the user as pointers to an interface. They are first instantiated when calling `ConnectStream` and `ServeStream` respectively for creating the `InitInterface`. These methods establish connections through sockets, internally creating `Connection` objects wrapping a `capnp::RpcSystem` configured for client and server mode respectively. +The `ProxyServer` generated class is not directly exposed to the user. The `ProxyClient` generated class inherits from the C++ interface class, so user code interacts with it through the abstract interface type, and the `ProxyClient` type itself is generally not visible or accessible without a cast. `ConnectStream` returns a `unique_ptr>` as an exception for the root Init interface, but even there users typically treat it as a pointer to the abstract `InitInterface`. For all interfaces returned by Init methods (e.g., `Printer`, `Calculator`), the return type is the abstract class pointer, hiding the underlying `ProxyClient` entirely. They are first instantiated when calling `ConnectStream` and `ServeStream` respectively for creating the `InitInterface`. These methods establish connections through sockets, internally creating `Connection` objects wrapping a `capnp::RpcSystem` configured for client and server mode respectively. The `InitInterface` interface will typically have methods which return other interfaces, giving the connecting process the ability to call other functions in the serving process. Interfaces can also have methods accepting other interfaces as parameters, giving serving processes the ability to call back and invoke functions in connecting processes. Creating new interfaces does not create new connections, and typically many interface objects will share the same connection. @@ -134,6 +134,8 @@ sequenceDiagram SF1->>serverInvoke: return ``` +Destroy methods are a special case: a capnp interface can declare a `destroy` method that is handled by `ServerDestroy` instead of `ServerCall`. Rather than dispatching through the `ServerField`/`ServerRet`/`ServerCall` chain to a C++ interface method, `ServerDestroy` calls `invokeDestroy()` on the `ProxyServer`, which resets `m_impl` and runs any registered cleanup functions, giving the client a way to synchronously destroy the wrapped object on the server side. + ## Advanced Features ### Callbacks diff --git a/src/ipc/libmultiprocess/doc/versions.md b/src/ipc/libmultiprocess/doc/versions.md index 2c2ec50e..3cfa28e3 100644 --- a/src/ipc/libmultiprocess/doc/versions.md +++ b/src/ipc/libmultiprocess/doc/versions.md @@ -7,9 +7,14 @@ Library versions are tracked with simple Versioning policy is described in the [version.h](../include/mp/version.h) include. -## v10 +## v11 - Current unstable version. +## [v10.0](https://github.com/bitcoin-core/libmultiprocess/commits/v10.0) +- Increases spawn test timeout to avoid spurious failures. +- Uses `throwRecoverableException` instead of raw `throw` to improve runtime error messages in macOS builds. +- Used in Bitcoin Core master branch, pulled in by [#34977](https://github.com/bitcoin/bitcoin/pull/34977). Also pulled into Bitcoin Core 31.x stable branch by [#35028](https://github.com/bitcoin/bitcoin/pull/35028). + ## [v9.0](https://github.com/bitcoin-core/libmultiprocess/commits/v9.0) - Fixes race conditions where worker thread could be used after destruction, where getParams() could be called after request cancel, and where m_on_cancel could be called after request finishes. - Adds `CustomHasField` hook to map Cap'n Proto null values to C++ null values. diff --git a/src/ipc/libmultiprocess/include/mp/proxy-io.h b/src/ipc/libmultiprocess/include/mp/proxy-io.h index d7b9f0e5..092ea42e 100644 --- a/src/ipc/libmultiprocess/include/mp/proxy-io.h +++ b/src/ipc/libmultiprocess/include/mp/proxy-io.h @@ -384,7 +384,7 @@ struct Waiter } template - void wait(Lock& lock, Predicate pred) + void wait(Lock& lock, Predicate pred) MP_REQUIRES(m_mutex) { m_cv.wait(lock.m_lock, [&]() MP_REQUIRES(m_mutex) { // Important for this to be "while (m_fn)", not "if (m_fn)" to avoid @@ -410,7 +410,7 @@ struct Waiter //! EventLoop::m_mutex as long as Waiter::mutex is locked first and //! EventLoop::m_mutex is locked second. Mutex m_mutex; - std::condition_variable m_cv; + std::condition_variable m_cv MP_GUARDED_BY(m_mutex); std::optional> m_fn MP_GUARDED_BY(m_mutex); }; @@ -511,6 +511,7 @@ ProxyClientBase::ProxyClientBase(typename Interface::Client cli : m_client(std::move(client)), m_context(connection) { + MP_LOG(*m_context.loop, Log::Debug) << "Creating " << CxxTypeName(*this) << " " << this; // Handler for the connection getting destroyed before this client object. auto disconnect_cb = m_context.connection->addSyncCleanup([this]() { // Release client capability by move-assigning to temporary. @@ -538,7 +539,12 @@ ProxyClientBase::ProxyClientBase(typename Interface::Client cli // the remote object, waiting for it to be deleted server side. If the // capnp interface does not define a destroy method, this will just call // an empty stub defined in the ProxyClientBase class and do nothing. - Sub::destroy(*this); + // Exceptions are caught and logged rather than propagated because + // ~ProxyClientBase is noexcept and the peer may be gone by the time + // this runs. + if (kj::runCatchingExceptions([&]{ Sub::destroy(*this); }) != nullptr) { + MP_LOG(*m_context.loop, Log::Warning) << "Remote destroy call failed during cleanup. Continuing."; + } // FIXME: Could just invoke removed addCleanup fn here instead of duplicating code m_context.loop->sync([&]() { @@ -567,13 +573,16 @@ ProxyClientBase::ProxyClientBase(typename Interface::Client cli template ProxyClientBase::~ProxyClientBase() noexcept { + MP_LOG(*m_context.loop, Log::Debug) << "Cleaning up " << CxxTypeName(*this) << " " << this; CleanupRun(m_context.cleanup_fns); + MP_LOG(*m_context.loop, Log::Debug) << "Destroying " << CxxTypeName(*this) << " " << this; } template ProxyServerBase::ProxyServerBase(std::shared_ptr impl, Connection& connection) : m_impl(std::move(impl)), m_context(&connection) { + MP_LOG(*m_context.loop, Log::Debug) << "Creating " << CxxTypeName(*this) << " " << this; assert(m_impl); } @@ -592,6 +601,7 @@ ProxyServerBase::ProxyServerBase(std::shared_ptr impl, Co template ProxyServerBase::~ProxyServerBase() { + MP_LOG(*m_context.loop, Log::Debug) << "Cleaning up " << CxxTypeName(*this) << " " << this; if (m_impl) { // If impl is non-null at this point, it means no client is waiting for // the m_impl server object to be destroyed synchronously. This can @@ -618,6 +628,7 @@ ProxyServerBase::~ProxyServerBase() }); } assert(m_context.cleanup_fns.empty()); + MP_LOG(*m_context.loop, Log::Debug) << "Destroying " << CxxTypeName(*this) << " " << this; } //! If the capnp interface defined a special "destroy" method, as described the diff --git a/src/ipc/libmultiprocess/include/mp/proxy-types.h b/src/ipc/libmultiprocess/include/mp/proxy-types.h index c13debf3..1ccf423f 100644 --- a/src/ipc/libmultiprocess/include/mp/proxy-types.h +++ b/src/ipc/libmultiprocess/include/mp/proxy-types.h @@ -280,6 +280,17 @@ struct ListOutput<::capnp::List> // clang-format on }; +template +void BuildList(TypeList, InvokeContext& invoke_context, Output&& output, Value&& value) +{ + auto list = output.init(value.size()); + size_t i = 0; + for (const auto& elem : value) { + BuildField(TypeList(), invoke_context, ListOutput(list, i), elem); + ++i; + } +} + template void CustomBuildField(TypeList, Priority<0>, InvokeContext& invoke_context, Value&& value, Output&& output) { @@ -644,17 +655,13 @@ struct CapRequestTraits<::capnp::Request<_Params, _Results>> template void clientDestroy(Client& client) { - if (client.m_context.connection) { - MP_LOG(*client.m_context.loop, Log::Debug) << "IPC client destroy " << typeid(client).name(); - } else { - KJ_LOG(INFO, "IPC interrupted client destroy", typeid(client).name()); - } + MP_LOG(*client.m_context.loop, Log::Debug) << "IPC client destroy " << CxxTypeName(client); } template void serverDestroy(Server& server) { - MP_LOG(*server.m_context.loop, Log::Debug) << "IPC server destroy " << typeid(server).name(); + MP_LOG(*server.m_context.loop, Log::Debug) << "IPC server destroy " << CxxTypeName(server); } //! Entry point called by generated client code that looks like: @@ -780,10 +787,11 @@ kj::Promise serverInvoke(Server& server, CallContext& call_context, Fn fn) using Params = decltype(params); using Results = typename decltype(call_context.getResults())::Builds; + EventLoop& loop = *server.m_context.loop; int req = ++server_reqs; - MP_LOG(*server.m_context.loop, Log::Debug) << "IPC server recv request #" << req << " " - << TypeName(); - MP_LOG(*server.m_context.loop, Log::Trace) << "request data: " + MP_LOG(loop, Log::Debug) << "IPC server recv request #" << req << " " + << TypeName(); + MP_LOG(loop, Log::Trace) << "request data: " << LogEscape(params.toString(), server.m_context.loop->m_log_opts.max_chars); try { @@ -799,16 +807,23 @@ kj::Promise serverInvoke(Server& server, CallContext& call_context, Fn fn) // and waiting for it to complete. return ReplaceVoid([&]() { return fn.invoke(server_context, ArgList()); }, [&]() { return kj::Promise(kj::mv(call_context)); }) - .then([&server, req](CallContext call_context) { - MP_LOG(*server.m_context.loop, Log::Debug) << "IPC server send response #" << req << " " << TypeName(); - MP_LOG(*server.m_context.loop, Log::Trace) << "response data: " - << LogEscape(call_context.getResults().toString(), server.m_context.loop->m_log_opts.max_chars); + .then([&loop, req](CallContext call_context) { + MP_LOG(loop, Log::Debug) << "IPC server send response #" << req << " " << TypeName(); + MP_LOG(loop, Log::Trace) << "response data: " + << LogEscape(call_context.getResults().toString(), loop.m_log_opts.max_chars); + }).catch_([&loop, req](::kj::Exception&& e) -> kj::Promise { + // Call failed for some reason. Cap'n Proto will try to send + // this error to the client as well, but it is good to log the + // failure early here and include the request number. + MP_LOG(loop, Log::Error) << "IPC server error request #" << req << " " << TypeName() + << " " << kj::str("kj::Exception: ", e.getDescription()).cStr(); + return kj::mv(e); }); } catch (const std::exception& e) { - MP_LOG(*server.m_context.loop, Log::Error) << "IPC server unhandled exception: " << e.what(); + MP_LOG(loop, Log::Error) << "IPC server unhandled exception: " << e.what(); throw; } catch (...) { - MP_LOG(*server.m_context.loop, Log::Error) << "IPC server unhandled exception"; + MP_LOG(loop, Log::Error) << "IPC server unhandled exception"; throw; } } diff --git a/src/ipc/libmultiprocess/include/mp/proxy.h b/src/ipc/libmultiprocess/include/mp/proxy.h index c55380c1..4458ee9e 100644 --- a/src/ipc/libmultiprocess/include/mp/proxy.h +++ b/src/ipc/libmultiprocess/include/mp/proxy.h @@ -304,20 +304,31 @@ struct ProxyServerMethodTraits : public ProxyMethodTraits { }; -static constexpr int FIELD_IN = 1; -static constexpr int FIELD_OUT = 2; -static constexpr int FIELD_OPTIONAL = 4; -static constexpr int FIELD_REQUESTED = 8; -static constexpr int FIELD_BOXED = 16; +static constexpr int FIELD_IN = 1; //!< See Accessor::in. +static constexpr int FIELD_OUT = 2; //!< See Accessor::out. +static constexpr int FIELD_OPTIONAL = 4; //!< See Accessor::optional. +static constexpr int FIELD_REQUESTED = 8; //!< See Accessor::requested. +static constexpr int FIELD_BOXED = 16; //!< See Accessor::boxed. //! Accessor type holding flags that determine how to access a message field. template struct Accessor : public Field { + //! Field is present from the Cap'n Proto Params struct (client -> server). static const bool in = flags & FIELD_IN; + //! Field is present from the Cap'n Proto Results struct (server -> client). static const bool out = flags & FIELD_OUT; + //! Field has a companion has{Name} boolean field in the Cap'n Proto struct. + //! This is used to represent optional primitive values (e.g. C++ + //! std::optional) because Cap'n Proto doesn't allow primitive fields to + //! be unset. static const bool optional = flags & FIELD_OPTIONAL; + //! Results field has a companion want{Name} boolean field in the Params + //! struct. Used for optional output parameters (e.g. C++ int*) and set to + //! true if the caller passed a non-null pointer and wants the result. static const bool requested = flags & FIELD_REQUESTED; + //! Field is a Cap'n Proto pointer type (struct, list, text, data, + //! interface) as opposed to a primitive type (bool, int, float, enum). static const bool boxed = flags & FIELD_BOXED; }; diff --git a/src/ipc/libmultiprocess/include/mp/type-context.h b/src/ipc/libmultiprocess/include/mp/type-context.h index 46952f49..3ab3d4b0 100644 --- a/src/ipc/libmultiprocess/include/mp/type-context.h +++ b/src/ipc/libmultiprocess/include/mp/type-context.h @@ -216,6 +216,40 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& << "IPC server error request #" << req << ", missing thread to execute request"; throw std::runtime_error("invalid thread handle"); } + }, [&loop, req](::kj::Exception&& e) -> kj::Promise { + // If you see the error "(remote):0: failed: remote exception: + // Called null capability" here, it probably means your Init class + // is missing a declaration like: + // + // construct @0 (threadMap: Proxy.ThreadMap) -> (threadMap :Proxy.ThreadMap); + // + // which passes a ThreadMap reference from the client to the server, + // allowing the server to create threads to run IPC calls on the + // client, and also returns a ThreadMap reference from the server to + // the client, allowing the client to create threads on the server. + // (Typically the latter ThreadMap is used more often because there + // are more client-to-server calls.) + // + // If the other side of the connection did not previously get a + // ThreadMap reference from this side of the connection, when the + // other side calls `m_thread_map.makeThreadRequest()` in + // `BuildField` above, `m_thread_map` will be null, but that call + // will not fail immediately due to Cap'n Proto's request pipelining + // and delayed execution. Instead that call will return an invalid + // Thread reference, and when that reference is passed to this side + // of the connection as `thread_client` above, the + // `getLocalServer(thread_client)` call there will be the first + // thing to overtly fail, leading to an error here. + // + // Potentially there are also other things that could cause errors + // here, but this is the most likely cause. + // + // The log statement here is not strictly necessary since the same + // exception will also be logged in serverInvoke, but this logging + // may provide extra context that could be helpful for debugging. + MP_LOG(loop, Log::Info) + << "IPC server error request #" << req << " CapabilityServerSet::getLocalServer call failed, did you forget to provide a ThreadMap to the client prior to this IPC call?"; + return kj::mv(e); }); // Use connection m_canceler object to cancel the result promise if the // connection is destroyed. (By default Cap'n Proto does not cancel requests diff --git a/src/ipc/libmultiprocess/include/mp/type-map.h b/src/ipc/libmultiprocess/include/mp/type-map.h index 50a590cb..213ed811 100644 --- a/src/ipc/libmultiprocess/include/mp/type-map.h +++ b/src/ipc/libmultiprocess/include/mp/type-map.h @@ -17,14 +17,7 @@ void CustomBuildField(TypeList>, Value&& value, Output&& output) { - // FIXME dededup with vector handler above - auto list = output.init(value.size()); - size_t i = 0; - for (const auto& elem : value) { - BuildField(TypeList>(), invoke_context, - ListOutput(list, i), elem); - ++i; - } + BuildList(TypeList>(), invoke_context, output, value); } // Replacement for `m.emplace(piecewise_construct, t1, t2)` to work around a diff --git a/src/ipc/libmultiprocess/include/mp/type-number.h b/src/ipc/libmultiprocess/include/mp/type-number.h index 5c997f54..ddff5cdd 100644 --- a/src/ipc/libmultiprocess/include/mp/type-number.h +++ b/src/ipc/libmultiprocess/include/mp/type-number.h @@ -82,7 +82,7 @@ decltype(auto) CustomReadField(TypeList, InvokeContext& invoke_context, Input&& input, ReadDest&& read_dest, - typename std::enable_if::value>::type* enable = 0) + typename std::enable_if::value>::type* enable = nullptr) { auto value = input.get(); static_assert(std::is_same::value, "floating point type mismatch"); diff --git a/src/ipc/libmultiprocess/include/mp/type-set.h b/src/ipc/libmultiprocess/include/mp/type-set.h index 699c6e9e..f051f738 100644 --- a/src/ipc/libmultiprocess/include/mp/type-set.h +++ b/src/ipc/libmultiprocess/include/mp/type-set.h @@ -16,13 +16,7 @@ void CustomBuildField(TypeList>, Value&& value, Output&& output) { - // FIXME dededup with vector handler above - auto list = output.init(value.size()); - size_t i = 0; - for (const auto& elem : value) { - BuildField(TypeList(), invoke_context, ListOutput(list, i), elem); - ++i; - } + BuildList(TypeList(), invoke_context, output, value); } template diff --git a/src/ipc/libmultiprocess/include/mp/type-unordered-set.h b/src/ipc/libmultiprocess/include/mp/type-unordered-set.h new file mode 100644 index 00000000..ad4c0ab8 --- /dev/null +++ b/src/ipc/libmultiprocess/include/mp/type-unordered-set.h @@ -0,0 +1,43 @@ +// Copyright (c) The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef MP_PROXY_TYPE_UNORDERED_SET_H +#define MP_PROXY_TYPE_UNORDERED_SET_H + +#include +#include +#include + +namespace mp { +template +void CustomBuildField(TypeList>, + Priority<1>, + InvokeContext& invoke_context, + Value&& value, + Output&& output) +{ + BuildList(TypeList(), invoke_context, output, value); +} + +template +decltype(auto) CustomReadField(TypeList>, + Priority<1>, + InvokeContext& invoke_context, + Input&& input, + ReadDest&& read_dest) +{ + return read_dest.update([&](auto& value) { + auto data = input.get(); + value.clear(); + for (auto item : data) { + ReadField(TypeList(), invoke_context, Make(item), + ReadDestEmplace(TypeList(), [&](auto&&... args) -> auto& { + return *value.emplace(std::forward(args)...).first; + })); + } + }); +} +} // namespace mp + +#endif // MP_PROXY_TYPE_UNORDERED_SET_H \ No newline at end of file diff --git a/src/ipc/libmultiprocess/include/mp/type-vector.h b/src/ipc/libmultiprocess/include/mp/type-vector.h index 90605ddf..648fda5d 100644 --- a/src/ipc/libmultiprocess/include/mp/type-vector.h +++ b/src/ipc/libmultiprocess/include/mp/type-vector.h @@ -16,12 +16,7 @@ void CustomBuildField(TypeList>, Value&& value, Output&& output) { - // FIXME dedup with set handler below - auto list = output.init(value.size()); - size_t i = 0; - for (auto it = value.begin(); it != value.end(); ++it, ++i) { - BuildField(TypeList(), invoke_context, ListOutput(list, i), *it); - } + BuildList(TypeList(), invoke_context, output, value); } inline static bool BuildPrimitive(InvokeContext& invoke_context, std::vector::const_reference value, TypeList) diff --git a/src/ipc/libmultiprocess/include/mp/util.h b/src/ipc/libmultiprocess/include/mp/util.h index a3db1282..380a142f 100644 --- a/src/ipc/libmultiprocess/include/mp/util.h +++ b/src/ipc/libmultiprocess/include/mp/util.h @@ -7,7 +7,7 @@ #include #include -#include +#include #include #include #include @@ -15,11 +15,17 @@ #include #include #include +#include #include #include #include #include +#if __has_include() +#include +#include +#endif + namespace mp { //! Generic utility functions used by capnp code. @@ -274,6 +280,28 @@ inline char* CharCast(unsigned char* c) { return (char*)c; } inline const char* CharCast(const char* c) { return c; } inline const char* CharCast(const unsigned char* c) { return (const char*)c; } +#if __has_include() // GCC & Clang ─ use to demangle +inline std::string _demangle(const char* m) +{ + int status = 0; + std::unique_ptr p{ + abi::__cxa_demangle(m, /*output_buffer=*/nullptr, /*length=*/nullptr, &status), std::free}; + return (status == 0 && p) ? p.get() : m; // fall back on mangled if needed +} +#else // MSVC or other ─ no demangling available +inline std::string _demangle(const char* m) { return m; } +#endif + +template +std::string CxxTypeName(const T& /*unused*/) +{ +#ifdef __cpp_rtti + return _demangle(typeid(std::decay_t).name()); +#else + return ""; +#endif +} + //! Exception thrown from code executing an IPC call that is interrupted. struct InterruptException final : std::exception { explicit InterruptException(std::string message) : m_message(std::move(message)) {} diff --git a/src/ipc/libmultiprocess/include/mp/version.h b/src/ipc/libmultiprocess/include/mp/version.h index 964667a9..423ed460 100644 --- a/src/ipc/libmultiprocess/include/mp/version.h +++ b/src/ipc/libmultiprocess/include/mp/version.h @@ -24,7 +24,7 @@ //! pointing at the prior merge commit. The /doc/versions.md file should also be //! updated, noting any significant or incompatible changes made since the //! previous version. -#define MP_MAJOR_VERSION 10 +#define MP_MAJOR_VERSION 11 //! Minor version number. Should be incremented in stable branches after //! backporting changes. The /doc/versions.md file should also be updated to diff --git a/src/ipc/libmultiprocess/shell.nix b/src/ipc/libmultiprocess/shell.nix index 9ebbc0a0..1a4614e3 100644 --- a/src/ipc/libmultiprocess/shell.nix +++ b/src/ipc/libmultiprocess/shell.nix @@ -39,6 +39,7 @@ let hash = lib.attrByPath [capnprotoVersion] "" capnprotoHashes; }; patches = lib.optionals (lib.versionAtLeast capnprotoVersion "0.9.0" && lib.versionOlder capnprotoVersion "0.10.4") [ ./ci/patches/spaceship.patch ]; + cmakeFlags = (old.cmakeFlags or []) ++ (lib.optionals (lib.versionAtLeast "1.1.0" capnprotoVersion) ["-DCMAKE_POLICY_VERSION_MINIMUM=3.5"]); } // (lib.optionalAttrs (lib.versionOlder capnprotoVersion "0.10") { env = { }; # Drop -std=c++20 flag forced by nixpkgs })); diff --git a/src/ipc/libmultiprocess/src/mp/gen.cpp b/src/ipc/libmultiprocess/src/mp/gen.cpp index 603f9ccb..c8db469d 100644 --- a/src/ipc/libmultiprocess/src/mp/gen.cpp +++ b/src/ipc/libmultiprocess/src/mp/gen.cpp @@ -6,6 +6,7 @@ #include #include +#include // IWYU pragma: keep #include #include #include @@ -125,6 +126,109 @@ static bool BoxedType(const ::capnp::Type& type) type.isFloat64() || type.isEnum()); } +struct Field +{ + ::capnp::StructSchema::Field param; + bool param_is_set = false; + ::capnp::StructSchema::Field result; + bool result_is_set = false; + int args = 0; + bool retval = false; + bool optional = false; + bool requested = false; + bool skip = false; + kj::StringPtr exception; +}; + +struct FieldList +{ + std::vector fields; + std::map field_idx; // name -> args index + bool has_result = false; + + void addField(const ::capnp::StructSchema::Field& schema_field, bool param, bool result) + { + auto field_name = schema_field.getProto().getName(); + auto inserted = field_idx.emplace(field_name, fields.size()); + if (inserted.second) { + fields.emplace_back(); + } + auto& field = fields[inserted.first->second]; + if (param) { + field.param = schema_field; + field.param_is_set = true; + } + if (result) { + field.result = schema_field; + field.result_is_set = true; + } + + if (!param && field_name == kj::StringPtr{"result"}) { + field.retval = true; + has_result = true; + } + + if (AnnotationExists(schema_field.getProto(), SKIP_ANNOTATION_ID)) { + field.skip = true; + } + GetAnnotationText(schema_field.getProto(), EXCEPTION_ANNOTATION_ID, &field.exception); + + int32_t count = 1; + if (!GetAnnotationInt32(schema_field.getProto(), COUNT_ANNOTATION_ID, &count)) { + if (schema_field.getType().isStruct()) { + GetAnnotationInt32(schema_field.getType().asStruct().getProto(), + COUNT_ANNOTATION_ID, &count); + } else if (schema_field.getType().isInterface()) { + GetAnnotationInt32(schema_field.getType().asInterface().getProto(), + COUNT_ANNOTATION_ID, &count); + } + } + + + if (inserted.second && !field.retval && !field.exception.size()) { + field.args = count; + } + } + + void mergeFields() + { + for (auto& field : field_idx) { + auto has_field = field_idx.find("has" + Cap(field.first)); + if (has_field != field_idx.end()) { + fields[has_field->second].skip = true; + fields[field.second].optional = true; + } + auto want_field = field_idx.find("want" + Cap(field.first)); + if (want_field != field_idx.end() && fields[want_field->second].param_is_set) { + fields[want_field->second].skip = true; + fields[field.second].requested = true; + } + } + } +}; + +std::string AccessorType(kj::StringPtr base_name, const Field& field) +{ + const auto& f = field.param_is_set ? field.param : field.result; + const auto field_name = f.getProto().getName(); + const auto field_type = f.getType(); + + std::ostringstream out; + out << "Accessor<" << base_name << "_fields::" << Cap(field_name) << ", "; + if (!field.param_is_set) { + out << "FIELD_OUT"; + } else if (field.result_is_set) { + out << "FIELD_IN | FIELD_OUT"; + } else { + out << "FIELD_IN"; + } + if (field.optional) out << " | FIELD_OPTIONAL"; + if (field.requested) out << " | FIELD_REQUESTED"; + if (BoxedType(field_type)) out << " | FIELD_BOXED"; + out << ">"; + return out.str(); +} + // src_file is path to .capnp file to generate stub code from. // // src_prefix can be used to generate outputs in a different directory than the @@ -226,12 +330,17 @@ static void Generate(kj::StringPtr src_prefix, cpp_client << "#include <" << include_path << ".h>\n"; cpp_client << "#include <" << include_path << ".proxy.h>\n"; cpp_client << "#include <" << include_path << ".proxy-types.h>\n"; + cpp_client << "#include \n"; + cpp_client << "#include \n"; cpp_client << "#include \n"; cpp_client << "#include \n"; - cpp_client << "#include \n"; + cpp_client << "#include \n"; cpp_client << "#include \n"; + cpp_client << "#include \n"; cpp_client << "#include \n"; cpp_client << "#include \n"; + cpp_client << "#include \n"; + cpp_client << "#include \n"; cpp_client << "#include <" << PROXY_TYPES << ">\n"; cpp_client << "// IWYU pragma: end_keep\n\n"; cpp_client << "namespace mp {\n"; @@ -240,6 +349,7 @@ static void Generate(kj::StringPtr src_prefix, cpp_types << "// Generated by " PROXY_BIN " from " << src_file << "\n\n"; cpp_types << "// IWYU pragma: no_include \"mp/proxy.h\"\n"; cpp_types << "// IWYU pragma: no_include \"mp/proxy-io.h\"\n"; + cpp_types << "#include <" << include_path << ".h> // IWYU pragma: keep\n"; cpp_types << "#include <" << include_path << ".proxy.h>\n"; cpp_types << "#include <" << include_path << ".proxy-types.h> // IWYU pragma: keep\n"; cpp_types << "#include <" << PROXY_TYPES << ">\n\n"; @@ -332,6 +442,13 @@ static void Generate(kj::StringPtr src_prefix, if (node.getProto().isStruct()) { const auto& struc = node.asStruct(); + + FieldList fields; + for (const auto schema_field : struc.getFields()) { + fields.addField(schema_field, true, true); + } + fields.mergeFields(); + std::ostringstream generic_name; generic_name << node_name; dec << "template<"; @@ -352,22 +469,18 @@ static void Generate(kj::StringPtr src_prefix, dec << "struct ProxyStruct<" << message_namespace << "::" << generic_name.str() << ">\n"; dec << "{\n"; dec << " using Struct = " << message_namespace << "::" << generic_name.str() << ";\n"; - for (const auto field : struc.getFields()) { - auto field_name = field.getProto().getName(); + for (const auto& field : fields.fields) { + auto field_name = field.param.getProto().getName(); add_accessor(field_name); - dec << " using " << Cap(field_name) << "Accessor = Accessor<" << base_name - << "_fields::" << Cap(field_name) << ", FIELD_IN | FIELD_OUT"; - if (BoxedType(field.getType())) dec << " | FIELD_BOXED"; - dec << ">;\n"; + dec << " using " << Cap(field_name) << "Accessor = " + << AccessorType(base_name, field) << ";\n"; } dec << " using Accessors = std::tuple<"; size_t i = 0; - for (const auto field : struc.getFields()) { - if (AnnotationExists(field.getProto(), SKIP_ANNOTATION_ID)) { - continue; - } + for (const auto& field : fields.fields) { + if (field.skip) continue; if (i) dec << ", "; - dec << Cap(field.getProto().getName()) << "Accessor"; + dec << Cap(field.param.getProto().getName()) << "Accessor"; ++i; } dec << ">;\n"; @@ -381,13 +494,11 @@ static void Generate(kj::StringPtr src_prefix, inl << "public:\n"; inl << " using Struct = " << message_namespace << "::" << node_name << ";\n"; size_t i = 0; - for (const auto field : struc.getFields()) { - if (AnnotationExists(field.getProto(), SKIP_ANNOTATION_ID)) { - continue; - } - auto field_name = field.getProto().getName(); + for (const auto& field : fields.fields) { + if (field.skip) continue; + auto field_name = field.param.getProto().getName(); auto member_name = field_name; - GetAnnotationText(field.getProto(), NAME_ANNOTATION_ID, &member_name); + GetAnnotationText(field.param.getProto(), NAME_ANNOTATION_ID, &member_name); inl << " static decltype(auto) get(std::integral_constant) { return " << "&" << proxied_class_type << "::" << member_name << "; }\n"; ++i; @@ -430,85 +541,14 @@ static void Generate(kj::StringPtr src_prefix, const bool is_construct = method_name == kj::StringPtr{"construct"}; const bool is_destroy = method_name == kj::StringPtr{"destroy"}; - struct Field - { - ::capnp::StructSchema::Field param; - bool param_is_set = false; - ::capnp::StructSchema::Field result; - bool result_is_set = false; - int args = 0; - bool retval = false; - bool optional = false; - bool requested = false; - bool skip = false; - kj::StringPtr exception; - }; - - std::vector fields; - std::map field_idx; // name -> args index - bool has_result = false; - - auto add_field = [&](const ::capnp::StructSchema::Field& schema_field, bool param) { - if (AnnotationExists(schema_field.getProto(), SKIP_ANNOTATION_ID)) { - return; - } - - auto field_name = schema_field.getProto().getName(); - auto inserted = field_idx.emplace(field_name, fields.size()); - if (inserted.second) { - fields.emplace_back(); - } - auto& field = fields[inserted.first->second]; - if (param) { - field.param = schema_field; - field.param_is_set = true; - } else { - field.result = schema_field; - field.result_is_set = true; - } - - if (!param && field_name == kj::StringPtr{"result"}) { - field.retval = true; - has_result = true; - } - - GetAnnotationText(schema_field.getProto(), EXCEPTION_ANNOTATION_ID, &field.exception); - - int32_t count = 1; - if (!GetAnnotationInt32(schema_field.getProto(), COUNT_ANNOTATION_ID, &count)) { - if (schema_field.getType().isStruct()) { - GetAnnotationInt32(schema_field.getType().asStruct().getProto(), - COUNT_ANNOTATION_ID, &count); - } else if (schema_field.getType().isInterface()) { - GetAnnotationInt32(schema_field.getType().asInterface().getProto(), - COUNT_ANNOTATION_ID, &count); - } - } - - - if (inserted.second && !field.retval && !field.exception.size()) { - field.args = count; - } - }; - + FieldList fields; for (const auto schema_field : method.getParamType().getFields()) { - add_field(schema_field, true); + fields.addField(schema_field, true, false); } for (const auto schema_field : method.getResultType().getFields()) { - add_field(schema_field, false); - } - for (auto& field : field_idx) { - auto has_field = field_idx.find("has" + Cap(field.first)); - if (has_field != field_idx.end()) { - fields[has_field->second].skip = true; - fields[field.second].optional = true; - } - auto want_field = field_idx.find("want" + Cap(field.first)); - if (want_field != field_idx.end() && fields[want_field->second].param_is_set) { - fields[want_field->second].skip = true; - fields[field.second].requested = true; - } + fields.addField(schema_field, false, true); } + fields.mergeFields(); if (!is_construct && !is_destroy && (&method_interface == &interface)) { methods << "template<>\n"; @@ -524,25 +564,11 @@ static void Generate(kj::StringPtr src_prefix, std::ostringstream server_invoke_start; std::ostringstream server_invoke_end; int argc = 0; - for (const auto& field : fields) { + for (const auto& field : fields.fields) { if (field.skip) continue; const auto& f = field.param_is_set ? field.param : field.result; auto field_name = f.getProto().getName(); - auto field_type = f.getType(); - - std::ostringstream field_flags; - if (!field.param_is_set) { - field_flags << "FIELD_OUT"; - } else if (field.result_is_set) { - field_flags << "FIELD_IN | FIELD_OUT"; - } else { - field_flags << "FIELD_IN"; - } - if (field.optional) field_flags << " | FIELD_OPTIONAL"; - if (field.requested) field_flags << " | FIELD_REQUESTED"; - if (BoxedType(field_type)) field_flags << " | FIELD_BOXED"; - add_accessor(field_name); std::ostringstream fwd_args; @@ -569,8 +595,7 @@ static void Generate(kj::StringPtr src_prefix, client_invoke << "MakeClientParam<"; } - client_invoke << "Accessor<" << base_name << "_fields::" << Cap(field_name) << ", " - << field_flags.str() << ">>("; + client_invoke << AccessorType(base_name, field) << ">("; if (field.retval) { client_invoke << field_name; @@ -586,8 +611,7 @@ static void Generate(kj::StringPtr src_prefix, } else { server_invoke_start << "MakeServerField<" << field.args; } - server_invoke_start << ", Accessor<" << base_name << "_fields::" << Cap(field_name) << ", " - << field_flags.str() << ">>("; + server_invoke_start << ", " << AccessorType(base_name, field) << ">("; server_invoke_end << ")"; } @@ -603,12 +627,12 @@ static void Generate(kj::StringPtr src_prefix, def_client << "ProxyClient<" << message_namespace << "::" << node_name << ">::M" << method_ordinal << "::Result ProxyClient<" << message_namespace << "::" << node_name << ">::" << method_name << "(" << super_str << client_args.str() << ") {\n"; - if (has_result) { + if (fields.has_result) { def_client << " typename M" << method_ordinal << "::Result result;\n"; } def_client << " clientInvoke(" << self_str << ", &" << message_namespace << "::" << node_name << "::Client::" << method_name << "Request" << client_invoke.str() << ");\n"; - if (has_result) def_client << " return result;\n"; + if (fields.has_result) def_client << " return result;\n"; def_client << "}\n"; server << " kj::Promise " << method_name << "(" << Cap(method_name) diff --git a/src/ipc/libmultiprocess/src/mp/proxy.cpp b/src/ipc/libmultiprocess/src/mp/proxy.cpp index d24208db..963050c3 100644 --- a/src/ipc/libmultiprocess/src/mp/proxy.cpp +++ b/src/ipc/libmultiprocess/src/mp/proxy.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -245,7 +246,12 @@ void EventLoop::loop() if (read_bytes != 1) throw std::logic_error("EventLoop wait_stream closed unexpectedly"); Lock lock(m_mutex); if (m_post_fn) { - Unlock(lock, *m_post_fn); + // m_post_fn throwing is never expected. If it does happen, the caller + // of EventLoop::post() will return without any indication of failure, + // which will likely cause other bugs. Log the error and continue. + KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() MP_REQUIRES(m_mutex) { Unlock(lock, *m_post_fn); })) { + MP_LOG(*this, Log::Error) << "EventLoop: m_post_fn threw: " << kj::str(*exception).cStr(); + } m_post_fn = nullptr; m_cv.notify_all(); } else if (done()) { diff --git a/src/ipc/libmultiprocess/test/mp/test/foo-types.h b/src/ipc/libmultiprocess/test/mp/test/foo-types.h index 735adb76..b96eabfc 100644 --- a/src/ipc/libmultiprocess/test/mp/test/foo-types.h +++ b/src/ipc/libmultiprocess/test/mp/test/foo-types.h @@ -20,11 +20,13 @@ #include #include #include +#include #include #include #include #include #include +#include #include #include #include diff --git a/src/ipc/libmultiprocess/test/mp/test/foo.capnp b/src/ipc/libmultiprocess/test/mp/test/foo.capnp index 5bdee257..31736452 100644 --- a/src/ipc/libmultiprocess/test/mp/test/foo.capnp +++ b/src/ipc/libmultiprocess/test/mp/test/foo.capnp @@ -30,6 +30,7 @@ interface FooInterface $Proxy.wrap("mp::test::FooImplementation") { passMessage @13 (arg :FooMessage) -> (result :FooMessage); passMutable @14 (arg :FooMutable) -> (arg :FooMutable); passEnum @15 (arg :Int32) -> (result :Int32); + passDouble @23 (arg :Float64) -> (result :Float64); passFn @16 (context :Proxy.Context, fn :FooFn) -> (result :Int32); callFn @17 () -> (); callFnAsync @18 (context :Proxy.Context) -> (); @@ -53,8 +54,11 @@ interface FooFn $Proxy.wrap("ProxyCallback>") { struct FooStruct $Proxy.wrap("mp::test::FooStruct") { name @0 :Text; - setint @1 :List(Int32); - vbool @2 :List(Bool); + setInt @1 :List(Int32) $Proxy.name("set_int"); + vBool @2 :List(Bool) $Proxy.name("v_bool"); + optionalInt @3 :Int32 $Proxy.name("optional_int"); + hasOptionalInt @4 :Bool; + unorderedSetInt @5 :List(Int32) $Proxy.name("unordered_set_int"); } struct FooCustom $Proxy.wrap("mp::test::FooCustom") { diff --git a/src/ipc/libmultiprocess/test/mp/test/foo.h b/src/ipc/libmultiprocess/test/mp/test/foo.h index 317af025..5b66b85e 100644 --- a/src/ipc/libmultiprocess/test/mp/test/foo.h +++ b/src/ipc/libmultiprocess/test/mp/test/foo.h @@ -9,8 +9,10 @@ #include #include #include +#include #include #include +#include #include namespace mp { @@ -19,8 +21,10 @@ namespace test { struct FooStruct { std::string name; - std::set setint; - std::vector vbool; + std::set set_int; + std::vector v_bool; + std::optional optional_int; + std::unordered_set unordered_set_int; }; enum class FooEnum : uint8_t { ONE = 1, TWO = 2, }; @@ -82,6 +86,7 @@ class FooImplementation FooMessage passMessage(FooMessage foo) { foo.message += " call"; return foo; } void passMutable(FooMutable& foo) { foo.message += " call"; } FooEnum passEnum(FooEnum foo) { return foo; } + double passDouble(double value) { return value; } int passFn(std::function fn) { return fn(); } std::vector passDataPointers(std::vector values) { return values; } std::shared_ptr m_callback; diff --git a/src/ipc/libmultiprocess/test/mp/test/test.cpp b/src/ipc/libmultiprocess/test/mp/test/test.cpp index d91edb40..eb4b5ec7 100644 --- a/src/ipc/libmultiprocess/test/mp/test/test.cpp +++ b/src/ipc/libmultiprocess/test/mp/test/test.cpp @@ -34,6 +34,7 @@ #include #include #include +#include #include #include @@ -141,21 +142,34 @@ KJ_TEST("Call FooInterface methods") FooStruct in; in.name = "name"; - in.setint.insert(2); - in.setint.insert(1); - in.vbool.push_back(false); - in.vbool.push_back(true); - in.vbool.push_back(false); + in.set_int.insert(2); + in.set_int.insert(1); + in.unordered_set_int.insert(2); + in.unordered_set_int.insert(1); + in.v_bool.push_back(false); + in.v_bool.push_back(true); + in.v_bool.push_back(false); + in.optional_int = 3; FooStruct out = foo->pass(in); KJ_EXPECT(in.name == out.name); - KJ_EXPECT(in.setint.size() == out.setint.size()); - for (auto init{in.setint.begin()}, outit{out.setint.begin()}; init != in.setint.end() && outit != out.setint.end(); ++init, ++outit) { + KJ_EXPECT(in.set_int.size() == out.set_int.size()); + for (auto init{in.set_int.begin()}, outit{out.set_int.begin()}; init != in.set_int.end() && outit != out.set_int.end(); ++init, ++outit) { KJ_EXPECT(*init == *outit); } - KJ_EXPECT(in.vbool.size() == out.vbool.size()); - for (size_t i = 0; i < in.vbool.size(); ++i) { - KJ_EXPECT(in.vbool[i] == out.vbool[i]); + KJ_EXPECT(in.unordered_set_int.size() == out.unordered_set_int.size()); + for (const auto& elem : in.unordered_set_int) { + KJ_EXPECT(out.unordered_set_int.count(elem) == 1); } + KJ_EXPECT(in.v_bool.size() == out.v_bool.size()); + for (size_t i = 0; i < in.v_bool.size(); ++i) { + KJ_EXPECT(in.v_bool[i] == out.v_bool[i]); + } + KJ_EXPECT(in.optional_int == out.optional_int); + + // Additional checks for std::optional member + KJ_EXPECT(foo->pass(in).optional_int == 3); + in.optional_int.reset(); + KJ_EXPECT(!foo->pass(in).optional_int); FooStruct err; try { @@ -216,6 +230,8 @@ KJ_TEST("Call FooInterface methods") foo->passMutable(mut); KJ_EXPECT(mut.message == "init build pass call return read"); + KJ_EXPECT(foo->passDouble(1.25) == 1.25); + KJ_EXPECT(foo->passFn([]{ return 10; }) == 10); std::vector data_in; @@ -427,6 +443,32 @@ KJ_TEST("Calling async IPC method, with server disconnect after cleanup") } } +KJ_TEST("Destroying ProxyClient<> with destroy method after peer disconnect") +{ + // Regression test for bitcoin-core/libmultiprocess#219 where + // ~ProxyClientBase would call std::terminate if the remote destroy RPC + // failed during teardown. + // + // Save a callback on the server so it holds a ProxyClient + // pointing back to this side, then disconnect. When the server is torn + // down, the ProxyClient destructor issues a destroy RPC over + // the now dead connection; without the bugfix the exception escapes the + // noexcept destructor and aborts the process. + + TestSetup setup{/*client_owns_connection=*/false}; + ProxyClient* foo = setup.client.get(); + foo->initThreadMap(); + + class Callback : public FooCallback + { + public: + int call(int arg) override { return arg; } + }; + + foo->saveCallback(std::make_shared()); + setup.client_disconnect(); +} + KJ_TEST("Make simultaneous IPC calls on single remote thread") { TestSetup setup; @@ -470,6 +512,7 @@ KJ_TEST("Make simultaneous IPC calls on single remote thread") [&running, &tc, i](auto&& results) { assert(results.getResult() == static_cast(100 * (i+1))); running -= 1; + Lock lock(tc.waiter->m_mutex); tc.waiter->m_cv.notify_all(); })); }