Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion include/recpp/processors/inl/AllOf.inl
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ void recpp::processors::AllOf<T, R>::Impl::onComplete()
{
m_subscriber.onNext(!m_gotFalse);
m_subscriber.onComplete();
m_subscriber = {};
}

template <typename T, typename R>
Expand Down
2 changes: 2 additions & 0 deletions include/recpp/processors/inl/AndThen.inl
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,14 @@ template <typename T, typename R>
void recpp::processors::AndThen<T, R>::Impl::onError(const std::exception_ptr &error)
{
m_subscriber.onError(error);
m_subscription = {};
}

template <typename T, typename R>
void recpp::processors::AndThen<T, R>::Impl::onComplete()
{
m_dest.subscribe(m_subscriber);
m_subscription = {};
}

template <typename T, typename R>
Expand Down
1 change: 0 additions & 1 deletion include/recpp/processors/inl/AnyOf.inl
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ void recpp::processors::AnyOf<T, R>::Impl::onComplete()
{
m_subscriber.onNext(m_gotTrue);
m_subscriber.onComplete();
m_subscriber = {};
}

template <typename T, typename R>
Expand Down
1 change: 0 additions & 1 deletion include/recpp/processors/inl/Count.inl
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ void recpp::processors::Count<T>::Impl::onComplete()
{
m_subscriber.onNext(m_count);
m_subscriber.onComplete();
m_subscriber = {};
}

template <typename T>
Expand Down
1 change: 0 additions & 1 deletion include/recpp/processors/inl/DefaultIfEmpty.inl
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ void recpp::processors::DefaultIfEmpty<T>::Impl::onComplete()
if (!m_gotValues)
m_subscriber.onNext(m_defaultValue);
m_subscriber.onComplete();
m_subscriber = {};
}

template <typename T>
Expand Down
1 change: 0 additions & 1 deletion include/recpp/processors/inl/ElementAt.inl
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ void recpp::processors::ElementAt<T>::Impl::onComplete()
if (m_result)
m_subscriber.onNext(*m_result);
m_subscriber.onComplete();
m_subscriber = {};
}

template <typename T>
Expand Down
1 change: 0 additions & 1 deletion include/recpp/processors/inl/Filter.inl
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ template <typename T>
void recpp::processors::Filter<T>::Impl::onComplete()
{
m_subscriber.onComplete();
m_subscriber = {};
}

template <typename T>
Expand Down
1 change: 0 additions & 1 deletion include/recpp/processors/inl/First.inl
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ void recpp::processors::First<T>::Impl::onComplete()
if (result)
m_subscriber.onNext(*result);
m_subscriber.onComplete();
m_subscriber = {};
}

template <typename T>
Expand Down
6 changes: 0 additions & 6 deletions include/recpp/processors/inl/FlatMap.inl
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,7 @@ void recpp::processors::FlatMap<T, R>::Impl::onNext(const T &value)
{
m_runningPublishers--;
if (!m_runningPublishers && m_completed)
{
m_subscriber.onComplete();
m_subscriber = {};
}
});
result.subscribe(subscriber);
}
Expand All @@ -55,10 +52,7 @@ void recpp::processors::FlatMap<T, R>::Impl::onComplete()
{
m_completed = true;
if (!m_runningPublishers)
{
m_subscriber.onComplete();
m_subscriber = {};
}
}

template <typename T, typename R>
Expand Down
3 changes: 2 additions & 1 deletion include/recpp/processors/inl/IgnoreElements.inl
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,14 @@ template <typename T, typename R>
void recpp::processors::IgnoreElements<T, R>::Impl::onError(const std::exception_ptr &error)
{
m_subscriber.onError(error);
m_subscription = {};
}

template <typename T, typename R>
void recpp::processors::IgnoreElements<T, R>::Impl::onComplete()
{
m_subscriber.onComplete();
m_subscriber = {};
m_subscription = {};
}

template <typename T, typename R>
Expand Down
1 change: 0 additions & 1 deletion include/recpp/processors/inl/Last.inl
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ void recpp::processors::Last<T>::Impl::onComplete()
if (result)
m_subscriber.onNext(*result);
m_subscriber.onComplete();
m_subscriber = {};
}

template <typename T>
Expand Down
1 change: 0 additions & 1 deletion include/recpp/processors/inl/Map.inl
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ template <typename T, typename R>
void recpp::processors::Map<T, R>::Impl::onComplete()
{
m_subscriber.onComplete();
m_subscriber = {};
}

template <typename T, typename R>
Expand Down
1 change: 0 additions & 1 deletion include/recpp/processors/inl/Max.inl
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ void recpp::processors::Max<T, R>::Impl::onComplete()
{
m_subscriber.onNext(m_max ? *m_max : T{});
m_subscriber.onComplete();
m_subscriber = {};
}

template <typename T, typename R>
Expand Down
1 change: 0 additions & 1 deletion include/recpp/processors/inl/Min.inl
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ void recpp::processors::Min<T, R>::Impl::onComplete()
{
m_subscriber.onNext(m_min ? *m_min : T{});
m_subscriber.onComplete();
m_subscriber = {};
}

template <typename T, typename R>
Expand Down
1 change: 0 additions & 1 deletion include/recpp/processors/inl/NoneOf.inl
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ void recpp::processors::NoneOf<T, R>::Impl::onComplete()
{
m_subscriber.onNext(!m_gotTrue);
m_subscriber.onComplete();
m_subscriber = {};
}

