diff --git a/CMakeLists.txt b/CMakeLists.txt index aa28010..44032d1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -41,6 +41,7 @@ set(SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/include/recpp/processors/ObserveOn.h ${CMAKE_CURRENT_SOURCE_DIR}/include/recpp/processors/Reduce.h ${CMAKE_CURRENT_SOURCE_DIR}/include/recpp/processors/SubscribeOn.h + ${CMAKE_CURRENT_SOURCE_DIR}/include/recpp/processors/SwitchOnError.h ${CMAKE_CURRENT_SOURCE_DIR}/include/recpp/processors/Take.h ${CMAKE_CURRENT_SOURCE_DIR}/include/recpp/processors/TakeWhile.h ${CMAKE_CURRENT_SOURCE_DIR}/include/recpp/processors/Tap.h @@ -63,6 +64,7 @@ set(SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/include/recpp/processors/inl/ObserveOn.inl ${CMAKE_CURRENT_SOURCE_DIR}/include/recpp/processors/inl/Reduce.inl ${CMAKE_CURRENT_SOURCE_DIR}/include/recpp/processors/inl/SubscribeOn.inl + ${CMAKE_CURRENT_SOURCE_DIR}/include/recpp/processors/inl/SwitchOnError.inl ${CMAKE_CURRENT_SOURCE_DIR}/include/recpp/processors/inl/Take.inl ${CMAKE_CURRENT_SOURCE_DIR}/include/recpp/processors/inl/TakeWhile.inl ${CMAKE_CURRENT_SOURCE_DIR}/include/recpp/processors/inl/Tap.inl diff --git a/include/recpp/processors/SwitchOnError.h b/include/recpp/processors/SwitchOnError.h new file mode 100644 index 0000000..4648e80 --- /dev/null +++ b/include/recpp/processors/SwitchOnError.h @@ -0,0 +1,57 @@ +#pragma once + +#include + +#include + +#include +#include + +namespace recpp::processors +{ + /** + * @class SwitchOnError SwitchOnError.h + * @brief {@link rscpp::Processor} that will subscribe to a given {@link rscpp::Publisher} and switch to another {@link rscpp::Publisher} if the first one + * emits an error. + * + * @tparam T The type of element signaled to the {@link rscpp::Subscriber} and signaled from the {@link rscpp::Publisher}. + */ + template + class SwitchOnError : public rscpp::Processor + { + class Impl : public rscpp::Processor + { + public: + explicit Impl(rscpp::Processor &parent, const rscpp::Publisher &publisher, const rscpp::Publisher &fallbackPublisher); + + void onSubscribe(rscpp::Subscription &subscription) override; + + void onNext(const T &value) override; + + void onError(const std::exception_ptr &error) override; + + void onComplete() override; + + void subscribe(rscpp::Subscriber &subscriber) override; + + private: + rscpp::Processor &m_parent; + rscpp::Publisher m_publisher; + rscpp::Publisher m_fallbackPublisher; + rscpp::Subscriber m_subscriber; + bool m_publisherErrored = false; + }; + + public: + /** + * @brief Construct a new {@link SwitchOnError} instance. + * + * @param publisher The source {@link rscpp::Publisher} the {@link SwitchOnError} {@link rscpp::Processor} subscribes to. + * @param fallbackPublisher The second {@link rscpp::Publisher} the {@link SwitchOnError} {@link rscpp::Processor} subscribes to in case the first one + * emits an error. + */ + explicit SwitchOnError(const rscpp::Publisher &publisher, const rscpp::Publisher &fallbackPublisher); + }; +} // namespace recpp::processors + +#include diff --git a/include/recpp/processors/inl/SwitchOnError.inl b/include/recpp/processors/inl/SwitchOnError.inl new file mode 100644 index 0000000..52a98f4 --- /dev/null +++ b/include/recpp/processors/inl/SwitchOnError.inl @@ -0,0 +1,55 @@ +#pragma once + +template +recpp::processors::SwitchOnError::SwitchOnError(const rscpp::Publisher &publisher, const rscpp::Publisher &fallbackPublisher) + : rscpp::Processor(std::make_shared(*this, publisher, fallbackPublisher)) +{ +} + +template +recpp::processors::SwitchOnError::Impl::Impl(rscpp::Processor &parent, const rscpp::Publisher &publisher, + const rscpp::Publisher &fallbackPublisher) + : m_parent(parent) + , m_publisher(publisher) + , m_fallbackPublisher(fallbackPublisher) +{ +} + +template +void recpp::processors::SwitchOnError::Impl::onSubscribe(rscpp::Subscription &subscription) +{ + auto forwardSubscription = recpp::subscriptions::ForwardSubscription(subscription); + m_subscriber.onSubscribe(forwardSubscription); +} + +template +void recpp::processors::SwitchOnError::Impl::onNext(const T &value) +{ + m_subscriber.onNext(value); +} + +template +void recpp::processors::SwitchOnError::Impl::onError(const std::exception_ptr &error) +{ + if (!m_publisherErrored) + { + m_publisherErrored = true; + m_fallbackPublisher.subscribe(m_parent); + } + else + m_subscriber.onError(error); +} + +template +void recpp::processors::SwitchOnError::Impl::onComplete() +{ + m_subscriber.onComplete(); +} + +template +void recpp::processors::SwitchOnError::Impl::subscribe(rscpp::Subscriber &subscriber) +{ + m_subscriber = subscriber; + m_publisher.subscribe(m_parent); + m_publisherErrored = false; +} diff --git a/include/recpp/rx/Completable.h b/include/recpp/rx/Completable.h index a8d9fa0..0e22e35 100644 --- a/include/recpp/rx/Completable.h +++ b/include/recpp/rx/Completable.h @@ -154,6 +154,14 @@ namespace recpp::rx */ Completable tap(const OnCompleteMethod &onCompleteMethod, const OnErrorMethod &onErrorMethod); + /** + * @brief Switch to another {@link Completable} in case this one emits an error. + * + * @param fallbackCompletable The {@link Completable} to use in case this one emits an error. + * @return The new {@link Completable} instance. + */ + Completable switchOnError(const Completable &fallbackCompletable); + /** * @brief Forwards all emissions on the given {@link async::Scheduler}. * diff --git a/include/recpp/rx/Maybe.h b/include/recpp/rx/Maybe.h index 8fc2e03..b7618d8 100644 --- a/include/recpp/rx/Maybe.h +++ b/include/recpp/rx/Maybe.h @@ -210,6 +210,14 @@ namespace recpp::rx */ Maybe tap(const OnNextMethod &onNextMethod, const OnErrorMethod &onErrorMethod, const OnCompleteMethod &onCompleteMethod); + /** + * @brief Switch to another {@link Maybe} in case this one emits an error. + * + * @param fallbackMaybe The {@link Maybe} to use in case this one emits an error. + * @return The new {@link Maybe} instance. + */ + Maybe switchOnError(const Maybe &fallbackMaybe); + /** * @brief Forwards all emissions on the given {@link async::Scheduler}. * diff --git a/include/recpp/rx/Observable.h b/include/recpp/rx/Observable.h index e6e4735..8319dca 100644 --- a/include/recpp/rx/Observable.h +++ b/include/recpp/rx/Observable.h @@ -283,6 +283,14 @@ namespace recpp::rx */ Observable tap(const OnNextMethod &onNextMethod, const OnErrorMethod &onErrorMethod, const OnCompleteMethod &onCompleteMethod); + /** + * @brief Switch to another {@link Observable} in case this one emits an error. + * + * @param fallbackObservable The {@link Observable} to use in case this one emits an error. + * @return The new {@link Observable} instance. + */ + Observable switchOnError(const Observable &fallbackObservable); + /** * @brief Forwards all emissions on the given {@link async::Scheduler}. * diff --git a/include/recpp/rx/Single.h b/include/recpp/rx/Single.h index 6ce9cbb..5dd5fb5 100644 --- a/include/recpp/rx/Single.h +++ b/include/recpp/rx/Single.h @@ -228,6 +228,14 @@ namespace recpp::rx */ Single tap(const OnSuccessMethod &onSuccessMethod, const OnErrorMethod &onErrorMethod); + /** + * @brief Switch to another {@link Single} in case this one emits an error. + * + * @param fallbackSingle The {@link Single} to use in case this one emits an error. + * @return The new {@link Single} instance. + */ + Single switchOnError(const Single &fallbackSingle); + /** * @brief Forwards all emissions on the given {@link async::Scheduler}. * diff --git a/include/recpp/rx/inl/Maybe.inl b/include/recpp/rx/inl/Maybe.inl index b339063..447632d 100644 --- a/include/recpp/rx/inl/Maybe.inl +++ b/include/recpp/rx/inl/Maybe.inl @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -135,6 +136,12 @@ recpp::rx::Maybe recpp::rx::Maybe::tap(const OnNextMethod &onNextMethod, c return Maybe(std::make_shared>(*this, onNextMethod, onErrorMethod, onCompleteMethod)); } +template +recpp::rx::Maybe recpp::rx::Maybe::switchOnError(const Maybe &fallbackMaybe) +{ + return Maybe(std::make_shared>(*this, fallbackMaybe)); +} + template recpp::rx::Maybe recpp::rx::Maybe::observeOn(recpp::async::Scheduler &scheduler) { diff --git a/include/recpp/rx/inl/Observable.inl b/include/recpp/rx/inl/Observable.inl index 971187b..bc13643 100644 --- a/include/recpp/rx/inl/Observable.inl +++ b/include/recpp/rx/inl/Observable.inl @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -207,6 +208,12 @@ recpp::rx::Observable recpp::rx::Observable::tap(const OnNextMethod &onNex return Observable(std::make_shared>(*this, onNextMethod, onErrorMethod, onCompleteMethod)); } +template +recpp::rx::Observable recpp::rx::Observable::switchOnError(const Observable &fallbackObservable) +{ + return Observable(std::make_shared>(*this, fallbackObservable)); +} + template recpp::rx::Observable recpp::rx::Observable::observeOn(recpp::async::Scheduler &scheduler) { diff --git a/include/recpp/rx/inl/Single.inl b/include/recpp/rx/inl/Single.inl index dc1ae7c..f49e5a5 100644 --- a/include/recpp/rx/inl/Single.inl +++ b/include/recpp/rx/inl/Single.inl @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -138,6 +139,12 @@ recpp::rx::Single recpp::rx::Single::tap(const OnSuccessMethod &onSuccessM return Single(std::make_shared>(*this, onSuccessMethod, onErrorMethod, nullptr)); } +template +recpp::rx::Single recpp::rx::Single::switchOnError(const Single &fallbackSingle) +{ + return Single(std::make_shared>(*this, fallbackSingle)); +} + template recpp::rx::Single recpp::rx::Single::observeOn(recpp::async::Scheduler &scheduler) { diff --git a/src/rx/Completable.cpp b/src/rx/Completable.cpp index 88e78b2..c3898dc 100644 --- a/src/rx/Completable.cpp +++ b/src/rx/Completable.cpp @@ -85,6 +85,11 @@ Completable Completable::tap(const Completable::OnCompleteMethod &onCompleteMeth return Completable(make_shared>(*this, nullptr, onErrorMethod, onCompleteMethod)); } +Completable Completable::switchOnError(const Completable &fallbackCompletable) +{ + return Completable(make_shared>(*this, fallbackCompletable)); +} + Completable Completable::observeOn(Scheduler &scheduler) { return Completable(make_shared>(*this, scheduler)); diff --git a/tests/rx/Completable.cpp b/tests/rx/Completable.cpp index e75cb93..06e371f 100644 --- a/tests/rx/Completable.cpp +++ b/tests/rx/Completable.cpp @@ -574,6 +574,82 @@ TEST_F(CompletableTap, checkOnErrorIsNotEmitedOnComplete) .subscribe(); } +class CompletableSwitchOnError : public testing::Test +{ +}; + +TEST_F(CompletableSwitchOnError, checkCompletes) +{ + auto completed = false; + Completable::complete() + .switchOnError(Completable::complete()) + .subscribe( + [&completed]() + { + EXPECT_FALSE(completed); + completed = true; + }); + EXPECT_TRUE(completed); +} + +TEST_F(CompletableSwitchOnError, checkNoError) +{ + Completable::complete() + .switchOnError(Completable::complete()) + .subscribe(nullptr, + [](const auto &) + { + ADD_FAILURE(); + }); +} + +TEST_F(CompletableSwitchOnError, checkSwitchCompletes) +{ + auto completed = false; + Completable::error(runtime_error(runtimeErrorMessage.data())) // + .switchOnError(Completable::complete()) + .subscribe( + [&completed]() + { + EXPECT_FALSE(completed); + completed = true; + }); + EXPECT_TRUE(completed); +} + +TEST_F(CompletableSwitchOnError, checkSwitchNoError) +{ + Completable::error(runtime_error(runtimeErrorMessage.data())) // + .switchOnError(Completable::complete()) + .subscribe(nullptr, + [](const auto &) + { + ADD_FAILURE(); + }); +} + +TEST_F(CompletableSwitchOnError, checkForwardsError) +{ + auto gotError = false; + Completable::error(runtime_error(runtimeErrorMessage.data())) // + .switchOnError(Completable::error(runtime_error(runtimeErrorMessage.data()))) + .subscribe(nullptr, + [&gotError](const auto &error) + { + EXPECT_FALSE(gotError); + gotError = true; + try + { + rethrow_exception(error); + } + catch (runtime_error &runtimeError) + { + EXPECT_THAT(runtimeError.what(), testing::StrEq(runtimeErrorMessage)); + } + }); + EXPECT_TRUE(gotError); +} + class CompletableObserveOn : public WorkerThreadBasedTest { }; diff --git a/tests/rx/Maybe.cpp b/tests/rx/Maybe.cpp index c11e3ef..71ecb4e 100644 --- a/tests/rx/Maybe.cpp +++ b/tests/rx/Maybe.cpp @@ -1002,6 +1002,104 @@ TEST_F(MaybeTap, checkTapDoOnNextIsNotCalledInCaseOfAnError) .subscribe(); } +class MaybeSwitchOnError : public testing::Test +{ +}; + +TEST_F(MaybeSwitchOnError, checkForwardsValues) +{ + Maybe::just(defaultValue) // + .switchOnError(Maybe::just(otherValue)) + .subscribe( + [](const auto value) + { + EXPECT_EQ(value, defaultValue); + }); +} + +TEST_F(MaybeSwitchOnError, checkCompletes) +{ + auto completed = false; + Maybe::just(defaultValue) // + .switchOnError(Maybe::just(otherValue)) + .subscribe( + [&completed](const auto) + { + EXPECT_FALSE(completed); + completed = true; + }); + EXPECT_TRUE(completed); +} + +TEST_F(MaybeSwitchOnError, checkNoError) +{ + Maybe::just(defaultValue) // + .switchOnError(Maybe::just(otherValue)) + .subscribe(nullptr, + [](const auto &) + { + ADD_FAILURE(); + }); +} + +TEST_F(MaybeSwitchOnError, checkSwitchWithValue) +{ + Maybe::error(runtime_error(runtimeErrorMessage.data())) // + .switchOnError(Maybe::just(defaultValue)) + .subscribe( + [](const auto value) + { + EXPECT_EQ(value, defaultValue); + }); +} + +TEST_F(MaybeSwitchOnError, checkSwitchCompletes) +{ + auto completed = false; + Maybe::error(runtime_error(runtimeErrorMessage.data())) // + .switchOnError(Maybe::just(defaultValue)) + .subscribe( + [&completed](const auto) + { + EXPECT_FALSE(completed); + completed = true; + }); + EXPECT_TRUE(completed); +} + +TEST_F(MaybeSwitchOnError, checkSwitchNoError) +{ + Maybe::error(runtime_error(runtimeErrorMessage.data())) // + .switchOnError(Maybe::just(defaultValue)) + .subscribe(nullptr, + [](const auto &) + { + ADD_FAILURE(); + }); +} + +TEST_F(MaybeSwitchOnError, checkForwardsError) +{ + auto gotError = false; + Maybe::error(runtime_error(runtimeErrorMessage.data())) // + .switchOnError(Maybe::error(runtime_error(runtimeErrorMessage.data()))) + .subscribe(nullptr, + [&gotError](const auto &error) + { + EXPECT_FALSE(gotError); + gotError = true; + try + { + rethrow_exception(error); + } + catch (runtime_error &runtimeError) + { + EXPECT_THAT(runtimeError.what(), testing::StrEq(runtimeErrorMessage)); + } + }); + EXPECT_TRUE(gotError); +} + class MaybeObserveOn : public WorkerThreadBasedTest { }; diff --git a/tests/rx/Observable.cpp b/tests/rx/Observable.cpp index 842ab95..d249403 100644 --- a/tests/rx/Observable.cpp +++ b/tests/rx/Observable.cpp @@ -1458,6 +1458,106 @@ TEST_F(ObservableTap, checkTapDoOnNextIsNotCalledInCaseOfAnError) .subscribe(); } +class ObservableSwitchOnError : public testing::Test +{ +}; + +TEST_F(ObservableSwitchOnError, checkForwardsValues) +{ + vector values; + Observable::range(defaultValues) // + .switchOnError(Observable::just(defaultValue)) + .subscribe( + [&values](const auto value) + { + values.push_back(value); + }); + EXPECT_EQ(values, defaultValues); +} + +TEST_F(ObservableSwitchOnError, checkCompletes) +{ + auto completed = false; + Observable::range(defaultValues) // + .switchOnError(Observable::just(defaultValue)) + .subscribe(nullptr, nullptr, + [&completed]() + { + EXPECT_FALSE(completed); + completed = true; + }); + EXPECT_TRUE(completed); +} + +TEST_F(ObservableSwitchOnError, checkNoError) +{ + Observable::range(defaultValues) // + .switchOnError(Observable::just(defaultValue)) + .subscribe(nullptr, + [](const auto &) + { + ADD_FAILURE(); + }); +} + +TEST_F(ObservableSwitchOnError, checkSwitchWithValue) +{ + Observable::error(runtime_error(runtimeErrorMessage.data())) // + .switchOnError(Observable::just(defaultValue)) + .subscribe( + [](const auto value) + { + EXPECT_EQ(value, defaultValue); + }); +} + +TEST_F(ObservableSwitchOnError, checkSwitchCompletes) +{ + auto completed = false; + Observable::error(runtime_error(runtimeErrorMessage.data())) // + .switchOnError(Observable::just(defaultValue)) + .subscribe(nullptr, nullptr, + [&completed]() + { + EXPECT_FALSE(completed); + completed = true; + }); + EXPECT_TRUE(completed); +} + +TEST_F(ObservableSwitchOnError, checkSwitchNoError) +{ + Observable::error(runtime_error(runtimeErrorMessage.data())) // + .switchOnError(Observable::just(defaultValue)) + .subscribe(nullptr, + [](const auto &) + { + ADD_FAILURE(); + }); +} + +TEST_F(ObservableSwitchOnError, checkForwardsError) +{ + auto gotError = false; + Observable::error(runtime_error(runtimeErrorMessage.data())) // + .switchOnError(Observable::error(runtime_error(runtimeErrorMessage.data()))) + .subscribe(nullptr, + [&gotError](const auto &error) + { + EXPECT_FALSE(gotError); + gotError = true; + try + { + rethrow_exception(error); + } + catch (runtime_error &runtimeError) + { + EXPECT_THAT(runtimeError.what(), testing::StrEq(runtimeErrorMessage)); + } + }); + EXPECT_TRUE(gotError); +} + class ObservableObserveOn : public WorkerThreadBasedTest { }; diff --git a/tests/rx/Single.cpp b/tests/rx/Single.cpp index 160268a..30aca04 100644 --- a/tests/rx/Single.cpp +++ b/tests/rx/Single.cpp @@ -21,6 +21,7 @@ using namespace std; namespace { constexpr int defaultValue = 42; + constexpr int otherValue = 66; const vector defaultValues = {1, 2, 3}; constexpr auto sleepDuration = chrono::milliseconds(10); constexpr auto delayTolerance = chrono::milliseconds(1); @@ -1131,6 +1132,104 @@ TEST_F(SingleTap, checkTapDoOnNextIsNotCalledInCaseOfAnError) .subscribe(); } +class SingleSwitchOnError : public testing::Test +{ +}; + +TEST_F(SingleSwitchOnError, checkForwardsValues) +{ + Single::just(defaultValue) // + .switchOnError(Single::just(otherValue)) + .subscribe( + [](const auto value) + { + EXPECT_EQ(value, defaultValue); + }); +} + +TEST_F(SingleSwitchOnError, checkCompletes) +{ + auto completed = false; + Single::just(defaultValue) // + .switchOnError(Single::just(otherValue)) + .subscribe( + [&completed](const auto) + { + EXPECT_FALSE(completed); + completed = true; + }); + EXPECT_TRUE(completed); +} + +TEST_F(SingleSwitchOnError, checkNoError) +{ + Single::just(defaultValue) // + .switchOnError(Single::just(otherValue)) + .subscribe(nullptr, + [](const auto &) + { + ADD_FAILURE(); + }); +} + +TEST_F(SingleSwitchOnError, checkSwitchWithValue) +{ + Single::error(runtime_error(runtimeErrorMessage.data())) // + .switchOnError(Single::just(defaultValue)) + .subscribe( + [](const auto value) + { + EXPECT_EQ(value, defaultValue); + }); +} + +TEST_F(SingleSwitchOnError, checkSwitchCompletes) +{ + auto completed = false; + Single::error(runtime_error(runtimeErrorMessage.data())) // + .switchOnError(Single::just(defaultValue)) + .subscribe( + [&completed](const auto) + { + EXPECT_FALSE(completed); + completed = true; + }); + EXPECT_TRUE(completed); +} + +TEST_F(SingleSwitchOnError, checkSwitchNoError) +{ + Single::error(runtime_error(runtimeErrorMessage.data())) // + .switchOnError(Single::just(defaultValue)) + .subscribe(nullptr, + [](const auto &) + { + ADD_FAILURE(); + }); +} + +TEST_F(SingleSwitchOnError, checkForwardsError) +{ + auto gotError = false; + Single::error(runtime_error(runtimeErrorMessage.data())) // + .switchOnError(Single::error(runtime_error(runtimeErrorMessage.data()))) + .subscribe(nullptr, + [&gotError](const auto &error) + { + EXPECT_FALSE(gotError); + gotError = true; + try + { + rethrow_exception(error); + } + catch (runtime_error &runtimeError) + { + EXPECT_THAT(runtimeError.what(), testing::StrEq(runtimeErrorMessage)); + } + }); + EXPECT_TRUE(gotError); +} + class SingleObserveOn : public WorkerThreadBasedTest { };