Skip to content

Commit 0a0c613

Browse files
committed
Move semaphore from MainThreadMessageDispatcher to dedicated helper class and refactor dispatch accordingly
1 parent 268ee00 commit 0a0c613

File tree

1 file changed

+107
-90
lines changed

1 file changed

+107
-90
lines changed

IPC/ARAIPCConnection.cpp

Lines changed: 107 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include "ARA_Library/IPC/ARAIPCEncoding.h"
2727

2828
#include <utility>
29+
#include <optional>
2930
#if __cplusplus >= 202002L
3031
#include <chrono>
3132
#include <semaphore>
@@ -233,48 +234,111 @@ std::unique_ptr<MessageEncoder> MessageDispatcher::_handleReceivedMessage (Messa
233234

234235
//------------------------------------------------------------------------------
235236

236-
// single-threaded variant for main thread communication only
237-
class MainThreadMessageDispatcher : public MessageDispatcher
237+
// helper for MainThreadMessageDispatcher: single-object message queue with semaphore to wait on
238+
class WaitableSingleMessageQueue
238239
{
239-
public:
240-
MainThreadMessageDispatcher (Connection* connection, std::unique_ptr<MessageChannel> && messageChannel)
241-
: MessageDispatcher { connection, std::move (messageChannel) },
240+
public:
241+
WaitableSingleMessageQueue ()
242242
#if __cplusplus >= 202002L
243-
_waitForMessageSemaphore { new std::binary_semaphore { 0 } }
243+
: _waitForMessageSemaphore { new std::binary_semaphore { 0 } }
244244
#elif defined (_WIN32)
245-
_waitForMessageSemaphore { ::CreateSemaphore (nullptr, 0, LONG_MAX, nullptr) }
245+
: _waitForMessageSemaphore { ::CreateSemaphore (nullptr, 0, LONG_MAX, nullptr) }
246246
#elif defined (__APPLE__)
247-
_waitForMessageSemaphore { dispatch_semaphore_create (0) }
247+
: _waitForMessageSemaphore { dispatch_semaphore_create (0) }
248248
#else
249-
#error "IPC not yet implemented for this platform"
249+
#error "IPC not yet implemented for this platform"
250250
#endif
251-
{
252-
getMessageChannel ()->_receivedMessageRouter = [this] (MessageID messageID, std::unique_ptr<const MessageDecoder> && decoder)
253-
{ routeReceivedMessage (messageID, std::move (decoder)); };
254-
}
251+
{}
255252

256-
~MainThreadMessageDispatcher () override
257-
{
253+
~WaitableSingleMessageQueue ()
254+
{
258255
#if __cplusplus >= 202002L
259-
delete static_cast<std::binary_semaphore*> (_waitForMessageSemaphore);
256+
delete static_cast<std::binary_semaphore*> (_waitForMessageSemaphore);
260257
#elif defined (_WIN32)
261-
::CloseHandle (_waitForMessageSemaphore);
258+
::CloseHandle (_waitForMessageSemaphore);
262259
#elif defined (__APPLE__)
263-
dispatch_release (static_cast<dispatch_semaphore_t> (_waitForMessageSemaphore));
260+
dispatch_release (static_cast<dispatch_semaphore_t> (_waitForMessageSemaphore));
264261
#else
265-
#error "IPC not yet implemented for this platform"
262+
#error "IPC not yet implemented for this platform"
266263
#endif
264+
}
265+
266+
std::optional<std::pair<MessageID, std::unique_ptr<const MessageDecoder>>> waitOnSemaphore (ARATimeDuration timeout)
267+
{
268+
bool didReceiveMessage;
269+
#if __cplusplus >= 202002L
270+
didReceiveMessage = static_cast<std::binary_semaphore*> (_waitForMessageSemaphore)->try_acquire_for (std::chrono::duration<ARATimeDuration> { timeout });
271+
#elif defined (_WIN32)
272+
didReceiveMessage = (::WaitForSingleObject (_waitForMessageSemaphore, static_cast<DWORD> (timeout * 1000.0 + 0.5)) == WAIT_OBJECT_0);
273+
#elif defined (__APPLE__)
274+
const auto deadline { dispatch_time (DISPATCH_TIME_NOW, static_cast<int64_t> (10e9 * timeout + 0.5)) };
275+
didReceiveMessage = (dispatch_semaphore_wait (static_cast<dispatch_semaphore_t> (_waitForMessageSemaphore), deadline) == 0);
276+
#else
277+
#error "IPC not yet implemented for this platform"
278+
#endif
279+
if (didReceiveMessage)
280+
{
281+
const auto messageID { _pendingMessageID };
282+
const auto messageDecoder { _pendingMessageDecoder.load (std::memory_order_acquire) };
283+
return std::make_pair (messageID, std::unique_ptr<const MessageDecoder> (messageDecoder));
284+
}
285+
else
286+
{
287+
return {};
288+
}
289+
}
290+
291+
void signalSemaphore (MessageID messageID, std::unique_ptr<const MessageDecoder> && decoder)
292+
{
293+
_pendingMessageID = messageID;
294+
_pendingMessageDecoder.store (decoder.release (), std::memory_order_release);
295+
#if __cplusplus >= 202002L
296+
static_cast<std::binary_semaphore*> (_waitForMessageSemaphore)->release ();
297+
#elif defined (_WIN32)
298+
::ReleaseSemaphore (_waitForMessageSemaphore, 1, nullptr);
299+
#elif defined (__APPLE__)
300+
dispatch_semaphore_signal (static_cast<dispatch_semaphore_t> (_waitForMessageSemaphore));
301+
#else
302+
#error "IPC not yet implemented for this platform"
303+
#endif
304+
}
305+
306+
private:
307+
MessageID _pendingMessageID { 0 }; // read/write _pendingMessageDecoder with proper barrier before/after reading/writing this
308+
std::atomic<const MessageDecoder*> _pendingMessageDecoder { nullptr };
309+
void* const _waitForMessageSemaphore; // concrete type is platform-dependent
310+
};
311+
312+
//------------------------------------------------------------------------------
313+
314+
// single-threaded variant for main thread communication only
315+
class MainThreadMessageDispatcher : public MessageDispatcher
316+
{
317+
public:
318+
MainThreadMessageDispatcher (Connection* connection, std::unique_ptr<MessageChannel> && messageChannel)
319+
: MessageDispatcher { connection, std::move (messageChannel) }
320+
{
321+
getMessageChannel ()->_receivedMessageRouter = [this] (MessageID messageID, std::unique_ptr<const MessageDecoder> && decoder)
322+
{ routeReceivedMessage (messageID, std::move (decoder)); };
267323
}
268324

325+
~MainThreadMessageDispatcher () override = default;
326+
269327
void sendMessage (MessageID messageID, std::unique_ptr<MessageEncoder> && encoder, ReplyHandler && replyHandler);
270328

271329
void routeReceivedMessage (MessageID messageID, std::unique_ptr<const MessageDecoder> && decoder);
272330

273-
void processPendingMessageIfNeeded ();
331+
void processPendingMessageIfNeeded ()
332+
{
333+
ARA_INTERNAL_ASSERT (getConnection ()->wasCreatedOnCurrentThread ());
274334

275-
private:
276-
bool waitOnSemaphore ();
277-
void signalSemaphore ();
335+
auto receivedData { _messageQueue.waitOnSemaphore (0.0) };
336+
if (receivedData)
337+
processMessage (receivedData->first, std::move (receivedData->second));
338+
}
339+
340+
protected:
341+
void processMessage (MessageID messageID, std::unique_ptr<const MessageDecoder> && decoder);
278342

279343
private:
280344
// key to indicate whether an outgoing call is made in response (reply or callback) to a
@@ -287,15 +351,9 @@ class MainThreadMessageDispatcher : public MessageDispatcher
287351
private:
288352
int32_t _processingMessagesCount { 0 };
289353

290-
// \todo this is not allowed for some reason, so we must cast at every use of _noPendingMessageDecoder...
291-
// static constexpr auto _noPendingMessageDecoder { reinterpret_cast<const MessageDecoder*> (static_cast<intptr_t> (-1)) };
292-
static constexpr auto _noPendingMessageDecoder { static_cast<intptr_t> (-1) };
293-
MessageID _pendingMessageID { 0 }; // read/write _pendingMessageDecoder with proper barrier before/after reading/writing this
294-
std::atomic<const MessageDecoder*> _pendingMessageDecoder { reinterpret_cast<const MessageDecoder*> (_noPendingMessageDecoder) };
295-
296354
const PendingReplyHandler* _pendingReplyHandler { nullptr };
297355

298-
void* const _waitForMessageSemaphore; // concrete type is platform-dependent
356+
WaitableSingleMessageQueue _messageQueue {};
299357
};
300358

301359

@@ -324,36 +382,31 @@ void MainThreadMessageDispatcher::sendMessage (MessageID messageID, std::unique_
324382
}
325383
else
326384
{
327-
if (waitOnSemaphore ())
328-
processPendingMessageIfNeeded ();
385+
auto receivedData { _messageQueue.waitOnSemaphore (0.010) };
386+
if (receivedData )
387+
processMessage (receivedData->first, std::move (receivedData->second));
329388
else
330389
getConnection ()->_callWaitForMessageDelegate ();
331390
}
332391
}
333392
}
334393

