Skip to content

Commit 3f50d39

Browse files
committed
Reconnect on all transport layer errors and log them.
Summary: Ref #38 Reviewers: ivica Reviewed By: ivica Subscribers: miljen Differential Revision: https://repo.mireo.local/D37743
1 parent 150ba94 commit 3f50d39

File tree

8 files changed

+134
-53
lines changed

8 files changed

+134
-53
lines changed

doc/qbk/reference/concepts/LoggerType.qbk

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,13 @@ A type satisfies the `LoggerType` concept if it defines [*any number (including
7272
]
7373
[Invoked when the __DISCONNECT__ packet is received, indicating that the Broker wants to close this connection. ]
7474
]
75+
[
76+
[`void at_transport_error(error_code ec);`]
77+
[
78+
[*`ec`] is the `error_code` returned by the `async_read_some` or `async_write_some` operation on the underlying __StreamType__ stream.
79+
]
80+
[Invoked when a read or write operation on the underlying __StreamType__ stream fails, triggering a reconnection to the broker.]
81+
]
7582
]
7683

7784
For example, a type `T` that defines `at_connack` and `at_disconnect` functions with their respective arguments is considered a valid `LoggerType`.

include/boost/mqtt5/detail/log_invoke.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ class log_invoke {
6868
_logger.at_disconnect(rc, dc_props);
6969
}
7070

71+
void at_transport_error(error_code ec) {
72+
if constexpr (has_at_transport_error<LoggerType>)
73+
_logger.at_transport_error(ec);
74+
}
7175
};
7276

7377
} // end namespace boost::mqtt5::detail