template <typename T, typename R>
Expand Down
1 change: 0 additions & 1 deletion include/recpp/processors/inl/Reduce.inl
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ void recpp::processors::Reduce<T, R>::Impl::onComplete()
{
m_subscriber.onNext(m_current);
m_subscriber.onComplete();
m_subscriber = {};
}

template <typename T, typename R>
Expand Down
1 change: 0 additions & 1 deletion include/recpp/processors/inl/SubscribeOn.inl
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ template <typename T>
void recpp::processors::SubscribeOn<T>::Impl::onComplete()
{
m_subscriber.onComplete();
m_subscriber = {};
}

template <typename T>
Expand Down
1 change: 0 additions & 1 deletion include/recpp/processors/inl/Tap.inl
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ void recpp::processors::Tap<T>::Impl::onComplete()
if (m_onCompleteMethod)
m_onCompleteMethod();
m_subscriber.onComplete();
m_subscriber = {};
}

template <typename T>
Expand Down
2 changes: 2 additions & 0 deletions include/recpp/subscribers/inl/DefaultSubscriber.inl
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,13 @@ void recpp::subscribers::DefaultSubscriber<T>::Impl::onError(const std::exceptio
{
if (m_onErrorMethod)
m_onErrorMethod(error);
m_subscription = {};
}

template <typename T>
void recpp::subscribers::DefaultSubscriber<T>::Impl::onComplete()
{
if (m_onCompleteMethod)
m_onCompleteMethod();
m_subscription = {};
}
2 changes: 0 additions & 2 deletions include/recpp/subscribers/inl/MaybeSubscriber.inl
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ void recpp::subscribers::MaybeSubscriber<T>::onNext(const T &value)
m_ended = true;
m_subscriber.onNext(value);
m_subscriber.onComplete();
m_subscriber = {};
}
}

Expand Down Expand Up @@ -44,6 +43,5 @@ void recpp::subscribers::MaybeSubscriber<T>::onComplete()
{
m_ended = true;
m_subscriber.onComplete();
m_subscriber = {};
}
}
1 change: 0 additions & 1 deletion include/recpp/subscribers/inl/ObservableSubscriber.inl
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,5 @@ void recpp::subscribers::ObservableSubscriber<T>::onComplete()
{
m_ended = true;
m_subscriber.onComplete();
m_subscriber = {};
}
}
1 change: 0 additions & 1 deletion include/recpp/subscribers/inl/SingleSubscriber.inl
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ void recpp::subscribers::SingleSubscriber<T>::onNext(const T &value)
m_ended = true;
m_subscriber.onNext(value);
m_subscriber.onComplete();
m_subscriber = {};
}
}

Expand Down
2 changes: 0 additions & 2 deletions include/recpp/subscriptions/inl/EmptySubscription.inl
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ void recpp::subscriptions::EmptySubscription<T>::Impl::request(std::size_t count
{
m_completed = true;
m_subscriber.onComplete();
m_subscriber = {};
}
}

Expand All @@ -32,6 +31,5 @@ void recpp::subscriptions::EmptySubscription<T>::Impl::cancel()
{
m_completed = true;
m_subscriber.onComplete();
m_subscriber = {};
}
}
2 changes: 0 additions & 2 deletions include/recpp/subscriptions/inl/JustSubscription.inl
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,10 @@ void recpp::subscriptions::JustSubscription<T>::Impl::request(std::size_t count)
m_completed = true;
m_subscriber.onNext(m_value);
m_subscriber.onComplete();
m_subscriber = {};
}

template <typename T>
void recpp::subscriptions::JustSubscription<T>::Impl::cancel()
{
m_subscriber.onComplete();
m_subscriber = {};
}
6 changes: 0 additions & 6 deletions include/recpp/subscriptions/inl/MergeSubscription.inl
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,7 @@ recpp::subscriptions::MergeSubscription<T, P>::Impl::Impl(const rscpp::Subscribe
{
m_completed = true;
if (!m_canceled)
{
m_subscriber.onComplete();
m_subscriber = {};
}
}
});
publisherSource.subscribe(sourceSubscriber);
Expand Down Expand Up @@ -133,10 +130,7 @@ void recpp::subscriptions::MergeSubscription<T, P>::Impl::onPublisherComplete(st
{
m_completed = true;
if (!m_canceled)
{
m_subscriber.onComplete();
m_subscriber = {};
}
}
}

Expand Down
1 change: 0 additions & 1 deletion include/recpp/subscriptions/inl/NeverSubscription.inl
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,4 @@ template <typename T>
void recpp::subscriptions::NeverSubscription<T>::Impl::cancel()
{
m_subscriber.onComplete();
m_subscriber = {};
}
2 changes: 0 additions & 2 deletions include/recpp/subscriptions/inl/RangeSubscription.inl
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ void recpp::subscriptions::RangeSubscription<T, I>::Impl::request(std::size_t co
{
m_completed = true;
m_subscriber.onComplete();
m_subscriber = {};
break;
}
const auto it = m_current++;
Expand All @@ -38,5 +37,4 @@ void recpp::subscriptions::RangeSubscription<T, I>::Impl::cancel()
{
m_canceled = true;
m_subscriber.onComplete();
m_subscriber = {};
}
1 change: 0 additions & 1 deletion src/subscribers/CompletableSubscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,5 @@ void recpp::subscribers::CompletableSubscriber::onComplete()
{
m_ended = true;
m_subscriber.onComplete();
m_subscriber = {};
}
}
1 change: 1 addition & 0 deletions src/subscriptions/ForwardSubscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ void ForwardSubscription::Impl::request(size_t count)
void ForwardSubscription::Impl::cancel()
{
m_subscription.cancel();
m_subscription = {};
}