Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/ipm/Receiver.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ class Receiver : public opmonlib::MonitorableObject
};

Response receive(const duration_t& timeout, message_size_t num_bytes = s_any_size, bool no_tmoexcept_mode = false);
virtual bool data_pending() = 0;

virtual void register_callback(std::function<void(Response&)>) = 0;
virtual void unregister_callback() = 0;
Expand Down
11 changes: 11 additions & 0 deletions plugins/ZmqReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,17 @@ class ZmqReceiver : public Receiver
}
void unregister_callback() override { m_callback_adapter.clear_callback(); }

bool data_pending() override
{
try {
auto events = m_socket.get(zmq::sockopt::events);
return (events & ZMQ_POLLIN) != 0;
} catch (zmq::error_t const& err) {
ers::error(ZmqOperationError(ERS_HERE, "get events sockopt", "data_pending", err.what(), m_connection_string));
}
return false;
}

protected:
Receiver::Response receive_(const duration_t& timeout, bool no_tmoexcept_mode) override
{
Expand Down
12 changes: 12 additions & 0 deletions plugins/ZmqSubscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,18 @@ class ZmqSubscriber : public Subscriber
}
}

bool data_pending() override
{
try {
auto events = m_socket.get(zmq::sockopt::events);
return (events & ZMQ_POLLIN) != 0;
} catch (zmq::error_t const& err) {
ers::error(
ZmqOperationError(ERS_HERE, "get events sockopt", "data_pending", err.what(), *m_connection_strings.begin()));
}
return false;
}

void register_callback(std::function<void(Response&)> callback) override
{
m_callback_adapter.set_callback(callback);
Expand Down
2 changes: 2 additions & 0 deletions unittest/Receiver_test.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ class ReceiverImpl : public Receiver
m_can_receive = false;
}

bool data_pending() override { return true; }

protected:
Receiver::Response receive_(const duration_t& /* timeout */, bool /*no_tmoexcept_mode*/) override
{
Expand Down
2 changes: 2 additions & 0 deletions unittest/Subscriber_test.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ class SubscriberImpl : public Subscriber

std::set<std::string> get_subscriptions() const { return m_subscriptions; }

bool data_pending() override { return true; }

protected:
Receiver::Response receive_(const duration_t& /* timeout */, bool /*no_tmoexcept_mode*/) override
{
Expand Down
7 changes: 7 additions & 0 deletions unittest/ZmqPubSub_test.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ BOOST_AUTO_TEST_CASE(SendReceiveTest)
BOOST_REQUIRE(the_receiver->can_receive());
BOOST_REQUIRE(the_sender->can_send());

BOOST_REQUIRE(!the_receiver->data_pending());

the_receiver->subscribe("testTopic");

std::vector<char> test_data{ 'T', 'E', 'S', 'T' };
Expand All @@ -60,7 +62,9 @@ BOOST_AUTO_TEST_CASE(SendReceiveTest)
[&](dunedaq::ipm::ReceiveTimeoutExpired) { return elapsed_time_milliseconds(before_recv) >= 100; });

the_sender->send(test_data.data(), test_data.size(), Sender::s_no_block, "testTopic");
BOOST_REQUIRE(the_receiver->data_pending());
auto response = the_receiver->receive(Receiver::s_block);
BOOST_REQUIRE(!the_receiver->data_pending());
BOOST_REQUIRE_EQUAL(response.data.size(), 4);
BOOST_REQUIRE_EQUAL(response.data[0], 'T');
BOOST_REQUIRE_EQUAL(response.data[1], 'E');
Expand All @@ -69,6 +73,7 @@ BOOST_AUTO_TEST_CASE(SendReceiveTest)

the_receiver->unsubscribe("testTopic");
the_sender->send(test_data.data(), test_data.size(), Sender::s_no_block, "testTopic");
BOOST_REQUIRE(!the_receiver->data_pending());
BOOST_REQUIRE_EXCEPTION(
the_receiver->receive(std::chrono::milliseconds(2000)),
dunedaq::ipm::ReceiveTimeoutExpired,
Expand Down Expand Up @@ -135,6 +140,7 @@ BOOST_AUTO_TEST_CASE(CallbackTest)
BOOST_REQUIRE_EQUAL(message_received, false);
auto response = the_receiver->receive(Receiver::s_block);
BOOST_REQUIRE_EQUAL(response.data.size(), test_data.size());
BOOST_REQUIRE(!the_receiver->data_pending());
}

BOOST_AUTO_TEST_CASE(MultiplePublishers)
Expand Down Expand Up @@ -168,6 +174,7 @@ BOOST_AUTO_TEST_CASE(MultiplePublishers)
BOOST_REQUIRE_EQUAL(response2.data[1], 'E');
BOOST_REQUIRE_EQUAL(response2.data[2], 'S');
BOOST_REQUIRE_EQUAL(response2.data[3], 'T');
BOOST_REQUIRE(!the_subscriber->data_pending());
}

BOOST_AUTO_TEST_SUITE_END()
5 changes: 5 additions & 0 deletions unittest/ZmqSendReceive_test.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ BOOST_AUTO_TEST_CASE(SendReceiveTest)
auto the_receiver = make_ipm_receiver("ZmqReceiver");
BOOST_REQUIRE(the_receiver != nullptr);
BOOST_REQUIRE(!the_receiver->can_receive());
BOOST_REQUIRE(!the_receiver->data_pending());

auto the_sender = make_ipm_sender("ZmqSender");
BOOST_REQUIRE(the_sender != nullptr);
Expand All @@ -36,12 +37,16 @@ BOOST_AUTO_TEST_CASE(SendReceiveTest)
the_sender->connect_for_sends(send_info);

BOOST_REQUIRE(the_receiver->can_receive());
BOOST_REQUIRE(!the_receiver->data_pending());
BOOST_REQUIRE(the_sender->can_send());

std::vector<char> test_data{ 'T', 'E', 'S', 'T' };

the_sender->send(test_data.data(), test_data.size(), Sender::s_no_block);
BOOST_REQUIRE(the_receiver->data_pending());
auto response = the_receiver->receive(Receiver::s_block);
BOOST_REQUIRE(!the_receiver->data_pending());

BOOST_REQUIRE_EQUAL(response.data.size(), 4);
BOOST_REQUIRE_EQUAL(response.data[0], 'T');
BOOST_REQUIRE_EQUAL(response.data[1], 'E');
Expand Down
Loading