Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/expand_topic_or_service_name.cpp
src/rclcpp/executors/events_executor.cpp
src/rclcpp/executors/events_executor_entities_collector.cpp
src/rclcpp/executors/events_queue.cpp
src/rclcpp/executors/multi_threaded_executor.cpp
src/rclcpp/executors/single_threaded_executor.cpp
src/rclcpp/executors/static_executor_entities_collector.cpp
Expand Down
21 changes: 20 additions & 1 deletion rclcpp/include/rclcpp/executor_options.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,37 @@
namespace rclcpp
{

enum class QueueStrategy
{
CPU_PERFORMANCE,
LIMITED_EVENTS_WITH_TIME_ORDERING,
BOUNDED
};

struct QueueOptions
{
QueueOptions()
: queue_strategy(QueueStrategy::CPU_PERFORMANCE),
max_events(1000)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Following my comments above, max_events should be an internal detail of a bounded queue class

{}

QueueStrategy queue_strategy;
size_t max_events;
};

/// Options to be passed to the executor constructor.
struct ExecutorOptions
{
ExecutorOptions()
: memory_strategy(rclcpp::memory_strategies::create_default_strategy()),
context(rclcpp::contexts::get_global_default_context()),
max_conditions(0)
max_conditions(0), queue_options(QueueOptions())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ExecutorOption structure should only contain stuff that is used by all executors

{}

rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy;
rclcpp::Context::SharedPtr context;
size_t max_conditions;
QueueOptions queue_options;
};

namespace executor
Expand Down
25 changes: 7 additions & 18 deletions rclcpp/include/rclcpp/executors/events_executor.hpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2020 Open Source Robotics Foundation, Inc.
// Copyright 2021 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -16,18 +16,17 @@
#define RCLCPP__EXECUTORS__EVENTS_EXECUTOR_HPP_

#include <chrono>
#include <deque>
#include <memory>
#include <queue>
#include <vector>

#include "rclcpp/executor.hpp"
#include "rclcpp/executors/events_executor_entities_collector.hpp"
#include "rclcpp/executors/events_executor_notify_waitable.hpp"
#include "rclcpp/executors/events_queue.hpp"
#include "rclcpp/executors/timers_manager.hpp"
#include "rclcpp/node.hpp"

#include "rmw/listener_event_types.h"

namespace rclcpp
{
namespace executors
Expand Down Expand Up @@ -174,7 +173,7 @@ class EventsExecutor : public rclcpp::Executor
private:
RCLCPP_DISABLE_COPY(EventsExecutor)

using EventQueue = std::queue<rmw_listener_event_t>;
using EventQueue = std::deque<rmw_listener_event_t>;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this looks not needed anymore now (and also you can remove the queue/deque include)


// Executor callback: Push new events into the queue and trigger cv.
// This function is called by the DDS entities when an event happened,
Expand All @@ -186,14 +185,8 @@ class EventsExecutor : public rclcpp::Executor
auto this_executor = const_cast<executors::EventsExecutor *>(
static_cast<const executors::EventsExecutor *>(executor_ptr));

// Event queue mutex scope
{
std::unique_lock<std::mutex> lock(this_executor->push_mutex_);

this_executor->event_queue_.push(event);
}
// Notify that the event queue has some events in it.
this_executor->event_queue_cv_.notify_one();
// Push event and notify the queue it has some events
this_executor->events_queue_->push_and_notify(event);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be just a simple push, then the notify should still be done here

}

/// Extract and execute events from the queue until it is empty
Expand All @@ -207,15 +200,11 @@ class EventsExecutor : public rclcpp::Executor
execute_event(const rmw_listener_event_t & event);

// Queue where entities can push events
EventQueue event_queue_;
EventsQueue::SharedPtr events_queue_;

EventsExecutorEntitiesCollector::SharedPtr entities_collector_;
EventsExecutorNotifyWaitable::SharedPtr executor_notifier_;

// Mutex to protect the insertion of events in the queue
std::mutex push_mutex_;
// Variable used to notify when an event is added to the queue
std::condition_variable event_queue_cv_;
// Timers manager
std::shared_ptr<TimersManager> timers_manager_;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,39 @@ class EventsExecutorEntitiesCollector final
rclcpp::Waitable::SharedPtr
get_waitable(const void * waitable_id);

///
/**
* Get the subscription qos depth corresponding
* to a subscription identifier
*/
RCLCPP_PUBLIC
size_t
get_subscription_qos_depth(const void * subscription_id);

///
/**
* Get the waitable qos depth corresponding
* to a waitable identifier
*/
RCLCPP_PUBLIC
size_t
get_waitable_qos_depth(const void * waitable_id);

///
/**
* Gets the QoS of the entities collector.
* This is useful for the events executor, when it has to
* decide if keep pushing events from this waitable into the qeueue
*/
RCLCPP_PUBLIC
rmw_qos_profile_t
get_actual_qos() const
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand what this is.
Why a waitable has a QOS?

{
rmw_qos_profile_t qos;
qos.depth = 0;
return qos;
}

///
/**
* Add a weak pointer to a waitable
Expand Down Expand Up @@ -258,6 +291,11 @@ class EventsExecutorEntitiesCollector final
std::unordered_map<const void *, rclcpp::ClientBase::WeakPtr> weak_clients_map_;
std::unordered_map<const void *, rclcpp::Waitable::WeakPtr> weak_waitables_map_;

// Maps: entity identifiers to qos->depth from the entities registered in the executor
using QosDepthMap = std::unordered_map<const void *, size_t>;
QosDepthMap qos_depth_subscriptions_map_;
QosDepthMap qos_depth_waitables_map_;

/// Executor using this entities collector object
EventsExecutor * associated_executor_ = nullptr;
/// Instance of the timers manager used by the associated executor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,21 @@ class EventsExecutorNotifyWaitable final : public EventWaitable
return nullptr;
}

///
/**
* Gets the QoS of this notify waitable.
* This is useful for the events executor, when it has to
* decide if keep pushing events from this waitable into the qeueue
*/
RCLCPP_PUBLIC
rmw_qos_profile_t
get_actual_qos() const
{
rmw_qos_profile_t qos;
qos.depth = 0;
return qos;
}

private:
std::list<const rcl_guard_condition_t *> notify_guard_conditions_;
};
Expand Down
202 changes: 202 additions & 0 deletions rclcpp/include/rclcpp/executors/events_queue.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
// Copyright 2021 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RCLCPP__EXECUTORS__EVENTS_QUEUE_HPP_
#define RCLCPP__EXECUTORS__EVENTS_QUEUE_HPP_

