Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions include/recpp/processors/Take.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#pragma once

#include <rscpp/Processor.h>

#include <recpp/subscriptions/FilterSubscription.h>

#include <atomic>
#include <functional>
#include <mutex>
#include <vector>

namespace recpp::processors
{
/**
* @class Take Take.h <recpp/processors/Take.h>
* @brief {@link rscpp::Processor} that will subscribe to a given {@link rscpp::Publisher} and emits only the first count items emitted.
*
* @tparam T The type of element signaled to the {@link rscpp::Subscriber} and signaled from the {@link rscpp::Publisher}.
*/
template <typename T>
class Take : public rscpp::Processor<T, T>
{
class Impl : public rscpp::Processor<T, T>
{
public:
explicit Impl(rscpp::Processor<T, T> &parent, const rscpp::Publisher<T> &publisher, std::size_t count);

void onSubscribe(rscpp::Subscription &subscription) override;

void onNext(const T &value) override;

void onError(const std::exception_ptr &error) override;

void onComplete() override;

void subscribe(rscpp::Subscriber<T> &subscriber) override;

private:
rscpp::Processor<T, T> &m_parent;
rscpp::Publisher<T> m_publisher;
std::vector<recpp::subscriptions::FilterSubscription> m_subscriptions;
rscpp::Subscriber<T> m_subscriber;
std::mutex m_mutex;
std::size_t m_count = 0;
};

public:
/**
* @brief Construct a new {@link Take} instance.
*
* @param publisher The source {@link rscpp::Publisher} the {@link Take} {@link rscpp::Processor} subscribes to and filter the items.
* @param count The maximum number of items to emit.
*/
explicit Take(const rscpp::Publisher<T> &publisher, std::size_t count);
};
} // namespace recpp::processors

#include <recpp/processors/inl/Take.inl>
60 changes: 60 additions & 0 deletions include/recpp/processors/inl/Take.inl
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#pragma once

template <typename T>
recpp::processors::Take<T>::Take(const rscpp::Publisher<T> &publisher, std::size_t count)
: rscpp::Processor<T, T>(std::make_shared<Impl>(*this, publisher, count))
{
}

template <typename T>
recpp::processors::Take<T>::Impl::Impl(rscpp::Processor<T, T> &parent, const rscpp::Publisher<T> &publisher, std::size_t count)
: m_parent(parent)
, m_publisher(publisher)
, m_count(count)
{
}

template <typename T>
void recpp::processors::Take<T>::Impl::onSubscribe(rscpp::Subscription &subscription)
{
auto filterSubscription = recpp::subscriptions::FilterSubscription(subscription);
m_subscriptions.push_back(filterSubscription);
m_subscriber.onSubscribe(filterSubscription);
}

template <typename T>
void recpp::processors::Take<T>::Impl::onNext(const T &value)
{
if (m_count == 0)
return;

m_count--;
m_subscriber.onNext(value);
for (auto &subscription : m_subscriptions)
subscription.onNext(false);
if (m_count == 0)
{
for (auto &subscription : m_subscriptions)
subscription.cancel();
m_subscriber.onComplete();
}
}

template <typename T>
void recpp::processors::Take<T>::Impl::onError(const std::exception_ptr &error)
{
m_subscriber.onError(error);
}

template <typename T>
void recpp::processors::Take<T>::Impl::onComplete()
{
m_subscriber.onComplete();
}

template <typename T>
void recpp::processors::Take<T>::Impl::subscribe(rscpp::Subscriber<T> &subscriber)
{
m_subscriber = subscriber;
m_publisher.subscribe(m_parent);
}
8 changes: 8 additions & 0 deletions include/recpp/rx/Observable.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,14 @@ namespace recpp::rx
*/
Observable<T> filter(const std::function<bool(const T & /* value */)> &method);

/**
* @brief Emits only the first count items emitted.
*
* @param count The maximum number of items to emit.
* @return The new {@link Observable} instance.
*/
Observable<T> take(std::size_t count);

/**
* Convert this {@link Observable} into a {@link Completable} by discarding all contained value.
*
Expand Down
7 changes: 7 additions & 0 deletions include/recpp/rx/inl/Observable.inl
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <recpp/processors/ObserveOn.h>
#include <recpp/processors/Reduce.h>
#include <recpp/processors/SubscribeOn.h>
#include <recpp/processors/Take.h>
#include <recpp/processors/Tap.h>
#include <recpp/publishers/CreatePublisher.h>
#include <recpp/publishers/DeferPublisher.h>
Expand Down Expand Up @@ -126,6 +127,12 @@ recpp::rx::Observable<T> recpp::rx::Observable<T>::filter(const std::function<bo
return Observable<T>(std::make_shared<processors::Filter<T>>(*this, method));
}

template <typename T>
recpp::rx::Observable<T> recpp::rx::Observable<T>::take(std::size_t count)
{
return Observable<T>(std::make_shared<processors::Take<T>>(*this, count));
}

template <typename T>
recpp::rx::Completable recpp::rx::Observable<T>::ignoreElements()
{
Expand Down
80 changes: 80 additions & 0 deletions tests/rx/Observable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,86 @@ TEST_F(ObservableFilter, checkDoNotCompleteOnError)
});
}

class ObservableTake : public EventLoopBasedTest
{
protected:
Observable<int> infiniteObservable()
{
return Observable<int>::interval(chrono::milliseconds(1), *m_eventLoop);
}
};

TEST_F(ObservableTake, checkEmitedValues)
{
bool completed = false;
const vector<int> expected = {0, 1, 2};
vector<int> values;
infiniteObservable() //
.take(3)
.subscribe(
[&values](const auto value)
{
values.push_back(value);
},
[](const auto &) {},
[this, &values, &expected, &completed]()
{
EXPECT_EQ(values, expected);
completed = true;
m_eventLoop->stop();
});
m_eventLoop->run();
EXPECT_TRUE(completed);
}

TEST_F(ObservableTake, checkNoErrorIsEmited)
{
infiniteObservable() //
.take(3)
.subscribe([](const auto) {},
[](const auto &)
{
ADD_FAILURE();
},
[this]()
{
m_eventLoop->stop();
});
m_eventLoop->run();
}

TEST_F(ObservableTake, checkErrorsAreForwarded)
{
bool gotError = false;
Observable<int>::error(runtime_error(runtimeErrorMessage.data())) //
.take(3)
.subscribe([](const auto) {},
[&gotError](const auto &exception)
{
gotError = true;
try
{
rethrow_exception(exception);
}
catch (runtime_error &runtimeError)
{
EXPECT_THAT(runtimeError.what(), testing::StrEq(runtimeErrorMessage));
}
});
EXPECT_TRUE(gotError);
}

TEST_F(ObservableTake, checkDoNotCompleteOnError)
{
Observable<int>::error(runtime_error(runtimeErrorMessage.data())) //
.take(3)
.subscribe([](const auto) {}, [](const auto &) {},
[]()
{
ADD_FAILURE();
});
}

class ObservableIgnoreElements : public testing::Test
{
};
Expand Down