Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ set(SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/include/recpp/processors/ObserveOn.h
${CMAKE_CURRENT_SOURCE_DIR}/include/recpp/processors/Reduce.h
${CMAKE_CURRENT_SOURCE_DIR}/include/recpp/processors/SubscribeOn.h
${CMAKE_CURRENT_SOURCE_DIR}/include/recpp/processors/SwitchOnError.h
${CMAKE_CURRENT_SOURCE_DIR}/include/recpp/processors/Take.h
${CMAKE_CURRENT_SOURCE_DIR}/include/recpp/processors/TakeWhile.h
${CMAKE_CURRENT_SOURCE_DIR}/include/recpp/processors/Tap.h
Expand All @@ -63,6 +64,7 @@ set(SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/include/recpp/processors/inl/ObserveOn.inl
${CMAKE_CURRENT_SOURCE_DIR}/include/recpp/processors/inl/Reduce.inl
${CMAKE_CURRENT_SOURCE_DIR}/include/recpp/processors/inl/SubscribeOn.inl
${CMAKE_CURRENT_SOURCE_DIR}/include/recpp/processors/inl/SwitchOnError.inl
${CMAKE_CURRENT_SOURCE_DIR}/include/recpp/processors/inl/Take.inl
${CMAKE_CURRENT_SOURCE_DIR}/include/recpp/processors/inl/TakeWhile.inl
${CMAKE_CURRENT_SOURCE_DIR}/include/recpp/processors/inl/Tap.inl
Expand Down
57 changes: 57 additions & 0 deletions include/recpp/processors/SwitchOnError.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#pragma once

#include <rscpp/Processor.h>

#include <recpp/subscriptions/FilterSubscription.h>

#include <functional>
#include <vector>

namespace recpp::processors
{
/**
* @class SwitchOnError SwitchOnError.h <recpp/processors/SwitchOnError.h>
* @brief {@link rscpp::Processor} that will subscribe to a given {@link rscpp::Publisher} and switch to another {@link rscpp::Publisher} if the first one
* emits an error.
*
* @tparam T The type of element signaled to the {@link rscpp::Subscriber} and signaled from the {@link rscpp::Publisher}.
*/
template <typename T>
class SwitchOnError : public rscpp::Processor<T, T>
{
class Impl : public rscpp::Processor<T, T>
{
public:
explicit Impl(rscpp::Processor<T, T> &parent, const rscpp::Publisher<T> &publisher, const rscpp::Publisher<T> &fallbackPublisher);

void onSubscribe(rscpp::Subscription &subscription) override;

void onNext(const T &value) override;

void onError(const std::exception_ptr &error) override;

void onComplete() override;

void subscribe(rscpp::Subscriber<T> &subscriber) override;

private:
rscpp::Processor<T, T> &m_parent;
rscpp::Publisher<T> m_publisher;
rscpp::Publisher<T> m_fallbackPublisher;
rscpp::Subscriber<T> m_subscriber;
bool m_publisherErrored = false;
};

public:
/**
* @brief Construct a new {@link SwitchOnError} instance.
*
* @param publisher The source {@link rscpp::Publisher} the {@link SwitchOnError} {@link rscpp::Processor} subscribes to.
* @param fallbackPublisher The second {@link rscpp::Publisher} the {@link SwitchOnError} {@link rscpp::Processor} subscribes to in case the first one
* emits an error.
*/
explicit SwitchOnError(const rscpp::Publisher<T> &publisher, const rscpp::Publisher<T> &fallbackPublisher);
};
} // namespace recpp::processors

#include <recpp/processors/inl/SwitchOnError.inl>
55 changes: 55 additions & 0 deletions include/recpp/processors/inl/SwitchOnError.inl
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#pragma once

template <typename T>
recpp::processors::SwitchOnError<T>::SwitchOnError(const rscpp::Publisher<T> &publisher, const rscpp::Publisher<T> &fallbackPublisher)
: rscpp::Processor<T, T>(std::make_shared<Impl>(*this, publisher, fallbackPublisher))
{
}

template <typename T>
recpp::processors::SwitchOnError<T>::Impl::Impl(rscpp::Processor<T, T> &parent, const rscpp::Publisher<T> &publisher,
const rscpp::Publisher<T> &fallbackPublisher)
: m_parent(parent)
, m_publisher(publisher)
, m_fallbackPublisher(fallbackPublisher)
{
}

template <typename T>
void recpp::processors::SwitchOnError<T>::Impl::onSubscribe(rscpp::Subscription &subscription)
{
auto forwardSubscription = recpp::subscriptions::ForwardSubscription(subscription);
m_subscriber.onSubscribe(forwardSubscription);
}

template <typename T>
void recpp::processors::SwitchOnError<T>::Impl::onNext(const T &value)
{
m_subscriber.onNext(value);
}

template <typename T>
void recpp::processors::SwitchOnError<T>::Impl::onError(const std::exception_ptr &error)
{
if (!m_publisherErrored)
{
m_publisherErrored = true;
m_fallbackPublisher.subscribe(m_parent);
}
else
m_subscriber.onError(error);
}

template <typename T>
void recpp::processors::SwitchOnError<T>::Impl::onComplete()
{
m_subscriber.onComplete();
}

template <typename T>
void recpp::processors::SwitchOnError<T>::Impl::subscribe(rscpp::Subscriber<T> &subscriber)
{
m_subscriber = subscriber;
m_publisher.subscribe(m_parent);
m_publisherErrored = false;
}
8 changes: 8 additions & 0 deletions include/recpp/rx/Completable.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,14 @@ namespace recpp::rx
*/
Completable tap(const OnCompleteMethod &onCompleteMethod, const OnErrorMethod &onErrorMethod);

/**
* @brief Switch to another {@link Completable} in case this one emits an error.
*
* @param fallbackCompletable The {@link Completable} to use in case this one emits an error.
* @return The new {@link Completable} instance.
*/
Completable switchOnError(const Completable &fallbackCompletable);

/**
* @brief Forwards all emissions on the given {@link async::Scheduler}.
*
Expand Down
8 changes: 8 additions & 0 deletions include/recpp/rx/Maybe.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,14 @@ namespace recpp::rx
*/
Maybe<T> tap(const OnNextMethod &onNextMethod, const OnErrorMethod &onErrorMethod, const OnCompleteMethod &onCompleteMethod);

/**
* @brief Switch to another {@link Maybe} in case this one emits an error.
*
* @param fallbackMaybe The {@link Maybe} to use in case this one emits an error.
* @return The new {@link Maybe} instance.
*/
Maybe<T> switchOnError(const Maybe<T> &fallbackMaybe);

/**
* @brief Forwards all emissions on the given {@link async::Scheduler}.
*
Expand Down
8 changes: 8 additions & 0 deletions include/recpp/rx/Observable.h
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,14 @@ namespace recpp::rx
*/
Observable<T> tap(const OnNextMethod &onNextMethod, const OnErrorMethod &onErrorMethod, const OnCompleteMethod &onCompleteMethod);

/**
* @brief Switch to another {@link Observable} in case this one emits an error.
*
* @param fallbackObservable The {@link Observable} to use in case this one emits an error.
* @return The new {@link Observable} instance.
*/
Observable<T> switchOnError(const Observable<T> &fallbackObservable);

/**
* @brief Forwards all emissions on the given {@link async::Scheduler}.
*
Expand Down
8 changes: 8 additions & 0 deletions include/recpp/rx/Single.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,14 @@ namespace recpp::rx
*/
Single<T> tap(const OnSuccessMethod &onSuccessMethod, const OnErrorMethod &onErrorMethod);

/**
* @brief Switch to another {@link Single} in case this one emits an error.
*
* @param fallbackSingle The {@link Single} to use in case this one emits an error.
* @return The new {@link Single} instance.
*/
Single<T> switchOnError(const Single<T> &fallbackSingle);

/**
* @brief Forwards all emissions on the given {@link async::Scheduler}.
*
Expand Down
7 changes: 7 additions & 0 deletions include/recpp/rx/inl/Maybe.inl
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <recpp/processors/Map.h>
#include <recpp/processors/ObserveOn.h>
#include <recpp/processors/SubscribeOn.h>
#include <recpp/processors/SwitchOnError.h>
#include <recpp/processors/Tap.h>
#include <recpp/publishers/CreatePublisher.h>
#include <recpp/publishers/DeferPublisher.h>
Expand Down Expand Up @@ -135,6 +136,12 @@ recpp::rx::Maybe<T> recpp::rx::Maybe<T>::tap(const OnNextMethod &onNextMethod, c
return Maybe<T>(std::make_shared<processors::Tap<T>>(*this, onNextMethod, onErrorMethod, onCompleteMethod));
}

template <typename T>
recpp::rx::Maybe<T> recpp::rx::Maybe<T>::switchOnError(const Maybe<T> &fallbackMaybe)
{
return Maybe<T>(std::make_shared<processors::SwitchOnError<T>>(*this, fallbackMaybe));
}

template <typename T>
recpp::rx::Maybe<T> recpp::rx::Maybe<T>::observeOn(recpp::async::Scheduler &scheduler)
{
Expand Down
7 changes: 7 additions & 0 deletions include/recpp/rx/inl/Observable.inl
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <recpp/processors/ObserveOn.h>
#include <recpp/processors/Reduce.h>
#include <recpp/processors/SubscribeOn.h>
#include <recpp/processors/SwitchOnError.h>
#include <recpp/processors/Take.h>
#include <recpp/processors/TakeWhile.h>
#include <recpp/processors/Tap.h>
Expand Down Expand Up @@ -207,6 +208,12 @@ recpp::rx::Observable<T> recpp::rx::Observable<T>::tap(const OnNextMethod &onNex
return Observable<T>(std::make_shared<processors::Tap<T>>(*this, onNextMethod, onErrorMethod, onCompleteMethod));
}

template <typename T>
recpp::rx::Observable<T> recpp::rx::Observable<T>::switchOnError(const Observable<T> &fallbackObservable)
{
return Observable<T>(std::make_shared<processors::SwitchOnError<T>>(*this, fallbackObservable));
}

template <typename T>
recpp::rx::Observable<T> recpp::rx::Observable<T>::observeOn(recpp::async::Scheduler &scheduler)
{
Expand Down
7 changes: 7 additions & 0 deletions include/recpp/rx/inl/Single.inl
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <recpp/processors/Map.h>
#include <recpp/processors/ObserveOn.h>
#include <recpp/processors/SubscribeOn.h>
#include <recpp/processors/SwitchOnError.h>
#include <recpp/processors/Tap.h>
#include <recpp/publishers/CreatePublisher.h>
#include <recpp/publishers/DeferPublisher.h>
Expand Down Expand Up @@ -138,6 +139,12 @@ recpp::rx::Single<T> recpp::rx::Single<T>::tap(const OnSuccessMethod &onSuccessM
return Single<T>(std::make_shared<processors::Tap<T>>(*this, onSuccessMethod, onErrorMethod, nullptr));
}

template <typename T>
recpp::rx::Single<T> recpp::rx::Single<T>::switchOnError(const Single<T> &fallbackSingle)
{
return Single<T>(std::make_shared<processors::SwitchOnError<T>>(*this, fallbackSingle));
}

template <typename T>
recpp::rx::Single<T> recpp::rx::Single<T>::observeOn(recpp::async::Scheduler &scheduler)
{
Expand Down
5 changes: 5 additions & 0 deletions src/rx/Completable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ Completable Completable::tap(const Completable::OnCompleteMethod &onCompleteMeth
return Completable(make_shared<Tap<int>>(*this, nullptr, onErrorMethod, onCompleteMethod));
}

Completable Completable::switchOnError(const Completable &fallbackCompletable)
{
return Completable(make_shared<SwitchOnError<int>>(*this, fallbackCompletable));
}

Completable Completable::observeOn(Scheduler &scheduler)
{
return Completable(make_shared<ObserveOn<int>>(*this, scheduler));
Expand Down
76 changes: 76 additions & 0 deletions tests/rx/Completable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,82 @@ TEST_F(CompletableTap, checkOnErrorIsNotEmitedOnComplete)
.subscribe();
}

class CompletableSwitchOnError : public testing::Test
{
};

TEST_F(CompletableSwitchOnError, checkCompletes)
{
auto completed = false;
Completable::complete()
.switchOnError(Completable::complete())
.subscribe(
[&completed]()
{
EXPECT_FALSE(completed);
completed = true;
});
EXPECT_TRUE(completed);
}

TEST_F(CompletableSwitchOnError, checkNoError)
{
Completable::complete()
.switchOnError(Completable::complete())
.subscribe(nullptr,
[](const auto &)
{
ADD_FAILURE();
});
}

TEST_F(CompletableSwitchOnError, checkSwitchCompletes)
{
auto completed = false;
Completable::error(runtime_error(runtimeErrorMessage.data())) //
.switchOnError(Completable::complete())
.subscribe(
[&completed]()
{
EXPECT_FALSE(completed);
completed = true;
});
EXPECT_TRUE(completed);
}

TEST_F(CompletableSwitchOnError, checkSwitchNoError)
{
Completable::error(runtime_error(runtimeErrorMessage.data())) //
.switchOnError(Completable::complete())
.subscribe(nullptr,
[](const auto &)
{
ADD_FAILURE();
});
}

TEST_F(CompletableSwitchOnError, checkForwardsError)
{
auto gotError = false;
Completable::error(runtime_error(runtimeErrorMessage.data())) //
.switchOnError(Completable::error(runtime_error(runtimeErrorMessage.data())))
.subscribe(nullptr,
[&gotError](const auto &error)
{
EXPECT_FALSE(gotError);
gotError = true;
try
{
rethrow_exception(error);
}
catch (runtime_error &runtimeError)
{
EXPECT_THAT(runtimeError.what(), testing::StrEq(runtimeErrorMessage));
}
});
EXPECT_TRUE(gotError);
}

class CompletableObserveOn : public WorkerThreadBasedTest
{
};
Expand Down
Loading