diff --git a/CMakeLists.txt b/CMakeLists.txt index 7c4911e..aa28010 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -41,6 +41,8 @@ set(SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/include/recpp/processors/ObserveOn.h ${CMAKE_CURRENT_SOURCE_DIR}/include/recpp/processors/Reduce.h ${CMAKE_CURRENT_SOURCE_DIR}/include/recpp/processors/SubscribeOn.h + ${CMAKE_CURRENT_SOURCE_DIR}/include/recpp/processors/Take.h + ${CMAKE_CURRENT_SOURCE_DIR}/include/recpp/processors/TakeWhile.h ${CMAKE_CURRENT_SOURCE_DIR}/include/recpp/processors/Tap.h ${CMAKE_CURRENT_SOURCE_DIR}/include/recpp/processors/inl/AllOf.inl ${CMAKE_CURRENT_SOURCE_DIR}/include/recpp/processors/inl/AndThen.inl @@ -61,6 +63,8 @@ set(SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/include/recpp/processors/inl/ObserveOn.inl ${CMAKE_CURRENT_SOURCE_DIR}/include/recpp/processors/inl/Reduce.inl ${CMAKE_CURRENT_SOURCE_DIR}/include/recpp/processors/inl/SubscribeOn.inl + ${CMAKE_CURRENT_SOURCE_DIR}/include/recpp/processors/inl/Take.inl + ${CMAKE_CURRENT_SOURCE_DIR}/include/recpp/processors/inl/TakeWhile.inl ${CMAKE_CURRENT_SOURCE_DIR}/include/recpp/processors/inl/Tap.inl ${CMAKE_CURRENT_SOURCE_DIR}/include/recpp/publishers/CreatePublisher.h ${CMAKE_CURRENT_SOURCE_DIR}/include/recpp/publishers/DeferPublisher.h diff --git a/include/recpp/processors/Take.h b/include/recpp/processors/Take.h index fc64b55..58259f7 100644 --- a/include/recpp/processors/Take.h +++ b/include/recpp/processors/Take.h @@ -4,9 +4,7 @@ #include -#include #include -#include #include namespace recpp::processors @@ -40,7 +38,6 @@ namespace recpp::processors rscpp::Publisher m_publisher; std::vector m_subscriptions; rscpp::Subscriber m_subscriber; - std::mutex m_mutex; std::size_t m_count = 0; }; diff --git a/include/recpp/processors/TakeWhile.h b/include/recpp/processors/TakeWhile.h new file mode 100644 index 0000000..4a8bf3b --- /dev/null +++ b/include/recpp/processors/TakeWhile.h @@ -0,0 +1,55 @@ +#pragma once + +#include + +#include + +#include +#include + +namespace recpp::processors +{ + /** + * @class TakeWhile TakeWhile.h + * @brief {@link rscpp::Processor} that will subscribe to a given {@link rscpp::Publisher} and emits items as long as they satisfy the given predicate. + * + * @tparam T The type of element signaled to the {@link rscpp::Subscriber} and signaled from the {@link rscpp::Publisher}. + */ + template + class TakeWhile : public rscpp::Processor + { + class Impl : public rscpp::Processor + { + public: + explicit Impl(rscpp::Processor &parent, const rscpp::Publisher &publisher, const std::function &predicate); + + void onSubscribe(rscpp::Subscription &subscription) override; + + void onNext(const T &value) override; + + void onError(const std::exception_ptr &error) override; + + void onComplete() override; + + void subscribe(rscpp::Subscriber &subscriber) override; + + private: + rscpp::Processor &m_parent; + rscpp::Publisher m_publisher; + std::vector m_subscriptions; + rscpp::Subscriber m_subscriber; + std::function m_predicate = 0; + }; + + public: + /** + * @brief Construct a new {@link TakeWhile} instance. + * + * @param publisher The source {@link rscpp::Publisher} the {@link TakeWhile} {@link rscpp::Processor} subscribes to and filter the items. + * @param predicate The maximum number of items to emit. + */ + explicit TakeWhile(const rscpp::Publisher &publisher, const std::function &predicate); + }; +} // namespace recpp::processors + +#include diff --git a/include/recpp/processors/inl/TakeWhile.inl b/include/recpp/processors/inl/TakeWhile.inl new file mode 100644 index 0000000..2ceebef --- /dev/null +++ b/include/recpp/processors/inl/TakeWhile.inl @@ -0,0 +1,59 @@ +#pragma once + +template +recpp::processors::TakeWhile::TakeWhile(const rscpp::Publisher &publisher, const std::function &predicate) + : rscpp::Processor(std::make_shared(*this, publisher, predicate)) +{ +} + +template +recpp::processors::TakeWhile::Impl::Impl(rscpp::Processor &parent, const rscpp::Publisher &publisher, + const std::function &predicate) + : m_parent(parent) + , m_publisher(publisher) + , m_predicate(predicate) +{ +} + +template +void recpp::processors::TakeWhile::Impl::onSubscribe(rscpp::Subscription &subscription) +{ + auto filterSubscription = recpp::subscriptions::FilterSubscription(subscription); + m_subscriptions.push_back(filterSubscription); + m_subscriber.onSubscribe(filterSubscription); +} + +template +void recpp::processors::TakeWhile::Impl::onNext(const T &value) +{ + bool stop = !m_predicate(value); + if (!stop) + m_subscriber.onNext(value); + for (auto &subscription : m_subscriptions) + subscription.onNext(stop); + if (stop) + { + for (auto &subscription : m_subscriptions) + subscription.cancel(); + m_subscriber.onComplete(); + } +} + +template +void recpp::processors::TakeWhile::Impl::onError(const std::exception_ptr &error) +{ + m_subscriber.onError(error); +} + +template +void recpp::processors::TakeWhile::Impl::onComplete() +{ + m_subscriber.onComplete(); +} + +template +void recpp::processors::TakeWhile::Impl::subscribe(rscpp::Subscriber &subscriber) +{ + m_subscriber = subscriber; + m_publisher.subscribe(m_parent); +} diff --git a/include/recpp/rx/Observable.h b/include/recpp/rx/Observable.h index 09867fb..e6e4735 100644 --- a/include/recpp/rx/Observable.h +++ b/include/recpp/rx/Observable.h @@ -196,6 +196,22 @@ namespace recpp::rx */ Observable take(std::size_t count); + /** + * @brief Emits items as long as each item satisfies the given predicate. + * + * @param predicate The predicate the items must satisfy. + * @return The new {@link Observable} instance. + */ + Observable takeWhile(const std::function &predicate); + + /** + * @brief Emits items until one item satisfies the given predicate. + * + * @param stopPredicate The stop predicate. + * @return The new {@link Observable} instance. + */ + Observable takeUntil(const std::function &stopPredicate); + /** * Convert this {@link Observable} into a {@link Completable} by discarding all contained value. * diff --git a/include/recpp/rx/inl/Observable.inl b/include/recpp/rx/inl/Observable.inl index ddc7ab1..971187b 100644 --- a/include/recpp/rx/inl/Observable.inl +++ b/include/recpp/rx/inl/Observable.inl @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -133,6 +134,22 @@ recpp::rx::Observable recpp::rx::Observable::take(std::size_t count) return Observable(std::make_shared>(*this, count)); } +template +recpp::rx::Observable recpp::rx::Observable::takeWhile(const std::function &predicate) +{ + return Observable(std::make_shared>(*this, predicate)); +} + +template +recpp::rx::Observable recpp::rx::Observable::takeUntil(const std::function &stopPredicate) +{ + return Observable(std::make_shared>(*this, + [stopPredicate](const auto &value) + { + return !stopPredicate(value); + })); +} + template recpp::rx::Completable recpp::rx::Observable::ignoreElements() { diff --git a/tests/rx/Observable.cpp b/tests/rx/Observable.cpp index 8bab5aa..842ab95 100644 --- a/tests/rx/Observable.cpp +++ b/tests/rx/Observable.cpp @@ -732,9 +732,9 @@ TEST_F(ObservableFilter, checkDoNotCompleteOnError) class ObservableTake : public EventLoopBasedTest { protected: - Observable infiniteObservable() + Observable takeObservable() { - return Observable::interval(chrono::milliseconds(1), *m_eventLoop); + return Observable::interval(chrono::milliseconds(1), *m_eventLoop).take(3); } }; @@ -743,8 +743,7 @@ TEST_F(ObservableTake, checkEmitedValues) bool completed = false; const vector expected = {0, 1, 2}; vector values; - infiniteObservable() // - .take(3) + takeObservable() // .subscribe( [&values](const auto value) { @@ -763,8 +762,7 @@ TEST_F(ObservableTake, checkEmitedValues) TEST_F(ObservableTake, checkNoErrorIsEmited) { - infiniteObservable() // - .take(3) + takeObservable() // .subscribe([](const auto) {}, [](const auto &) { @@ -809,6 +807,188 @@ TEST_F(ObservableTake, checkDoNotCompleteOnError) }); } +class ObservableTakeWhile : public EventLoopBasedTest +{ +protected: + Observable takeWhileObservable() + { + return Observable::interval(chrono::milliseconds(1), *m_eventLoop) + .takeWhile( + [](const auto value) + { + return value < 3; + }); + } +}; + +TEST_F(ObservableTakeWhile, checkEmitedValues) +{ + bool completed = false; + const vector expected = {0, 1, 2}; + vector values; + takeWhileObservable() // + .subscribe( + [&values](const auto value) + { + values.push_back(value); + }, + [](const auto &) {}, + [this, &values, &expected, &completed]() + { + EXPECT_EQ(values, expected); + completed = true; + m_eventLoop->stop(); + }); + m_eventLoop->run(); + EXPECT_TRUE(completed); +} + +TEST_F(ObservableTakeWhile, checkNoErrorIsEmited) +{ + takeWhileObservable() // + .subscribe([](const auto) {}, + [](const auto &) + { + ADD_FAILURE(); + }, + [this]() + { + m_eventLoop->stop(); + }); + m_eventLoop->run(); +} + +TEST_F(ObservableTakeWhile, checkErrorsAreForwarded) +{ + bool gotError = false; + Observable::error(runtime_error(runtimeErrorMessage.data())) // + .takeWhile( + [](const auto value) + { + return value < 3; + }) + .subscribe([](const auto) {}, + [&gotError](const auto &exception) + { + gotError = true; + try + { + rethrow_exception(exception); + } + catch (runtime_error &runtimeError) + { + EXPECT_THAT(runtimeError.what(), testing::StrEq(runtimeErrorMessage)); + } + }); + EXPECT_TRUE(gotError); +} + +TEST_F(ObservableTakeWhile, checkDoNotCompleteOnError) +{ + Observable::error(runtime_error(runtimeErrorMessage.data())) // + .takeWhile( + [](const auto value) + { + return value < 3; + }) + .subscribe([](const auto) {}, [](const auto &) {}, + []() + { + ADD_FAILURE(); + }); +} + +class ObservableTakeUntil : public EventLoopBasedTest +{ +protected: + Observable takeUntilObservable() + { + return Observable::interval(chrono::milliseconds(1), *m_eventLoop) + .takeUntil( + [](const auto value) + { + return value > 2; + }); + } +}; + +TEST_F(ObservableTakeUntil, checkEmitedValues) +{ + bool completed = false; + const vector expected = {0, 1, 2}; + vector values; + takeUntilObservable() // + .subscribe( + [&values](const auto value) + { + values.push_back(value); + }, + [](const auto &) {}, + [this, &values, &expected, &completed]() + { + EXPECT_EQ(values, expected); + completed = true; + m_eventLoop->stop(); + }); + m_eventLoop->run(); + EXPECT_TRUE(completed); +} + +TEST_F(ObservableTakeUntil, checkNoErrorIsEmited) +{ + takeUntilObservable() // + .subscribe([](const auto) {}, + [](const auto &) + { + ADD_FAILURE(); + }, + [this]() + { + m_eventLoop->stop(); + }); + m_eventLoop->run(); +} + +TEST_F(ObservableTakeUntil, checkErrorsAreForwarded) +{ + bool gotError = false; + Observable::error(runtime_error(runtimeErrorMessage.data())) // + .takeWhile( + [](const auto value) + { + return value < 3; + }) + .subscribe([](const auto) {}, + [&gotError](const auto &exception) + { + gotError = true; + try + { + rethrow_exception(exception); + } + catch (runtime_error &runtimeError) + { + EXPECT_THAT(runtimeError.what(), testing::StrEq(runtimeErrorMessage)); + } + }); + EXPECT_TRUE(gotError); +} + +TEST_F(ObservableTakeUntil, checkDoNotCompleteOnError) +{ + Observable::error(runtime_error(runtimeErrorMessage.data())) // + .takeWhile( + [](const auto value) + { + return value < 3; + }) + .subscribe([](const auto) {}, [](const auto &) {}, + []() + { + ADD_FAILURE(); + }); +} + class ObservableIgnoreElements : public testing::Test { };