Skip to content

Commit 921e41e

Browse files
authored
Merge pull request #5 from SynaptiveMedical/synaptive/dev/sever/COM-513_SharedState
Shared State
2 parents 38a3d89 + 088eac3 commit 921e41e

File tree

4 files changed

+190
-124
lines changed

4 files changed

+190
-124
lines changed

include/thread_pool/rouser.hpp

Lines changed: 13 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include <thread_pool/slotted_bag.hpp>
44
#include <thread_pool/thread_pool_options.hpp>
5+
#include <thread_pool/thread_pool_state.hpp>
56
#include <thread_pool/worker.hpp>
67

78
#include <atomic>
@@ -52,17 +53,13 @@ class Rouser final
5253

5354
/**
5455
* @brief Move ctor implementation.
55-
* @note Be very careful when invoking this while the thread pool is
56-
* active, or in an otherwise undefined state.
5756
*/
58-
Rouser(Rouser&& rhs) noexcept;
57+
Rouser(Rouser&& rhs) = delete;
5958

6059
/**
6160
* @brief Move assignment implementaion.
62-
* @note Be very careful when invoking this while the thread pool is
63-
* active, or in an otherwise undefined state.
6461
*/
65-
Rouser& operator=(Rouser&& rhs) noexcept;
62+
Rouser& operator=(Rouser&& rhs) = delete;
6663

6764
/**
6865
* @brief Destructor implementation.
@@ -71,13 +68,11 @@ class Rouser final
7168

7269
/**
7370
* @brief start Create the executing thread and start tasks execution.
74-
* @param workers A reference to the vector containing sibling workers for performing round robin work stealing.
75-
* @param idle_workers A reference to the slotted bag containing all idle workers.
76-
* @param num_busy_waiters A reference to the atomic busy waiter counter.
71+
* @param state A pointer to thread pool's shared state.
7772
* @note The parameters passed into this function generally relate to the global thread pool state.
7873
*/
7974
template <typename Task, template<typename> class Queue>
80-
void start(std::vector<std::unique_ptr<Worker<Task, Queue>>>& workers, SlottedBag<Queue>& idle_workers, std::atomic<size_t>& num_busy_waiters);
75+
void start(std::shared_ptr<ThreadPoolState<Task, Queue>> state);
8176

8277
/**
8378
* @brief stop Stop all worker's thread and stealing activity.
@@ -91,12 +86,10 @@ class Rouser final
9186

9287
/**
9388
* @brief threadFunc Executing thread function.
94-
* @param workers A reference to the vector containing sibling workers for performing round robin work stealing.
95-
* @param idle_workers A reference to the slotted bag containing all idle workers.
96-
* @param num_busy_waiters A reference to the atomic busy waiter counter.
89+
* @param state A pointer to thread pool's shared state.
9790
*/
9891
template <typename Task, template<typename> class Queue>
99-
void threadFunc(std::vector<std::unique_ptr<Worker<Task, Queue>>>& workers, SlottedBag<Queue>& idle_workers, std::atomic<size_t>& num_busy_waiters);
92+
void threadFunc(std::shared_ptr<ThreadPoolState<Task, Queue>> shared_state);
10093

10194
std::atomic<State> m_state;
10295
std::thread m_thread;
@@ -109,36 +102,19 @@ inline Rouser::Rouser(std::chrono::microseconds rouse_period)
109102
{
110103
}
111104

112-
inline Rouser::Rouser(Rouser&& rhs) noexcept
113-
{
114-
*this = std::move(rhs);
115-
}
116-
117-
inline Rouser& Rouser::operator=(Rouser&& rhs) noexcept
118-
{
119-
if (this != &rhs)
120-
{
121-
m_state = rhs.m_state.load();
122-
m_thread = std::move(rhs.m_thread);
123-
m_rouse_period = std::move(rhs.m_rouse_period);
124-
}
125-
126-
return *this;
127-
}
128-
129105
inline Rouser::~Rouser()
130106
{
131107
stop();
132108
}
133109

134110
template <typename Task, template<typename> class Queue>
135-
inline void Rouser::start(std::vector<std::unique_ptr<Worker<Task, Queue>>>& workers, SlottedBag<Queue>& idle_workers, std::atomic<size_t>& num_busy_waiters)
111+
inline void Rouser::start(std::shared_ptr<ThreadPoolState<Task, Queue>> state)
136112
{
137113
auto expectedState = State::Initialized;
138114
if (!m_state.compare_exchange_strong(expectedState, State::Running, std::memory_order_acq_rel))
139115
throw std::runtime_error("Cannot start Rouser: it has previously been started or stopped.");
140116

141-
m_thread = std::thread(&Rouser::threadFunc<Task, Queue>, this, std::ref(workers), std::ref(idle_workers), std::ref(num_busy_waiters));
117+
m_thread = std::thread(&Rouser::threadFunc<Task, Queue>, this, state);
142118
}
143119

