Skip to content

Commit 74d2e7d

Browse files
committed
Try Windows support
Signed-off-by: Juan Cruz Viotti <[email protected]>
1 parent 06fc606 commit 74d2e7d

File tree

1 file changed

+91
-33
lines changed

1 file changed

+91
-33
lines changed

src/lang/parallel/include/sourcemeta/core/parallel_for_each.h

Lines changed: 91 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,34 @@
88
#include <iterator> // std::input_iterator, std::iter_reference_t
99
#include <mutex> // std::mutex, std::lock_guard
1010
#include <queue> // std::queue
11+
#include <stdexcept> // std::runtime_error
1112
#include <thread> // std::thread
1213
#include <utility> // std::forward
1314
#include <vector> // std::vector
1415

16+
#if defined(_WIN32)
17+
#include <process.h> // _beginthreadex
18+
#include <windows.h>
19+
#else
1520
#include <pthread.h>
21+
#endif
1622

1723
namespace sourcemeta::core {
1824

25+
#ifndef DOXYGEN
26+
#if defined(_WIN32)
27+
// See
28+
// https://learn.microsoft.com/en-us/cpp/c-runtime-library/reference/beginthread-beginthreadex?view=msvc-170
29+
inline unsigned __stdcall parallel_for_each_windows_thread_start(
30+
void *argument) {
31+
auto *function_ptr = static_cast<std::function<void()> *>(argument);
32+
(*function_ptr)();
33+
delete function_ptr;
34+
return 0;
35+
}
36+
#endif
37+
#endif
38+
1939
/// @ingroup parallel
2040
///
2141
/// Process a collection in parallel. If the parallelism is set to zero, the
@@ -80,51 +100,89 @@ auto parallel_for_each(
80100
workers.reserve(effective_parallelism);
81101

82102
const auto total{tasks.size()};
103+
104+
// Worker function that runs the actual per-item work and captures the
105+
// environment by reference. It will be heap-copied into the native thread
106+
// API.
107+
auto worker_callable = [&tasks, &queue_mutex, &effective_callback,
108+
&handle_exception, effective_parallelism, total] {
109+
try {
110+
while (true) {
111+
Iterator iterator;
112+
std::size_t cursor{0};
113+
{
114+
std::lock_guard<std::mutex> lock{queue_mutex};
115+
if (tasks.empty()) {
116+
return;
117+
}
118+
iterator = tasks.front();
119+
cursor = total - tasks.size() + 1;
120+
tasks.pop();
121+
}
122+
effective_callback(*iterator, effective_parallelism, cursor);
123+
}
124+
} catch (...) {
125+
handle_exception(std::current_exception());
126+
}
127+
};
128+
129+
#if defined(_WIN32)
83130
for (std::size_t index = 0; index < effective_parallelism; ++index) {
84-
// We can't use std::thread, as it doesn't let
85-
// us tweak the thread stack size
131+
auto *heap_function = new std::function<void()>(worker_callable);
132+
if (stack_size_bytes > static_cast<std::size_t>(UINT_MAX)) {
133+
delete heap_function;
134+
throw std::runtime_error(
135+
"The requested stack size is too large for this platform");
136+
}
137+
138+
auto raw_handle = _beginthreadex(
139+
nullptr, static_cast<unsigned>(stack_size_bytes),
140+
&parallel_for_each_windows_thread_start, heap_function, 0, nullptr);
141+
if (raw_handle == 0) {
142+
delete heap_function;
143+
throw std::runtime_error("Could not create thread");
144+
}
145+
146+
HANDLE thread_handle = reinterpret_cast<HANDLE>(raw_handle);
147+
workers.emplace_back([thread_handle] {
148+
WaitForSingleObject(thread_handle, INFINITE);
149+
CloseHandle(thread_handle);
150+
});
151+
}
152+
#else
153+
for (std::size_t index = 0; index < effective_parallelism; ++index) {
154+
// We can't use std::thread, as it doesn't let us tweak the thread stack
155+
// size
86156
pthread_attr_t attr;
87157
pthread_attr_init(&attr);
88158
if (stack_size_bytes > 0) {
89159
pthread_attr_setstacksize(&attr, stack_size_bytes);
90160
}
91-
pthread_t handle;
92-
pthread_create(
93-
&handle, &attr,
161+
162+
auto *heap_function = new std::function<void()>(worker_callable);
163+
pthread_t pthread_handle;
164+
auto raw_handle = pthread_create(
165+
&pthread_handle, &attr,
94166
[](void *arg) -> void * {
95-
auto *fn = static_cast<std::function<void()> *>(arg);
96-
(*fn)();
97-
delete fn;
167+
auto *function_ptr = static_cast<std::function<void()> *>(arg);
168+
(*function_ptr)();
169+
delete function_ptr;
98170
return nullptr;
99171
},
100-
new std::function<void()>([&tasks, &queue_mutex, &effective_callback,
101-
&handle_exception, effective_parallelism,
102-
total] {
103-
try {
104-
while (true) {
105-
Iterator iterator;
106-
std::size_t cursor{0};
107-
{
108-
std::lock_guard<std::mutex> lock{queue_mutex};
109-
if (tasks.empty()) {
110-
return;
111-
}
112-
iterator = tasks.front();
113-
cursor = total - tasks.size() + 1;
114-
tasks.pop();
115-
}
116-
effective_callback(*iterator, effective_parallelism, cursor);
117-
}
118-
} catch (...) {
119-
handle_exception(std::current_exception());
120-
}
121-
}));
122-
workers.emplace_back([handle] { pthread_join(handle, nullptr); });
172+
heap_function);
173+
if (raw_handle != 0) {
174+
pthread_attr_destroy(&attr);
175+
delete heap_function;
176+
throw std::runtime_error("Could not create thread");
177+
}
178+
workers.emplace_back(
179+
[pthread_handle] { pthread_join(pthread_handle, nullptr); });
123180
pthread_attr_destroy(&attr);
124181
}
182+
#endif
125183

126-
for (auto &thread : workers) {
127-
thread.join();
184+
for (auto &worker_thread : workers) {
185+
worker_thread.join();
128186
}
129187

130188
if (exception) {

0 commit comments

Comments
 (0)