From 336d0e4bb0391d74277761e3d18d79992980a39b Mon Sep 17 00:00:00 2001 From: mojoBrendan Date: Fri, 17 Jul 2015 08:08:35 -0500 Subject: [PATCH 1/3] add timeout to dequeue --- blockingconcurrentqueue.h | 64 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 60 insertions(+), 4 deletions(-) diff --git a/blockingconcurrentqueue.h b/blockingconcurrentqueue.h index 996c556b..5a1e1f71 100644 --- a/blockingconcurrentqueue.h +++ b/blockingconcurrentqueue.h @@ -167,6 +167,32 @@ namespace details while (rc == -1 && errno == EINTR); } + bool wait_for_usecs(int64_t usecs) { + int rc; + struct timespec ts; + const int usecs_in_1_sec = 1000000; + const int nsecs_in_1_sec = 1000000000; + clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec += usecs / usecs_in_1_sec; + ts.tv_nsec += (usecs % usecs_in_1_sec) * 1000; + // sem_timedwait bombs if you have more than 1e9 in tv_nsec + // so we have to clean things up before passing it in + if (ts.tv_nsec > nsecs_in_1_sec) { + ts.tv_nsec -= nsecs_in_1_sec; + ts.tv_sec += 1; + } + + do { + rc = sem_timedwait(&m_sema, &ts); + } while (rc == -1 && errno == EINTR); + + if (rc == -1 && errno == ETIMEDOUT) { + return false; + } else { + return true; + } + } + void signal() { sem_post(&m_sema); @@ -196,7 +222,7 @@ namespace details std::atomic m_count; Semaphore m_sema; - void waitWithPartialSpinning() + bool waitWithPartialSpinning(int64_t timeout_usecs) { ssize_t oldCount; // Is there a better way to set the initial spin count? @@ -207,14 +233,22 @@ namespace details { oldCount = m_count.load(std::memory_order_relaxed); if ((oldCount > 0) && m_count.compare_exchange_strong(oldCount, oldCount - 1, std::memory_order_acquire, std::memory_order_relaxed)) - return; + return true; std::atomic_signal_fence(std::memory_order_acquire); // Prevent the compiler from collapsing the loop. } oldCount = m_count.fetch_sub(1, std::memory_order_acquire); if (oldCount <= 0) { - m_sema.wait(); + if (timeout_usecs >= 0) + { + return m_sema.wait_for_usecs(timeout_usecs); + } + else + { + m_sema.wait(); + } } + return true; } ssize_t waitManyWithPartialSpinning(ssize_t max) @@ -261,9 +295,18 @@ namespace details void wait() { if (!tryWait()) - waitWithPartialSpinning(); + waitWithPartialSpinning(-1); } + bool wait_for_usecs(int64_t timeout_usecs) + { + if (!tryWait()) { + return waitWithPartialSpinning(timeout_usecs); + } else { + return true; + } + } + // Acquires between 0 and (greedily) max, inclusive ssize_t tryWaitMany(ssize_t max) { @@ -654,6 +697,19 @@ class BlockingConcurrentQueue continue; } } + + template + inline bool wait_dequeue_with_timeout(int64_t timeout_usecs, U& item) + { + // 0us is still a timeout, hence >= + if (timeout_usecs >= 0 && !sema->wait_for_usecs(timeout_usecs)) { + return false; + } + while (!inner.try_dequeue(item)) { + continue; + } + return true; + } // Blocks the current thread until there's something to dequeue, then // dequeues it using an explicit consumer token. From 0782c7ef958514c8e919fe73c1e0b45da3162dab Mon Sep 17 00:00:00 2001 From: mojoBrendan Date: Wed, 4 May 2016 13:24:15 +0200 Subject: [PATCH 2/3] add missing include --- blockingconcurrentqueue.h | 1 + 1 file changed, 1 insertion(+) diff --git a/blockingconcurrentqueue.h b/blockingconcurrentqueue.h index 4f64fae4..5cc1ab01 100644 --- a/blockingconcurrentqueue.h +++ b/blockingconcurrentqueue.h @@ -26,6 +26,7 @@ extern "C" { #elif defined(__MACH__) #include #elif defined(__unix__) +#include #include #endif From b590ad1e71ff56a7c6bce97248df34a2b68916c9 Mon Sep 17 00:00:00 2001 From: mojoBrendan Date: Wed, 4 May 2016 13:32:38 +0200 Subject: [PATCH 3/3] use consistent header naming --- blockingconcurrentqueue.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/blockingconcurrentqueue.h b/blockingconcurrentqueue.h index 5cc1ab01..51df251a 100644 --- a/blockingconcurrentqueue.h +++ b/blockingconcurrentqueue.h @@ -26,7 +26,7 @@ extern "C" { #elif defined(__MACH__) #include #elif defined(__unix__) -#include +#include #include #endif