144120
inline void Rouser::stop()
@@ -151,16 +127,16 @@ inline void Rouser::stop()
151127
}
152128

153129
template <typename Task, template<typename> class Queue>
154-
inline void Rouser::threadFunc(std::vector<std::unique_ptr<Worker<Task, Queue>>>& workers, SlottedBag<Queue>& idle_workers, std::atomic<size_t>& num_busy_waiters)
130+
inline void Rouser::threadFunc(std::shared_ptr<ThreadPoolState<Task, Queue>> shared_state)
155131
{
156132
while (m_state.load(std::memory_order_acquire) == State::Running)
157133
{
158134
// Try to wake up a thread if there are no current busy waiters.
159-
if (num_busy_waiters.load(std::memory_order_acquire) == 0)
135+
if (shared_state->numBusyWaiters().load(std::memory_order_acquire) == 0)
160136
{
161-
auto result = idle_workers.tryEmptyAny();
137+
auto result = shared_state->idleWorkers().tryEmptyAny();
162138
if (result.first)
163-
workers[result.second]->wake();
139+
shared_state->workers()[result.second]->wake();
164140
}
165141

166142
// Sleep.

include/thread_pool/thread_pool.hpp

Lines changed: 31 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include <thread_pool/mpmc_bounded_queue.hpp>
55
#include <thread_pool/slotted_bag.hpp>
66
#include <thread_pool/thread_pool_options.hpp>
7+
#include <thread_pool/thread_pool_state.hpp>
78
#include <thread_pool/worker.hpp>
89
#include <thread_pool/rouser.hpp>
910

@@ -53,15 +54,11 @@ class GenericThreadPool final
5354

5455
/**
5556
* @brief Move ctor implementation.
56-
* @note Be very careful when invoking this while the thread pool is
57-
* active, or in an otherwise undefined state.
5857
*/
5958
GenericThreadPool(GenericThreadPool&& rhs) noexcept;
6059

6160
/**
6261
* @brief Move assignment implementaion.v
63-
* @note Be very careful when invoking this while the thread pool is
64-
* active, or in an otherwise undefined state.
6562
*/
6663
GenericThreadPool& operator=(GenericThreadPool&& rhs) noexcept;
6764

@@ -109,35 +106,31 @@ class GenericThreadPool final
109106
*/
110107
size_t getWorkerId();
111108

112-
SlottedBag<Queue> m_idle_workers;
113-
WorkerVector m_workers;
114-
Rouser m_rouser;
115109
size_t m_failed_wakeup_retry_cap;
116110
std::atomic<size_t> m_next_worker;
117-
std::atomic<size_t> m_num_busy_waiters;
111+
std::shared_ptr<Rouser> m_rouser;
112+
std::shared_ptr<ThreadPoolState<Task, Queue>> m_state;
118113
};
119114

120115

121116
/// Implementation
122117

123118
template <typename Task, template<typename> class Queue>
124119
inline GenericThreadPool<Task, Queue>::GenericThreadPool(ThreadPoolOptions options)
125-
: m_idle_workers(options.threadCount())
126-
, m_workers(options.threadCount())
127-
, m_rouser(options.rousePeriod())
128-
, m_failed_wakeup_retry_cap(options.failedWakeupRetryCap())
120+
: m_failed_wakeup_retry_cap(options.failedWakeupRetryCap())
129121
, m_next_worker(0)
130-
, m_num_busy_waiters(0)
122+
, m_rouser(std::make_shared<Rouser>(options.rousePeriod()))
123+
, m_state(ThreadPoolState<Task, Queue>::create(options))
131124
{
132125
// Instatiate all workers.
133-
for (auto it = m_workers.begin(); it != m_workers.end(); ++it)
126+
for (auto it = m_state->workers().begin(); it != m_state->workers().end(); ++it)
134127
it->reset(new Worker<Task, Queue>(options.busyWaitOptions(), options.queueSize()));
135128

136129
// Initialize all worker threads.
137-
for (size_t i = 0; i < m_workers.size(); ++i)
138-
m_workers[i]->start(i, m_workers, m_idle_workers, m_num_busy_waiters);
130+
for (size_t i = 0; i < m_state->workers().size(); ++i)
131+
m_state->workers()[i]->start(i, m_state);
139132

140-
m_rouser.start(m_workers, m_idle_workers, m_num_busy_waiters);
133+
m_rouser->start(m_state);
141134
}
142135

143136
template <typename Task, template<typename> class Queue>
@@ -151,12 +144,10 @@ inline GenericThreadPool<Task, Queue>& GenericThreadPool<Task, Queue>::operator=
151144
{
152145
if (this != &rhs)
153146
{
154-
m_idle_workers = std::move(rhs.m_idle_workers);
155-
m_workers = std::move(rhs.m_workers);
156-
m_rouser = std::move(rhs.m_rouser);
157147
m_failed_wakeup_retry_cap = rhs.m_failed_wakeup_retry_cap;
158148
m_next_worker = rhs.m_next_worker.load();
159-
m_num_busy_waiters = rhs.m_num_busy_waiters.load();
149+
m_rouser = std::move(rhs.m_rouser);
150+
m_state = std::move(rhs.m_state);
160151
}
161152

162153
return *this;
@@ -165,16 +156,22 @@ inline GenericThreadPool<Task, Queue>& GenericThreadPool<Task, Queue>::operator=
165156
template <typename Task, template<typename> class Queue>
166157
inline GenericThreadPool<Task, Queue>::~GenericThreadPool()
167158
{
168-
m_rouser.stop();
159+
if (!m_state || !m_rouser)
160+
return;
169161

170-
for (auto& worker_ptr : m_workers)
162+
m_rouser->stop();
163+
164+
for (auto& worker_ptr : m_state->workers())
171165
worker_ptr->stop();
172166
}
173167

174168
template <typename Task, template<typename> class Queue>
175169
template <typename Handler>
176170
inline bool GenericThreadPool<Task, Queue>::tryPost(Handler&& handler)
177171
{
172+
if (!m_state || !m_rouser)
173+
throw std::runtime_error("Attempting to invoke post on a moved object.");
174+
178175
return tryPostImpl(std::forward<Handler>(handler), m_failed_wakeup_retry_cap);
179176
}
180177

@@ -195,13 +192,13 @@ inline bool GenericThreadPool<Task, Queue>::tryPostImpl(Handler&& handler, size_
195192
// is fully utilized (num active workers = argmin(num tasks, num total workers)).
196193
// If there aren't busy waiters, let's see if we have any idling threads.
197194
// These incur higher overhead to wake up than the busy waiters.
198-
if (m_num_busy_waiters.load(std::memory_order_acquire) == 0)
195+
if (m_state->numBusyWaiters().load(std::memory_order_acquire) == 0)
199196
{
200-
auto result = m_idle_workers.tryEmptyAny();
197+
auto result = m_state->idleWorkers().tryEmptyAny();
201198
if (result.first)
202199
{
203-
auto success = m_workers[result.second]->tryPost(std::forward<Handler>(handler));
204-
m_workers[result.second]->wake();
200+
auto success = m_state->workers()[result.second]->tryPost(std::forward<Handler>(handler));
201+
m_state->workers()[result.second]->wake();
205202

206203
// The above post will only fail if the idle worker's queue is full, which is an extremely
207204
// low probability scenario. In that case, we wake the worker and let it get to work on
@@ -219,24 +216,24 @@ inline bool GenericThreadPool<Task, Queue>::tryPostImpl(Handler&& handler, size_
219216
auto initialWorkerId = id;
220217
do
221218
{
222-
if (m_workers[id]->tryPost(std::forward<Handler>(handler)))
219+
if (m_state->workers()[id]->tryPost(std::forward<Handler>(handler)))
223220
{
224221
// The following section increases the probability that tasks will not be dropped.
225222
// This is a soft constraint, the strict task dropping bound is covered by the Rouser
226223
// thread's functionality. This code experimentally lowers task response time under
227224
// low thread pool utilization without incurring significant performance penalties at
228225
// high thread pool utilization.
229-
if (m_num_busy_waiters.load(std::memory_order_acquire) == 0)
226+
if (m_state->numBusyWaiters().load(std::memory_order_acquire) == 0)
230227
{
231-
auto result = m_idle_workers.tryEmptyAny();
228+
auto result = m_state->idleWorkers().tryEmptyAny();
232229
if (result.first)
233-
m_workers[result.second]->wake();
230+
m_state->workers()[result.second]->wake();
234231
}
235232

236233
return true;
237234
}
238235

239-
++id %= m_workers.size();
236+
++id %= m_state->workers().size();
240237
} while (id != initialWorkerId);
241238

242239
// All Queues in our thread pool are full during one whole iteration.
@@ -249,8 +246,8 @@ inline size_t GenericThreadPool<Task, Queue>::getWorkerId()
249246
{
250247
auto id = Worker<Task, Queue>::getWorkerIdForCurrentThread();
251248

252-
if (id > m_workers.size())
253-
id = m_next_worker.fetch_add(1, std::memory_order_relaxed) % m_workers.size();
249+
if (id > m_state->workers().size())
250+
id = m_next_worker.fetch_add(1, std::memory_order_relaxed) % m_state->workers().size();
254251

255252
return id;
256253
}

0 commit comments

Comments
 (0)