335-
void MainThreadMessageDispatcher::processPendingMessageIfNeeded ()
394+
void MainThreadMessageDispatcher::processMessage (MessageID messageID, std::unique_ptr<const MessageDecoder> && decoder)
336395
{
337396
ARA_INTERNAL_ASSERT (getConnection ()->wasCreatedOnCurrentThread ());
338397

339-
const auto pendingMessageDecoder { _pendingMessageDecoder.exchange (reinterpret_cast<const MessageDecoder*> (_noPendingMessageDecoder), std::memory_order_acquire) };
340-
if (pendingMessageDecoder != reinterpret_cast<const MessageDecoder*> (_noPendingMessageDecoder))
398+
if (isReply (messageID))
341399
{
342-
std::unique_ptr<const MessageDecoder> ownedPendingMessageDecoder { pendingMessageDecoder };
343-
const auto pendingMessageID { _pendingMessageID };
344-
if (isReply (pendingMessageID))
345-
{
346-
ARA_INTERNAL_ASSERT (_pendingReplyHandler != nullptr);
347-
_handleReply (std::move (ownedPendingMessageDecoder), std::move (*_pendingReplyHandler->_replyHandler));
348-
_pendingReplyHandler = _pendingReplyHandler->_prevPendingReplyHandler;
349-
}
350-
else
351-
{
352-
++_processingMessagesCount;
353-
auto replyEncoder { _handleReceivedMessage (pendingMessageID, std::move (ownedPendingMessageDecoder)) };
354-
--_processingMessagesCount;
355-
_sendMessage (0, std::move (replyEncoder), false);
356-
}
400+
ARA_INTERNAL_ASSERT (_pendingReplyHandler != nullptr);
401+
_handleReply (std::move (decoder), std::move (*_pendingReplyHandler->_replyHandler));
402+
_pendingReplyHandler = _pendingReplyHandler->_prevPendingReplyHandler;
403+
}
404+
else
405+
{
406+
++_processingMessagesCount;
407+
auto replyEncoder { _handleReceivedMessage (messageID, std::move (decoder)) };
408+
--_processingMessagesCount;
409+
_sendMessage (0, std::move (replyEncoder), false);
357410
}
358411
}
359412

