Skip to content

Commit 3dcb47a

Browse files
authored
feat: Update asio to 1.34 and fixed build issues (#31)
* Update asio to 1.34 * Replace .wrap with bind_executor * Rename io_service to io_context * Use asio::post * CI: Updated Visual Studio Toolset to v141 / VS2017
1 parent d7c7b9f commit 3dcb47a

File tree

13 files changed

+65
-61
lines changed

13 files changed

+65
-61
lines changed

.github/workflows/build-macos.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ jobs:
3535
-DTCP_PUBSUB_USE_BUILTIN_RECYCLE=ON \
3636
-DTCP_PUBSUB_USE_BUILTIN_GTEST=ON \
3737
-DCMAKE_BUILD_TYPE=${{env.BUILD_TYPE}} \
38+
-DCMAKE_CXX_FLAGS=-DASIO_NO_DEPRECATED
3839
shell: bash
3940

4041
- name: Build

.github/workflows/build-ubuntu.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ jobs:
6464
-DTCP_PUBSUB_USE_BUILTIN_GTEST=ON \
6565
-DTCP_PUBSUB_LIBRARY_TYPE=${{env.tcp_pubsub_library_type}} \
6666
-DCMAKE_BUILD_TYPE=${{env.BUILD_TYPE}} \
67+
-DCMAKE_CXX_FLAGS=-DASIO_NO_DEPRECATED \
6768
-DBUILD_SHARED_LIBS=${{ env.build_shared_libs }}
6869
6970
- name: Build

.github/workflows/build-windows.yml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ env:
99
# Customize the CMake build type here (Release, Debug, RelWithDebInfo, etc.)
1010
INSTALL_PREFIX: _install
1111
PROJECT_NAME: tcp_pubsub
12-
VS_TOOLSET: v140
13-
VS_NAME: vs2015
12+
VS_TOOLSET: v141
13+
VS_NAME: vs2017
1414

1515
jobs:
1616
build-windows:
@@ -71,6 +71,7 @@ jobs:
7171
-DTCP_PUBSUB_USE_BUILTIN_RECYCLE=ON ^
7272
-DTCP_PUBSUB_USE_BUILTIN_GTEST=ON ^
7373
-DTCP_PUBSUB_LIBRARY_TYPE=${{env.tcp_pubsub_library_type}} ^
74+
-DCMAKE_CXX_FLAGS=/DASIO_NO_DEPRECATED ^
7475
-DCMAKE_INSTALL_PREFIX=${{env.INSTALL_PREFIX}}
7576
7677
- name: Build (Release)

tcp_pubsub/src/executor_impl.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ namespace tcp_pubsub
1818
{
1919
Executor_Impl::Executor_Impl(const logger::logger_t& log_function)
2020
: log_(log_function)
21-
, io_service_(std::make_shared<asio::io_service>())
22-
, dummy_work_(std::make_shared<asio::io_service::work>(*io_service_))
21+
, io_context_(std::make_shared<asio::io_context>())
22+
, dummy_work_(std::make_shared<work_guard_t>(io_context_->get_executor()))
2323
{
2424
#if (TCP_PUBSUB_LOG_DEBUG_ENABLED)
2525
log_(logger::LogLevel::Debug, "Executor: Creating Executor.");
@@ -76,7 +76,7 @@ namespace tcp_pubsub
7676
me->log_(logger::LogLevel::Debug, "Executor: IoService::Run() in thread " + thread_id);
7777
#endif
7878

79-
me->io_service_->run();
79+
me->io_context_->run();
8080

8181
#if (TCP_PUBSUB_LOG_DEBUG_ENABLED)
8282
me->log_(logger::LogLevel::Debug, "Executor: IoService: Shutdown of thread " + thread_id);
@@ -95,12 +95,12 @@ namespace tcp_pubsub
9595
dummy_work_.reset();
9696

9797
// Stop the IO Service
98-
io_service_->stop();
98+
io_context_->stop();
9999
}
100100

101-
std::shared_ptr<asio::io_service> Executor_Impl::ioService() const
101+
std::shared_ptr<asio::io_context> Executor_Impl::ioService() const
102102
{
103-
return io_service_;
103+
return io_context_;
104104
}
105105

106106
logger::logger_t Executor_Impl::logFunction() const

tcp_pubsub/src/executor_impl.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ namespace tcp_pubsub
3232
void start(size_t thread_count);
3333
void stop();
3434

35-
std::shared_ptr<asio::io_service> ioService() const;
35+
std::shared_ptr<asio::io_context> ioService() const;
3636
logger::logger_t logFunction() const;
3737

3838

@@ -43,9 +43,10 @@ namespace tcp_pubsub
4343

4444
private:
4545
const logger::logger_t log_; /// Logger
46-
std::shared_ptr<asio::io_service> io_service_; /// global io service
46+
std::shared_ptr<asio::io_context> io_context_; /// global io service
4747

4848
std::vector<std::thread> thread_pool_; /// Asio threadpool executing the io servic
49-
std::shared_ptr<asio::io_service::work> dummy_work_; /// Dummy work, so the io_service will never run out of work and shut down, even if there is no publisher or subscriber at the moment
49+
using work_guard_t = asio::executor_work_guard<asio::io_context::executor_type>;
50+
std::shared_ptr<work_guard_t> dummy_work_; /// Dummy work, so the io_context will never run out of work and shut down, even if there is no publisher or subscriber at the moment
5051
};
5152
}