include/boost/mqtt5/impl/connect_op.hpp

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -240,8 +240,10 @@ class connect_op {
240240
if (is_cancelled())
241241
return complete(asio::error::operation_aborted);
242242

243-
if (ec)
243+
if (ec) {
244+
_log.at_transport_error(ec);
244245
return do_shutdown(ec);
246+
}
245247

246248
_buffer_ptr = std::make_unique<std::string>(min_packet_sz, char(0));
247249

@@ -258,8 +260,10 @@ class connect_op {
258260
if (is_cancelled())
259261
return complete(asio::error::operation_aborted);
260262

261-
if (ec)
263+
if (ec) {
264+
_log.at_transport_error(ec);
262265
return do_shutdown(ec);
266+
}
263267

264268
auto code = control_code_e((*_buffer_ptr)[0] & 0b11110000);
265269

@@ -301,8 +305,10 @@ class connect_op {
301305
if (is_cancelled())
302306
return complete(asio::error::operation_aborted);
303307

304-
if (ec)
308+
if (ec) {
309+
_log.at_transport_error(ec);
305310
return do_shutdown(ec);
311+
}
306312

307313
if (code == control_code_e::connack)
308314
return on_connack(first, last);
@@ -402,8 +408,10 @@ class connect_op {
402408
if (is_cancelled())
403409
return complete(asio::error::operation_aborted);
404410

405-
if (ec)
411+
if (ec) {
412+
_log.at_transport_error(ec);
406413
return do_shutdown(ec);
414+
}
407415

408416
auto buff = asio::buffer(_buffer_ptr->data(), min_packet_sz);
409417
asio::async_read(

include/boost/mqtt5/impl/read_op.hpp

Lines changed: 6 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -76,13 +76,8 @@ class read_op {
7676
);
7777
}
7878
else
79-
asio::post(
80-
_owner.get_executor(),
81-
asio::prepend(
82-
std::move(*this), on_read {}, stream_ptr,
83-
std::array<size_t, 2> { 0, 1 },
84-
asio::error::not_connected, 0, error_code {}
85-
)
79+
_owner.async_reconnect(
80+
stream_ptr, asio::prepend(std::move(*this), on_reconnect {})
8681
);
8782
}
8883

@@ -100,13 +95,10 @@ class read_op {
10095
if (!ec)
10196
return complete(ec, bytes_read);
10297

103-
// websocket returns operation_aborted if disconnected
104-
if (should_reconnect(ec) || ec == asio::error::operation_aborted)
105-
return _owner.async_reconnect(
106-
stream_ptr, asio::prepend(std::move(*this), on_reconnect {})
107-
);
108-
109-
return complete(asio::error::no_recovery, bytes_read);
98+
_owner.log().at_transport_error(ec);
99+
_owner.async_reconnect(
100+
stream_ptr, asio::prepend(std::move(*this), on_reconnect {})
101+
);
110102
}
111103

112104
void operator()(on_reconnect, error_code ec) {
@@ -120,17 +112,6 @@ class read_op {
120112
void complete(error_code ec, size_t bytes_read) {
121113
std::move(_handler)(ec, bytes_read);
122114
}
123-
124-
static bool should_reconnect(error_code ec) {
125-
using namespace asio::error;
126-
// note: Win ERROR_SEM_TIMEOUT == Posix ENOLINK (Reserved)
127-
return ec.value() == 1236L || /* Win ERROR_CONNECTION_ABORTED */
128-
ec.value() == 121L || /* Win ERROR_SEM_TIMEOUT */
129-
ec == connection_aborted || ec == not_connected ||
130-
ec == timed_out || ec == connection_reset ||
131-
ec == broken_pipe || ec == asio::error::eof;
132-
}
133-
134115
};
135116

136117

include/boost/mqtt5/impl/write_op.hpp

Lines changed: 6 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,8 @@ class write_op {
6161
asio::prepend(std::move(*this), on_write {}, stream_ptr)
6262
);
6363
else
64-
asio::post(
65-
_owner.get_executor(),
66-
asio::prepend(
67-
std::move(*this), on_write {},
68-
stream_ptr, asio::error::not_connected, 0
69-
)
64+
_owner.async_reconnect(
65+
stream_ptr, asio::prepend(std::move(*this), on_reconnect {})
7066
);
7167
}
7268

@@ -80,13 +76,10 @@ class write_op {
8076
if (!ec)
8177
return complete(ec, bytes_written);
8278

83-
// websocket returns operation_aborted if disconnected
84-
if (should_reconnect(ec) || ec == asio::error::operation_aborted)
85-
return _owner.async_reconnect(
86-
stream_ptr, asio::prepend(std::move(*this), on_reconnect {})
87-
);
88-
89-
return complete(asio::error::no_recovery, 0);
79+
_owner.log().at_transport_error(ec);
80+
_owner.async_reconnect(
81+
stream_ptr, asio::prepend(std::move(*this), on_reconnect {})
82+
);
9083
}
9184

9285
void operator()(on_reconnect, error_code ec) {
@@ -100,17 +93,6 @@ class write_op {
10093
void complete(error_code ec, size_t bytes_written) {
10194
std::move(_handler)(ec, bytes_written);
10295
}
103-
104-
static bool should_reconnect(error_code ec) {
105-
using namespace asio::error;
106-
// note: Win ERROR_SEM_TIMEOUT == Posix ENOLINK (Reserved)
107-
return ec.value() == 1236L || /* Win ERROR_CONNECTION_ABORTED */
108-
ec.value() == 121L || /* Win ERROR_SEM_TIMEOUT */
109-
ec == connection_aborted || ec == not_connected ||
110-
ec == timed_out || ec == connection_reset ||
111-
ec == broken_pipe || ec == asio::error::eof;
112-
}
113-
11496
};
11597

11698
} // end namespace boost::mqtt5::detail

include/boost/mqtt5/logger.hpp

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,22 @@ class logger {
198198
std::clog << std::endl;
199199
}
200200

201+
/**
202+
* \brief Outputs the error message when it occurs at the transport layer
203+
* read or write operations.
204+
*
205+
* \param ec The error code returned by the `async_read_some` or
206+
* `async_write_some` operation on the underlying \__StreamType\__ stream.
207+
*/
208+
void at_transport_error(error_code ec) {
209+
if (_level < log_level::info)
210+
return;
211+
212+
output_prefix();
213+
std::clog
214+
<< "transport layer error: " << ec.message() << "." << std::endl;
215+
}
216+
201217
private:
202218
void output_prefix() {
203219
std::clog << prefix << " ";
@@ -251,6 +267,7 @@ static_assert(has_at_tls_handshake<logger>);
251267
static_assert(has_at_ws_handshake<logger>);
252268
static_assert(has_at_connack<logger>);
253269
static_assert(has_at_disconnect<logger>);
270+
static_assert(has_at_transport_error<logger>);
254271

255272
} // end namespace boost::mqtt5
256273

include/boost/mqtt5/logger_traits.hpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,15 @@ using at_disconnect_sig = decltype(
9797
template <typename T>
9898
constexpr bool has_at_disconnect = boost::is_detected<at_disconnect_sig, T>::value;
9999

100+
// at_transport_error
101+
102+
template <typename T>
103+
using at_transport_error = decltype(
104+
std::declval<T&>().at_transport_error(std::declval<error_code>())
105+
);
106+
template <typename T>
107+
constexpr bool has_at_transport_error = boost::is_detected<at_transport_error, T>::value;
108+
100109
} // end namespace boost::mqtt5
101110

102111

test/unit/logger.cpp

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ void logger_test() {
4040
BOOST_STATIC_ASSERT(has_at_ws_handshake<logger>);
4141
BOOST_STATIC_ASSERT(has_at_connack<logger>);
4242
BOOST_STATIC_ASSERT(has_at_disconnect<logger>);
43+
BOOST_STATIC_ASSERT(has_at_transport_error<logger>);
4344
}
4445

4546
BOOST_AUTO_TEST_SUITE(logger_tests)
@@ -417,6 +418,19 @@ BOOST_FIXTURE_TEST_CASE(at_disconnect_debug, disconnect_test_data) {
417418
test_logger_output(std::move(test_fun), expected_output);
418419
}
419420

421+
// at_transport_error
422+
423+
BOOST_AUTO_TEST_CASE(at_transport_error_info) {
424+
const auto expected_output = "[Boost.MQTT5] transport layer error: End of file.\n";
425+
426+
auto test_fun = [] {
427+
logger l(log_level::info);
428+
l.at_transport_error(asio::error::eof);
429+
};
430+
431+
test_logger_output(std::move(test_fun), expected_output);
432+
}
433+
420434
// Test that the mqtt_client calls logger functions as expected.
421435

422436
BOOST_AUTO_TEST_CASE(client_disconnect) {
@@ -477,6 +491,65 @@ BOOST_AUTO_TEST_CASE(client_disconnect) {
477491
BOOST_TEST(log == expected_msg);
478492
}
479493

494+
BOOST_AUTO_TEST_CASE(client_transport_error) {
495+
using test::after;
496+
using namespace std::chrono_literals;
497+
498+
const auto& success = success_msg();
499+
const auto expected_msg =
500+
"[Boost.MQTT5] resolve: 127.0.0.1:1883 - " + success + ".\n"
501+
"[Boost.MQTT5] TCP connect: 127.0.0.1:1883 - " + success + ".\n"
502+
"[Boost.MQTT5] transport layer error: End of file.\n"
503+
"[Boost.MQTT5] resolve: 127.0.0.1:1883 - " + success + ".\n"
504+
"[Boost.MQTT5] TCP connect: 127.0.0.1:1883 - " + success + ".\n"
505+
"[Boost.MQTT5] connack: The operation completed successfully.\n"
506+
;
507+
508+
boost::test_tools::output_test_stream output;
509+
{
510+
clog_redirect guard(output.rdbuf());
511+
512+
// packets
513+
auto connect = encoders::encode_connect(
514+
"", std::nullopt, std::nullopt, 60, false, test::dflt_cprops, std::nullopt
515+
);
516+
auto connack = encoders::encode_connack(false, uint8_t(0x00), {});
517+
518+
const std::string topic = "topic", payload = "payload0";
519+
const std::string publish = encoders::encode_publish(
520+
0, topic, payload, qos_e::at_most_once, retain_e::no, dup_e::no, {}
521+
);
522+
523+
test::msg_exchange broker_side;
524+
broker_side
525+
.expect(connect)
526+
.complete_with(asio::error::eof, after(0ms))
527+
.expect(connect)
528+
.complete_with(error_code{}, after(0ms))
529+
.reply_with(connack, after(10ms));
530+
531+
asio::io_context ioc;
532+
auto executor = ioc.get_executor();
533+
auto& broker = asio::make_service<test::test_broker>(
534+
ioc, executor, std::move(broker_side)
535+
);
536+
537+
mqtt_client<test::test_stream, std::monostate, logger> c(executor);
538+
c.brokers("127.0.0.1,127.0.0.1") // to avoid reconnect backoff
539+
.async_run(asio::detached);
540+
541+
asio::steady_timer timer(c.get_executor());
542+
timer.expires_after(100ms);
543+
timer.async_wait([&c](error_code) { c.cancel(); });
544+
545+
ioc.run();
546+
BOOST_TEST(broker.received_all_expected());
547+
}
548+
549+
std::string log = output.rdbuf()->str();
550+
BOOST_TEST(log == expected_msg);
551+
}
552+
480553
#ifdef BOOST_MQTT5_EXTRA_DEPS
481554
using stream_type = boost::beast::websocket::stream<
482555
asio::ssl::stream<asio::ip::tcp::socket>

0 commit comments

Comments
 (0)