@@ -384,57 +437,21 @@ void MainThreadMessageDispatcher::routeReceivedMessage (MessageID messageID, std
384437
ARA_IPC_DECODE_RECEIVED_MESSAGE_ARGS (messageID), ARA_IPC_LABEL_THREAD_ARGS);
385438
}
386439

387-
_pendingMessageID = messageID;
388440
if (processSynchronously)
389441
{
390-
_pendingMessageDecoder.store (decoder.release (), std::memory_order_relaxed);
391-
392-
processPendingMessageIfNeeded ();
442+
processMessage (messageID, std::move (decoder));
393443
}
394444
else
395445
{
396-
_pendingMessageDecoder.store (decoder.release (), std::memory_order_release);
397-
398446
// only new transactions must be dispatched, otherwise the target thread is already waiting for the message received signal
399447
if (isResponse)
400-
signalSemaphore ();
448+
_messageQueue.signalSemaphore (messageID, std::move (decoder));
401449
else
402-
getConnection ()->dispatchToCreationThread (std::bind (&MainThreadMessageDispatcher::processPendingMessageIfNeeded, this));
450+
getConnection ()->dispatchToCreationThread ([this, messageID, d = decoder.release ()] ()
451+
{ processMessage (messageID, std::unique_ptr<const MessageDecoder> (d)); });
403452
}
404453
}
405454

406-
bool MainThreadMessageDispatcher::waitOnSemaphore ()
407-
{
408-
ARA_INTERNAL_ASSERT (getConnection ()->wasCreatedOnCurrentThread ());
409-
410-
constexpr ARATimeDuration timeout { 0.010 };
411-
bool didReceiveMessage;
412-
#if __cplusplus >= 202002L
413-
didReceiveMessage = static_cast<std::binary_semaphore*> (_waitForMessageSemaphore)->try_acquire_for (std::chrono::duration<ARATimeDuration> { timeout });
414-
#elif defined (_WIN32)
415-
didReceiveMessage = (::WaitForSingleObject (_waitForMessageSemaphore, static_cast<DWORD> (timeout * 1000.0 + 0.5)) == WAIT_OBJECT_0);
416-
#elif defined (__APPLE__)
417-
const auto deadline { dispatch_time (DISPATCH_TIME_NOW, static_cast<int64_t> (10e9 * timeout + 0.5)) };
418-
didReceiveMessage = (dispatch_semaphore_wait (static_cast<dispatch_semaphore_t> (_waitForMessageSemaphore), deadline) == 0);
419-
#else
420-
#error "IPC not yet implemented for this platform"
421-
#endif
422-
return didReceiveMessage;
423-
}
424-
425-
void MainThreadMessageDispatcher::signalSemaphore ()
426-
{
427-
#if __cplusplus >= 202002L
428-
static_cast<std::binary_semaphore*> (_waitForMessageSemaphore)->release ();
429-
#elif defined (_WIN32)
430-
::ReleaseSemaphore (_waitForMessageSemaphore, 1, nullptr);
431-
#elif defined (__APPLE__)
432-
dispatch_semaphore_signal (static_cast<dispatch_semaphore_t> (_waitForMessageSemaphore));
433-
#else
434-
#error "IPC not yet implemented for this platform"
435-
#endif
436-
}
437-
438455
//------------------------------------------------------------------------------
439456

440457
// multi-threaded variant for all non-main thread communication

0 commit comments

Comments
 (0)