#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <unordered_map>
#include <utility>

#include "rclcpp/executor_options.hpp"
#include "rclcpp/executors/events_executor_entities_collector.hpp"
#include "rclcpp/macros.hpp"

#include "rmw/listener_event_types.h"

namespace rclcpp
{
namespace executors
{

/**
* @brief This class provides a queue implementation which supports
* different strategies for queueing and pruning events in case of
* queue overruns.
*
*/
class EventsQueue
{
public:
RCLCPP_SMART_PTR_DEFINITIONS(EventsQueue)

using EventQueue = std::deque<rmw_listener_event_t>;

/**
* @brief Construct a new EventsQueue object
* @param collector The entities collector associated to this queue
* @param options The event queue options
*/
EventsQueue(
EventsExecutorEntitiesCollector::SharedPtr collector,
QueueOptions options);

/**
* @brief Destruct the object.
*/
~EventsQueue();

/**
* @brief swap EventQueue with another
* @param event_queue The queue to swap with member queue
*/
void swap(EventQueue & event_queue);

/**
* @brief Waits for an event to happen or timeout
* @param timeout Time to wait and exit if no events received
*/
void wait_for_event(std::chrono::nanoseconds timeout);

/**
* @brief Waits for an event and swap queues
* @param event_queue The queue where the events will be swapped to
*/
void wait_for_event_and_swap(EventQueue & event_queue);

/**
* @brief Waits for an event or timeout and swap queues
* @param event_queue The queue where the events will be swapped to
* @param timeout Time to wait and swap if no events received
*/
void wait_for_event_and_swap(
EventQueue & event_queue,
std::chrono::nanoseconds timeout);

/**
* @brief Waits for an event or timeout and get first (oldest) event
* @param event Where the event will be stored
* @param timeout Time to wait and exit if no events received
* @return true if there was an event, false if timeout time elapsed
*/
bool wait_and_get_first_event(
rmw_listener_event_t & event,
std::chrono::nanoseconds timeout);

/**
* @brief push event into the queue and trigger cv
* @param event The event to push into the queue
*/
void push_and_notify(rmw_listener_event_t event);

private:
RCLCPP_DISABLE_COPY(EventsQueue)

// Function pointers to implement different queue strategies
using PushFunctionPtr = std::function<void (rmw_listener_event_t)>;
using ClearStatsFunctionPtr = std::function<void (void)>;

PushFunctionPtr push_and_notify_implem_;
ClearStatsFunctionPtr clear_stats_implem_;

//
// QueueStrategy::CPU_PERFORMANCE
//

/**
* @brief push event into the queue and trigger cv
* @param event The event to push into the queue
*/
void simple_push(rmw_listener_event_t event);

//
// QueueStrategy::LIMITED_EVENTS_WITH_TIME_ORDERING
//

/**
* @brief Follows policy in how to push to the queue
* Before pushing, counts how many events came from the entity
* and compares it with its QoS depth. It removes the old and
* adds a new event if one has expired, se we keep time ordering
* @param event The event to push into the queue
*/
void limited_events_push(rmw_listener_event_t event);

/**
* @brief Remove oldest event and pushes new one in the back
* @param event The event to remove and push back into the queue
*/
void remove_oldest_event_and_push_back_new(rmw_listener_event_t event);

/**
* @brief Informs if the amount of subscriptions events reached the limit
* @param subscription_id The subscription_id to obtain information
* @return true if reached limit
*/
bool subscription_events_reached_limit(const void * subscription_id);

/**
* @brief Informs if the amount of waitable events reached the limit
* @param waitable_id The waitable_id to obtain information
* @return true if reached limit
*/
bool waitable_events_reached_limit(const void * waitable_id);

/**
* @brief Clears events queue stats
*/
void clear_stats();

//
// QueueStrategy::BOUNDED
//

/**
* @brief push event into the queue and trigger cv
* @param event The event to push into the queue
*/
void bounded_push(rmw_listener_event_t event);

/**
* @brief prune mechanism for qos bounded queue
*/
void bounded_prune();

size_t queue_limit_;

// The underlying queue
EventQueue event_queue_;

// Mutex to protect the insertion of events in the queue
std::mutex push_mutex_;

// Variable used to notify when an event is added to the queue
std::condition_variable event_queue_cv_;

// Entities collector associated with executor and events queue
EventsExecutorEntitiesCollector::SharedPtr entities_collector_;

// Maps: entity identifiers to number of events in the queue
using EventsInQueueMap = std::unordered_map<const void *, size_t>;
EventsInQueueMap subscription_events_in_queue_map_;
EventsInQueueMap waitable_events_in_queue_map_;
};

} // namespace executors
} // namespace rclcpp

#endif // RCLCPP__EXECUTORS__EVENTS_QUEUE_HPP_
Loading