From 6f4fef243cc4a4694ce9d05320387a6b70e95c65 Mon Sep 17 00:00:00 2001 From: Paul Ribault Date: Sun, 4 Jan 2026 15:28:47 +0100 Subject: [PATCH] Take processor --- include/recpp/processors/Take.h | 58 +++++++++++++++++++ include/recpp/processors/inl/Take.inl | 60 ++++++++++++++++++++ include/recpp/rx/Observable.h | 8 +++ include/recpp/rx/inl/Observable.inl | 7 +++ tests/rx/Observable.cpp | 80 +++++++++++++++++++++++++++ 5 files changed, 213 insertions(+) create mode 100644 include/recpp/processors/Take.h create mode 100644 include/recpp/processors/inl/Take.inl diff --git a/include/recpp/processors/Take.h b/include/recpp/processors/Take.h new file mode 100644 index 0000000..fc64b55 --- /dev/null +++ b/include/recpp/processors/Take.h @@ -0,0 +1,58 @@ +#pragma once + +#include + +#include + +#include +#include +#include +#include + +namespace recpp::processors +{ + /** + * @class Take Take.h + * @brief {@link rscpp::Processor} that will subscribe to a given {@link rscpp::Publisher} and emits only the first count items emitted. + * + * @tparam T The type of element signaled to the {@link rscpp::Subscriber} and signaled from the {@link rscpp::Publisher}. + */ + template + class Take : public rscpp::Processor + { + class Impl : public rscpp::Processor + { + public: + explicit Impl(rscpp::Processor &parent, const rscpp::Publisher &publisher, std::size_t count); + + void onSubscribe(rscpp::Subscription &subscription) override; + + void onNext(const T &value) override; + + void onError(const std::exception_ptr &error) override; + + void onComplete() override; + + void subscribe(rscpp::Subscriber &subscriber) override; + + private: + rscpp::Processor &m_parent; + rscpp::Publisher m_publisher; + std::vector m_subscriptions; + rscpp::Subscriber m_subscriber; + std::mutex m_mutex; + std::size_t m_count = 0; + }; + + public: + /** + * @brief Construct a new {@link Take} instance. + * + * @param publisher The source {@link rscpp::Publisher} the {@link Take} {@link rscpp::Processor} subscribes to and filter the items. + * @param count The maximum number of items to emit. + */ + explicit Take(const rscpp::Publisher &publisher, std::size_t count); + }; +} // namespace recpp::processors + +#include diff --git a/include/recpp/processors/inl/Take.inl b/include/recpp/processors/inl/Take.inl new file mode 100644 index 0000000..3ab4152 --- /dev/null +++ b/include/recpp/processors/inl/Take.inl @@ -0,0 +1,60 @@ +#pragma once + +template +recpp::processors::Take::Take(const rscpp::Publisher &publisher, std::size_t count) + : rscpp::Processor(std::make_shared(*this, publisher, count)) +{ +} + +template +recpp::processors::Take::Impl::Impl(rscpp::Processor &parent, const rscpp::Publisher &publisher, std::size_t count) + : m_parent(parent) + , m_publisher(publisher) + , m_count(count) +{ +} + +template +void recpp::processors::Take::Impl::onSubscribe(rscpp::Subscription &subscription) +{ + auto filterSubscription = recpp::subscriptions::FilterSubscription(subscription); + m_subscriptions.push_back(filterSubscription); + m_subscriber.onSubscribe(filterSubscription); +} + +template +void recpp::processors::Take::Impl::onNext(const T &value) +{ + if (m_count == 0) + return; + + m_count--; + m_subscriber.onNext(value); + for (auto &subscription : m_subscriptions) + subscription.onNext(false); + if (m_count == 0) + { + for (auto &subscription : m_subscriptions) + subscription.cancel(); + m_subscriber.onComplete(); + } +} + +template +void recpp::processors::Take::Impl::onError(const std::exception_ptr &error) +{ + m_subscriber.onError(error); +} + +template +void recpp::processors::Take::Impl::onComplete() +{ + m_subscriber.onComplete(); +} + +template +void recpp::processors::Take::Impl::subscribe(rscpp::Subscriber &subscriber) +{ + m_subscriber = subscriber; + m_publisher.subscribe(m_parent); +} diff --git a/include/recpp/rx/Observable.h b/include/recpp/rx/Observable.h index 508ed01..09867fb 100644 --- a/include/recpp/rx/Observable.h +++ b/include/recpp/rx/Observable.h @@ -188,6 +188,14 @@ namespace recpp::rx */ Observable filter(const std::function &method); + /** + * @brief Emits only the first count items emitted. + * + * @param count The maximum number of items to emit. + * @return The new {@link Observable} instance. + */ + Observable take(std::size_t count); + /** * Convert this {@link Observable} into a {@link Completable} by discarding all contained value. * diff --git a/include/recpp/rx/inl/Observable.inl b/include/recpp/rx/inl/Observable.inl index 8acf365..ddc7ab1 100644 --- a/include/recpp/rx/inl/Observable.inl +++ b/include/recpp/rx/inl/Observable.inl @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -126,6 +127,12 @@ recpp::rx::Observable recpp::rx::Observable::filter(const std::function(std::make_shared>(*this, method)); } +template +recpp::rx::Observable recpp::rx::Observable::take(std::size_t count) +{ + return Observable(std::make_shared>(*this, count)); +} + template recpp::rx::Completable recpp::rx::Observable::ignoreElements() { diff --git a/tests/rx/Observable.cpp b/tests/rx/Observable.cpp index 146f343..8bab5aa 100644 --- a/tests/rx/Observable.cpp +++ b/tests/rx/Observable.cpp @@ -729,6 +729,86 @@ TEST_F(ObservableFilter, checkDoNotCompleteOnError) }); } +class ObservableTake : public EventLoopBasedTest +{ +protected: + Observable infiniteObservable() + { + return Observable::interval(chrono::milliseconds(1), *m_eventLoop); + } +}; + +TEST_F(ObservableTake, checkEmitedValues) +{ + bool completed = false; + const vector expected = {0, 1, 2}; + vector values; + infiniteObservable() // + .take(3) + .subscribe( + [&values](const auto value) + { + values.push_back(value); + }, + [](const auto &) {}, + [this, &values, &expected, &completed]() + { + EXPECT_EQ(values, expected); + completed = true; + m_eventLoop->stop(); + }); + m_eventLoop->run(); + EXPECT_TRUE(completed); +} + +TEST_F(ObservableTake, checkNoErrorIsEmited) +{ + infiniteObservable() // + .take(3) + .subscribe([](const auto) {}, + [](const auto &) + { + ADD_FAILURE(); + }, + [this]() + { + m_eventLoop->stop(); + }); + m_eventLoop->run(); +} + +TEST_F(ObservableTake, checkErrorsAreForwarded) +{ + bool gotError = false; + Observable::error(runtime_error(runtimeErrorMessage.data())) // + .take(3) + .subscribe([](const auto) {}, + [&gotError](const auto &exception) + { + gotError = true; + try + { + rethrow_exception(exception); + } + catch (runtime_error &runtimeError) + { + EXPECT_THAT(runtimeError.what(), testing::StrEq(runtimeErrorMessage)); + } + }); + EXPECT_TRUE(gotError); +} + +TEST_F(ObservableTake, checkDoNotCompleteOnError) +{ + Observable::error(runtime_error(runtimeErrorMessage.data())) // + .take(3) + .subscribe([](const auto) {}, [](const auto &) {}, + []() + { + ADD_FAILURE(); + }); +} + class ObservableIgnoreElements : public testing::Test { };