diff --git a/.github/workflows/md-inc.yml b/.github/workflows/md-inc.yml index c1c6e7d..6882247 100644 --- a/.github/workflows/md-inc.yml +++ b/.github/workflows/md-inc.yml @@ -10,6 +10,6 @@ jobs: uses: actions/checkout@v2 - name: Update code in wiki - uses: filipdulic/markdown-include-wiki-action@v1.0.1 + uses: filipdulic/markdown-include-wiki-action@v1.0.2 with: WIKI_TOKEN: ${{ secrets.WIKI_TOKEN }} diff --git a/.github/workflows/tarpaulin.yml b/.github/workflows/tarpaulin.yml index ac867d9..ccf7d01 100644 --- a/.github/workflows/tarpaulin.yml +++ b/.github/workflows/tarpaulin.yml @@ -16,11 +16,11 @@ jobs: toolchain: stable override: true + - name: Install cargo-tarpaulin + run: cargo install cargo-tarpaulin --locked + - name: Run cargo-tarpaulin - uses: actions-rs/tarpaulin@v0.1 - with: - version: '0.9.0' - args: '--all-features -- --test-threads 1' + run: cargo tarpaulin --all-features -- --test-threads 1 - name: Upload to codecov.io uses: codecov/codecov-action@v1.0.2 @@ -28,7 +28,7 @@ jobs: token: ${{secrets.CODECOV_TOKEN}} - name: Archive code coverage results - uses: actions/upload-artifact@v1 + uses: actions/upload-artifact@v4 with: name: code-coverage-report path: cobertura.xml diff --git a/Cargo.toml b/Cargo.toml index 2107144..a41fa8f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,10 +17,9 @@ event-listener = "5.4.1" # conc = {version="0.5.1", optional = true} [dev-dependencies] -# rand = "0.10.0-rc.0" +rand = "0.10.0-rc.0" futures-test = "0.3.31" futures = {version = "0.3.31", features = ["thread-pool"]} -serial_test = "3.2.0" [features] default = ["arcswap", "rwlock"] diff --git a/tests/async_tests.rs b/tests/async_tests.rs index 7f0cca6..9922cc9 100644 --- a/tests/async_tests.rs +++ b/tests/async_tests.rs @@ -1,53 +1,49 @@ use bus_queue::flavors::arc_swap::async_bounded; -// use futures::{executor, pin_mut, task::Poll, task::SpawnExt, FutureExt, SinkExt, StreamExt}; -use futures::{FutureExt, SinkExt, pin_mut, task::Poll}; +use futures::{FutureExt, SinkExt, StreamExt, executor, pin_mut, task::Poll, task::SpawnExt}; use futures_test::task::noop_context; use futures_test::{assert_stream_done, assert_stream_next, assert_stream_pending}; -// use rand::Rng; +use rand::Rng; use std::sync::Arc; -// use std::time::Duration; - -// pool.spawn alternative -// pool.spawn(stream.forward(publisher).map(drop)).unwrap(); - -// #[test] -// fn test_subscriber_item_drop_related_to_ratio_of_timing() { -// const LEAD_IN_TIME: Duration = Duration::from_millis(10); -// const MIN_PUB_MS: u64 = 2; -// const MAX_PUB_MS: u64 = 10; -// const MIN_SUB_MULTIPLIER: u64 = 2; -// const MAX_SUB_MULTIPLIER: u64 = 10; -// const NUMBER_OF_GENERATED: usize = 1000; -// let mut rng = rand::thread_rng(); -// let pub_ms = rng.gen_range(MIN_PUB_MS, MAX_PUB_MS); -// let pub_time = Duration::from_millis(pub_ms); -// let sub_multiplier = rng.gen_range(MIN_SUB_MULTIPLIER, MAX_SUB_MULTIPLIER); -// let sub_time = Duration::from_millis(sub_multiplier * pub_ms); -// let pool = executor::ThreadPool::new().unwrap(); -// let (mut publisher, mut subscriber) = async_bounded::(1); -// pool.spawn(async move { -// std::thread::sleep(LEAD_IN_TIME); -// for i in 0usize..NUMBER_OF_GENERATED { -// std::thread::sleep(pub_time); -// publisher.send(i).await.unwrap() -// } -// }) -// .unwrap(); -// let vec: Vec = executor::block_on(async move { -// let mut vec = Vec::new(); -// loop { -// std::thread::sleep(sub_time); -// match subscriber.next().await { -// Some(item) => vec.push(*item), -// _ => return vec, -// } -// } -// }); -// assert!( -// (vec.len() >= (NUMBER_OF_GENERATED / (sub_multiplier as usize + 1usize))) -// && (vec.len() <= (NUMBER_OF_GENERATED / (sub_multiplier as usize - 1usize))) -// ) -// } +use std::time::Duration; + +#[test] +fn test_subscriber_item_drop_related_to_ratio_of_timing() { + const LEAD_IN_TIME: Duration = Duration::from_millis(10); + const MIN_PUB_MS: u64 = 2; + const MAX_PUB_MS: u64 = 10; + const MIN_SUB_MULTIPLIER: u64 = 2; + const MAX_SUB_MULTIPLIER: u64 = 10; + const NUMBER_OF_GENERATED: usize = 1000; + let mut rng = rand::rng(); + let pub_ms = rng.random_range(MIN_PUB_MS..=MAX_PUB_MS); + let pub_time = Duration::from_millis(pub_ms); + let sub_multiplier = rng.random_range(MIN_SUB_MULTIPLIER..=MAX_SUB_MULTIPLIER); + let sub_time = Duration::from_millis(sub_multiplier * pub_ms); + let pool = executor::ThreadPool::new().unwrap(); + let (mut publisher, mut subscriber) = async_bounded::(1); + pool.spawn(async move { + std::thread::sleep(LEAD_IN_TIME); + for i in 0usize..NUMBER_OF_GENERATED { + std::thread::sleep(pub_time); + publisher.send(i).await.unwrap() + } + }) + .unwrap(); + let vec: Vec = executor::block_on(async move { + let mut vec = Vec::new(); + loop { + std::thread::sleep(sub_time); + match subscriber.next().await { + Some(item) => vec.push(*item), + _ => return vec, + } + } + }); + assert!( + (vec.len() >= (NUMBER_OF_GENERATED / (sub_multiplier as usize + 1usize))) + && (vec.len() <= (NUMBER_OF_GENERATED / (sub_multiplier as usize - 1usize))) + ) +} #[test] fn subscriber_is_in_pending_state_before_first_data_is_published() { let (_publisher, subscriber) = async_bounded::(1);