tcp_pubsub/src/publisher_impl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ namespace tcp_pubsub
8383
std::atomic<bool> is_running_; /// Indicates whether this publisher is running and can send data. May be false, if e.g. binding to the given address has failed.
8484

8585
// Asio
86-
const std::shared_ptr<Executor> executor_; /// Global Executor (holding the io_service and thread pool)
86+
const std::shared_ptr<Executor> executor_; /// Global Executor (holding the io_context and thread pool)
8787
asio::ip::tcp::acceptor acceptor_; /// Acceptor used for waiting for clients (i.e. subscribers)
8888

8989
// Logger

tcp_pubsub/src/publisher_session.cpp

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,15 @@ namespace tcp_pubsub
3030
/// Constructor & Destructor
3131
//////////////////////////////////////////////
3232

33-
PublisherSession::PublisherSession(const std::shared_ptr<asio::io_service>& io_service
33+
PublisherSession::PublisherSession(const std::shared_ptr<asio::io_context>& io_context
3434
, const std::function<void(const std::shared_ptr<PublisherSession>&)>& session_closed_handler
3535
, const tcp_pubsub::logger::logger_t& log_function)
36-
: io_service_ (io_service)
36+
: io_context_ (io_context)
3737
, state_ (State::NotStarted)
3838
, session_closed_handler_ (session_closed_handler)
3939
, log_ (log_function)
40-
, data_socket_ (*io_service_)
41-
, data_strand_ (*io_service_)
40+
, data_socket_ (*io_context_)
41+
, data_strand_ (*io_context_)
4242
, sending_in_progress_ (false)
4343
{
4444
#if (TCP_PUBSUB_LOG_DEBUG_VERBOSE_ENABLED)
@@ -132,7 +132,7 @@ namespace tcp_pubsub
132132
asio::async_read(data_socket_
133133
, asio::buffer(&(header->header_size), sizeof(header->header_size))
134134
, asio::transfer_at_least(sizeof(header->header_size))
135-
, data_strand_.wrap([me = shared_from_this(), header](asio::error_code ec, std::size_t /*length*/)
135+
, asio::bind_executor(data_strand_, [me = shared_from_this(), header](asio::error_code ec, std::size_t /*length*/)
136136
{
137137
if (ec)
138138
{
@@ -165,7 +165,7 @@ namespace tcp_pubsub
165165
asio::async_read(data_socket_
166166
, asio::buffer(&reinterpret_cast<char*>(header.get())[sizeof(header->header_size)], bytes_to_read_from_socket)
167167
, asio::transfer_at_least(bytes_to_read_from_socket)
168-
, data_strand_.wrap([me = shared_from_this(), header, bytes_to_discard_from_socket](asio::error_code ec, std::size_t /*length*/)
168+
, asio::bind_executor(data_strand_, [me = shared_from_this(), header, bytes_to_discard_from_socket](asio::error_code ec, std::size_t /*length*/)
169169
{
170170
if (ec)
171171
{
@@ -207,7 +207,7 @@ namespace tcp_pubsub
207207
asio::async_read(data_socket_
208208
, asio::buffer(data_to_discard.data(), bytes_to_discard)
209209
, asio::transfer_at_least(bytes_to_discard)
210-
, data_strand_.wrap([me = shared_from_this(), header](asio::error_code ec, std::size_t /*length*/)
210+
, asio::bind_executor(data_strand_, [me = shared_from_this(), header](asio::error_code ec, std::size_t /*length*/)
211211
{
212212
if (ec)
213213
{
@@ -240,7 +240,7 @@ namespace tcp_pubsub
240240
asio::async_read(data_socket_
241241
, asio::buffer(data_buffer->data(), le64toh(header->data_size))
242242
, asio::transfer_at_least(le64toh(header->data_size))
243-
, data_strand_.wrap([me = shared_from_this(), header, data_buffer](asio::error_code ec, std::size_t /*length*/)
243+
, asio::bind_executor(data_strand_, [me = shared_from_this(), header, data_buffer](asio::error_code ec, std::size_t /*length*/)
244244
{
245245
if (ec)
246246
{
@@ -341,7 +341,7 @@ namespace tcp_pubsub
341341

342342
asio::async_write(data_socket_
343343
, asio::buffer(*buffer)
344-
, data_strand_.wrap(
344+
, asio::bind_executor(data_strand_,
345345
[me = shared_from_this(), buffer](asio::error_code ec, std::size_t /*bytes_to_transfer*/)
346346
{
347347
#if (TCP_PUBSUB_LOG_DEBUG_VERBOSE_ENABLED)

tcp_pubsub/src/publisher_session.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ namespace tcp_pubsub
3636
/// Constructor & Destructor
3737
//////////////////////////////////////////////
3838
public:
39-
PublisherSession(const std::shared_ptr<asio::io_service>& io_service
39+
PublisherSession(const std::shared_ptr<asio::io_context>& io_context
4040
, const std::function<void(const std::shared_ptr<PublisherSession>&)>& session_closed_handler
4141
, const tcp_pubsub::logger::logger_t& log_function);
4242

@@ -97,7 +97,7 @@ namespace tcp_pubsub
9797
//////////////////////////////////////////////
9898
private:
9999
// Asio IO Service
100-
std::shared_ptr<asio::io_service> io_service_;
100+
std::shared_ptr<asio::io_context> io_context_;
101101

102102
// Whether the session has been canceled
103103
std::atomic<State> state_;
@@ -109,11 +109,11 @@ namespace tcp_pubsub
109109

110110
// TCP Socket & Queue (protected by the strand!)
111111
asio::ip::tcp::socket data_socket_;
112-
asio::io_service::strand data_strand_;
112+
asio::io_context::strand data_strand_;
113113

114114
// Variable holding if we are currently sending any data and what data to send next
115115
std::mutex next_buffer_mutex_;
116116
bool sending_in_progress_;
117117
std::shared_ptr<std::vector<char>> next_buffer_to_send_;
118118
};
119-
}
119+
}

tcp_pubsub/src/subscriber_session_impl.cpp

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -32,20 +32,20 @@ namespace tcp_pubsub
3232
/// Constructor & Destructor
3333
//////////////////////////////////////////////
3434

35-
SubscriberSession_Impl::SubscriberSession_Impl(const std::shared_ptr<asio::io_service>& io_service
35+
SubscriberSession_Impl::SubscriberSession_Impl(const std::shared_ptr<asio::io_context>& io_context
3636
, const std::vector<std::pair<std::string, uint16_t>>& publisher_list
3737
, int max_reconnection_attempts
3838
, const std::function<std::shared_ptr<std::vector<char>>()>& get_buffer_handler
3939
, const std::function<void(const std::shared_ptr<SubscriberSession_Impl>&)>& session_closed_handler
4040
, const tcp_pubsub::logger::logger_t& log_function)
4141
: publisher_list_ (publisher_list)
42-
, resolver_ (*io_service)
42+
, resolver_ (*io_context)
4343
, max_reconnection_attempts_(max_reconnection_attempts)
4444
, retries_left_ (max_reconnection_attempts)
45-
, retry_timer_ (*io_service, std::chrono::seconds(1))
45+
, retry_timer_ (*io_context, std::chrono::seconds(1))
4646
, canceled_ (false)
47-
, data_socket_ (*io_service)
48-
, data_strand_ (*io_service)
47+
, data_socket_ (*io_context)
48+
, data_strand_ (*io_context)
4949
, get_buffer_handler_ (get_buffer_handler)
5050
, session_closed_handler_ (session_closed_handler)
5151
, log_ (log_function)
@@ -88,16 +88,15 @@ namespace tcp_pubsub
8888

8989
void SubscriberSession_Impl::resolveEndpoint(size_t publisher_list_index)
9090
{
91-
const asio::ip::tcp::resolver::query query(publisher_list_[publisher_list_index].first, std::to_string(publisher_list_[publisher_list_index].second));
92-
9391
if (canceled_)
9492
{
9593
connectionFailedHandler();
9694
return;
9795
}
9896

99-
resolver_.async_resolve(query
100-
, [me = shared_from_this(), publisher_list_index](asio::error_code ec, const asio::ip::tcp::resolver::iterator& resolved_endpoints)
97+
resolver_.async_resolve(publisher_list_[publisher_list_index].first
98+
, std::to_string(publisher_list_[publisher_list_index].second)
99+
, [me = shared_from_this(), publisher_list_index](asio::error_code ec, const asio::ip::tcp::resolver::results_type& resolved_endpoints)
101100
{
102101
if (ec)
103102
{
@@ -136,7 +135,7 @@ namespace tcp_pubsub
136135
});
137136
}
138137

139-
void SubscriberSession_Impl::connectToEndpoint(const asio::ip::tcp::resolver::iterator& resolved_endpoints, size_t publisher_list_index)
138+
void SubscriberSession_Impl::connectToEndpoint(const asio::ip::tcp::resolver::results_type& resolved_endpoints, size_t publisher_list_index)
140139
{
141140
if (canceled_)
142141
{
@@ -147,9 +146,9 @@ namespace tcp_pubsub
147146
// Convert the resolved_endpoints iterator to an endpoint sequence
148147
// (i.e. a vector of endpoints)
149148
auto endpoint_sequence = std::make_shared<std::vector<asio::ip::tcp::endpoint>>();
150-
for (auto it = resolved_endpoints; it != asio::ip::tcp::resolver::iterator(); ++it)
149+
for (const auto& endpoint : resolved_endpoints)
151150
{
152-
endpoint_sequence->push_back(*it);
151+
endpoint_sequence->push_back(endpoint);
153152
}
154153

155154
asio::async_connect(data_socket_
@@ -220,7 +219,7 @@ namespace tcp_pubsub
220219

221220
asio::async_write(data_socket_
222221
, asio::buffer(*buffer)
223-
, data_strand_.wrap(
222+
, asio::bind_executor(data_strand_,
224223
[me = shared_from_this(), buffer](asio::error_code ec, std::size_t /*bytes_to_transfer*/)
225224
{
226225
if (ec)
@@ -301,7 +300,7 @@ namespace tcp_pubsub
301300
asio::async_read(data_socket_
302301
, asio::buffer(&(header->header_size), sizeof(header->header_size))
303302
, asio::transfer_at_least(sizeof(header->header_size))
304-
, data_strand_.wrap([me = shared_from_this(), header](asio::error_code ec, std::size_t /*length*/)
303+
, asio::bind_executor(data_strand_, [me = shared_from_this(), header](asio::error_code ec, std::size_t /*length*/)
305304
{
306305
if (ec)
307306
{
@@ -337,7 +336,7 @@ namespace tcp_pubsub
337336
asio::async_read(data_socket_
338337
, asio::buffer(&reinterpret_cast<char*>(header.get())[sizeof(header->header_size)], bytes_to_read_from_socket)
339338
, asio::transfer_at_least(bytes_to_read_from_socket)
340-
, data_strand_.wrap([me = shared_from_this(), header, bytes_to_discard_from_socket](asio::error_code ec, std::size_t /*length*/)
339+
, asio::bind_executor(data_strand_, [me = shared_from_this(), header, bytes_to_discard_from_socket](asio::error_code ec, std::size_t /*length*/)
341340
{
342341
if (ec)
343342
{
@@ -380,7 +379,7 @@ namespace tcp_pubsub
380379
asio::async_read(data_socket_
381380
, asio::buffer(data_to_discard.data(), bytes_to_discard)
382381
, asio::transfer_at_least(bytes_to_discard)
383-
, data_strand_.wrap([me = shared_from_this(), header](asio::error_code ec, std::size_t /*length*/)
382+
, asio::bind_executor(data_strand_, [me = shared_from_this(), header](asio::error_code ec, std::size_t /*length*/)
384383
{
385384
if (ec)
386385
{
@@ -424,7 +423,7 @@ namespace tcp_pubsub
424423
asio::async_read(data_socket_
425424
, asio::buffer(data_buffer->data(), le64toh(header->data_size))
426425
, asio::transfer_at_least(le64toh(header->data_size))
427-
, data_strand_.wrap([me = shared_from_this(), header, data_buffer](asio::error_code ec, std::size_t /*length*/)
426+
, asio::bind_executor(data_strand_, [me = shared_from_this(), header, data_buffer](asio::error_code ec, std::size_t /*length*/)
428427
{
429428
if (ec)
430429
{
@@ -457,7 +456,7 @@ namespace tcp_pubsub
457456
me->log_(logger::LogLevel::DebugVerbose, "SubscriberSession " + me->endpointToString() + ": Received message of type \"RegularPayload\"");
458457
#endif
459458
// Call the callback first, ...
460-
me->data_strand_.post([me, data_buffer, header]()
459+
asio::post(me->data_strand_, [me, data_buffer, header]()
461460
{
462461
if (me->canceled_)
463462
{
@@ -476,10 +475,9 @@ namespace tcp_pubsub
476475
}
477476

478477
// ... then start reading the next message
479-
me->data_strand_.post([me]()
480-
{
481-
me->readHeaderLength();
482-
});
478+
asio::post(me->data_strand_, [me]() {
479+
me->readHeaderLength();
480+
});
483481
}));
484482
}
485483

@@ -495,7 +493,7 @@ namespace tcp_pubsub
495493
// - We can protect the variable with the data_strand => If the callback is currently running, the new callback will be applied afterwards
496494
// - We don't need an additional mutex, so a synchronous callback should actually be able to set another callback that gets activated once the current callback call ends
497495
// - Reading the next message will start once the callback call is finished. Therefore, read and callback are synchronized and the callback calls don't start stacking up
498-
data_strand_.post([me = shared_from_this(), callback]()
496+
asio::post(data_strand_, [me = shared_from_this(), callback]()
499497
{
500498
me->synchronous_callback_ = callback;
501499
});
@@ -545,14 +543,16 @@ namespace tcp_pubsub
545543
}
546544

547545
{
548-
asio::error_code ec;
549-
retry_timer_.cancel(ec);
546+
try {
547+
static_cast<void>(retry_timer_.cancel());
550548
#if (TCP_PUBSUB_LOG_DEBUG_VERBOSE_ENABLED)
551-
if (ec)
552-
log_(logger::LogLevel::DebugVerbose, "SubscriberSession " + endpointToString() + ": Failed canceling retry timer: " + ec.message());
553-
else
554-
log_(logger::LogLevel::DebugVerbose, "SubscriberSession " + endpointToString() + ": Successfully canceled retry timer.");
549+
log_(logger::LogLevel::DebugVerbose, "SubscriberSession " + endpointToString() + ": Successfully canceled retry timer.");
555550
#endif
551+
} catch (asio::system_error& err){
552+
#if (TCP_PUBSUB_LOG_DEBUG_VERBOSE_ENABLED)
553+
log_(logger::LogLevel::DebugVerbose, "SubscriberSession " + endpointToString() + ": Failed canceling retry timer: " + err.what());
554+
#endif
555+
}
556556
}
557557

558558
resolver_.cancel();

0 commit comments

Comments
 (0)