Skip to content

Commit a049272

Browse files
committed
refactor: rename spsc module to mpsc
1 parent 1a630b0 commit a049272

File tree

7 files changed

+57
-57
lines changed

7 files changed

+57
-57
lines changed

examples/compute.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::{
77
use anyhow::bail;
88
use futures_buffered::BufferedStreamExt;
99
use irpc::{
10-
channel::{oneshot, spsc},
10+
channel::{oneshot, mpsc},
1111
rpc::{listen, Handler},
1212
rpc_requests,
1313
util::{make_client_endpoint, make_server_endpoint},
@@ -61,11 +61,11 @@ enum ComputeRequest {
6161
enum ComputeProtocol {
6262
#[rpc(tx=oneshot::Sender<u128>)]
6363
Sqr(Sqr),
64-
#[rpc(rx=spsc::Receiver<i64>, tx=oneshot::Sender<i64>)]
64+
#[rpc(rx=mpsc::Receiver<i64>, tx=oneshot::Sender<i64>)]
6565
Sum(Sum),
66-
#[rpc(tx=spsc::Sender<u64>)]
66+
#[rpc(tx=mpsc::Sender<u64>)]
6767
Fibonacci(Fibonacci),
68-
#[rpc(rx=spsc::Receiver<u64>, tx=spsc::Sender<u64>)]
68+
#[rpc(rx=mpsc::Receiver<u64>, tx=mpsc::Sender<u64>)]
6969
Multiply(Multiply),
7070
}
7171

@@ -200,11 +200,11 @@ impl ComputeApi {
200200
}
201201
}
202202

203-
pub async fn sum(&self) -> anyhow::Result<(spsc::Sender<i64>, oneshot::Receiver<i64>)> {
203+
pub async fn sum(&self) -> anyhow::Result<(mpsc::Sender<i64>, oneshot::Receiver<i64>)> {
204204
let msg = Sum;
205205
match self.inner.request().await? {
206206
Request::Local(request) => {
207-
let (num_tx, num_rx) = spsc::channel(10);
207+
let (num_tx, num_rx) = mpsc::channel(10);
208208
let (sum_tx, sum_rx) = oneshot::channel();
209209
request.send((msg, sum_tx, num_rx)).await?;
210210
Ok((num_tx, sum_rx))
@@ -216,11 +216,11 @@ impl ComputeApi {
216216
}
217217
}
218218

219-
pub async fn fibonacci(&self, max: u64) -> anyhow::Result<spsc::Receiver<u64>> {
219+
pub async fn fibonacci(&self, max: u64) -> anyhow::Result<mpsc::Receiver<u64>> {
220220
let msg = Fibonacci { max };
221221
match self.inner.request().await? {
222222
Request::Local(request) => {
223-
let (tx, rx) = spsc::channel(128);
223+
let (tx, rx) = mpsc::channel(128);
224224
request.send((msg, tx)).await?;
225225
Ok(rx)
226226
}
@@ -234,12 +234,12 @@ impl ComputeApi {
234234
pub async fn multiply(
235235
&self,
236236
initial: u64,
237-
) -> anyhow::Result<(spsc::Sender<u64>, spsc::Receiver<u64>)> {
237+
) -> anyhow::Result<(mpsc::Sender<u64>, mpsc::Receiver<u64>)> {
238238
let msg = Multiply { initial };
239239
match self.inner.request().await? {
240240
Request::Local(request) => {
241-
let (in_tx, in_rx) = spsc::channel(128);
242-
let (out_tx, out_rx) = spsc::channel(128);
241+
let (in_tx, in_rx) = mpsc::channel(128);
242+
let (out_tx, out_rx) = mpsc::channel(128);
243243
request.send((msg, out_tx, in_rx)).await?;
244244
Ok((in_tx, out_rx))
245245
}

examples/derive.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::{
66

77
use anyhow::{Context, Result};
88
use irpc::{
9-
channel::{oneshot, spsc},
9+
channel::{oneshot, mpsc},
1010
rpc::Handler,
1111
rpc_requests,
1212
util::{make_client_endpoint, make_server_endpoint},
@@ -55,9 +55,9 @@ enum StorageProtocol {
5555
Get(Get),
5656
#[rpc(tx=oneshot::Sender<()>)]
5757
Set(Set),
58-
#[rpc(tx=oneshot::Sender<u64>, rx=spsc::Receiver<(String, String)>)]
58+
#[rpc(tx=oneshot::Sender<u64>, rx=mpsc::Receiver<(String, String)>)]
5959
SetMany(SetMany),
60-
#[rpc(tx=spsc::Sender<String>)]
60+
#[rpc(tx=mpsc::Sender<String>)]
6161
List(List),
6262
}
6363

@@ -152,7 +152,7 @@ impl StorageApi {
152152
self.inner.rpc(Get { key }).await
153153
}
154154

155-
pub async fn list(&self) -> irpc::Result<spsc::Receiver<String>> {
155+
pub async fn list(&self) -> irpc::Result<mpsc::Receiver<String>> {
156156
self.inner.server_streaming(List, 16).await
157157
}
158158

@@ -162,7 +162,7 @@ impl StorageApi {
162162

163163
pub async fn set_many(
164164
&self,
165-
) -> irpc::Result<(spsc::Sender<(String, String)>, oneshot::Receiver<u64>)> {
165+
) -> irpc::Result<(mpsc::Sender<(String, String)>, oneshot::Receiver<u64>)> {
166166
self.inner.client_streaming(SetMany, 4).await
167167
}
168168
}

examples/storage.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::{
66

77
use anyhow::bail;
88
use irpc::{
9-
channel::{none::NoReceiver, oneshot, spsc},
9+
channel::{none::NoReceiver, oneshot, mpsc},
1010
rpc::{listen, Handler},
1111
util::{make_client_endpoint, make_server_endpoint},
1212
Channels, Client, LocalSender, Request, Service, WithChannels,
@@ -36,7 +36,7 @@ struct List;
3636

3737
impl Channels<StorageService> for List {
3838
type Rx = NoReceiver;
39-
type Tx = spsc::Sender<String>;
39+
type Tx = mpsc::Sender<String>;
4040
}
4141

4242
#[derive(Debug, Serialize, Deserialize)]
@@ -157,11 +157,11 @@ impl StorageApi {
157157
}
158158
}
159159

160-
pub async fn list(&self) -> anyhow::Result<spsc::Receiver<String>> {
160+
pub async fn list(&self) -> anyhow::Result<mpsc::Receiver<String>> {
161161
let msg = List;
162162
match self.inner.request().await? {
163163
Request::Local(request) => {
164-
let (tx, rx) = spsc::channel(10);
164+
let (tx, rx) = mpsc::channel(10);
165165
request.send((msg, tx)).await?;
166166
Ok(rx)
167167
}

irpc-iroh/examples/auth.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ mod storage {
7272
Endpoint,
7373
};
7474
use irpc::{
75-
channel::{oneshot, spsc},
75+
channel::{oneshot, mpsc},
7676
Client, Service, WithChannels,
7777
};
7878
// Import the macro
@@ -122,9 +122,9 @@ mod storage {
122122
Get(Get),
123123
#[rpc(tx=oneshot::Sender<()>)]
124124
Set(Set),
125-
#[rpc(tx=oneshot::Sender<u64>, rx=spsc::Receiver<(String, String)>)]
125+
#[rpc(tx=oneshot::Sender<u64>, rx=mpsc::Receiver<(String, String)>)]
126126
SetMany(SetMany),
127-
#[rpc(tx=spsc::Sender<String>)]
127+
#[rpc(tx=mpsc::Sender<String>)]
128128
List(List),
129129
}
130130

@@ -265,7 +265,7 @@ mod storage {
265265
self.inner.rpc(Get { key }).await
266266
}
267267

268-
pub async fn list(&self) -> Result<spsc::Receiver<String>, irpc::Error> {
268+
pub async fn list(&self) -> Result<mpsc::Receiver<String>, irpc::Error> {
269269
self.inner.server_streaming(List, 10).await
270270
}
271271

irpc-iroh/examples/derive.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ mod storage {
6060
use anyhow::{Context, Result};
6161
use iroh::{protocol::ProtocolHandler, Endpoint};
6262
use irpc::{
63-
channel::{oneshot, spsc},
63+
channel::{oneshot, mpsc},
6464
rpc::Handler,
6565
rpc_requests, Client, LocalSender, Service, WithChannels,
6666
};
@@ -97,7 +97,7 @@ mod storage {
9797
Get(Get),
9898
#[rpc(tx=oneshot::Sender<()>)]
9999
Set(Set),
100-
#[rpc(tx=spsc::Sender<String>)]
100+
#[rpc(tx=mpsc::Sender<String>)]
101101
List(List),
102102
}
103103

@@ -190,7 +190,7 @@ mod storage {
190190
self.inner.rpc(Get { key }).await
191191
}
192192

193-
pub async fn list(&self) -> irpc::Result<spsc::Receiver<String>> {
193+
pub async fn list(&self) -> irpc::Result<mpsc::Receiver<String>> {
194194
self.inner.server_streaming(List, 10).await
195195
}
196196

src/lib.rs

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -127,9 +127,9 @@ pub trait Receiver: Debug + Sealed {}
127127

128128
/// Trait to specify channels for a message and service
129129
pub trait Channels<S: Service> {
130-
/// The sender type, can be either spsc, oneshot or none
130+
/// The sender type, can be either mpsc, oneshot or none
131131
type Tx: Sender;
132-
/// The receiver type, can be either spsc, oneshot or none
132+
/// The receiver type, can be either mpsc, oneshot or none
133133
///
134134
/// For many services, the receiver is not needed, so it can be set to [`NoReceiver`].
135135
type Rx: Receiver;
@@ -315,14 +315,14 @@ pub mod channel {
315315

316316
/// SPSC channel, similar to tokio's mpsc channel
317317
///
318-
/// For the rpc case, the send side can not be cloned, hence spsc instead of mpsc.
319-
pub mod spsc {
318+
/// For the rpc case, the send side can not be cloned, hence mpsc instead of mpsc.
319+
pub mod mpsc {
320320
use std::{fmt::Debug, future::Future, io, pin::Pin, sync::Arc};
321321

322322
use super::{RecvError, SendError};
323323
use crate::RpcMessage;
324324

325-
/// Create a local spsc sender and receiver pair, with the given buffer size.
325+
/// Create a local mpsc sender and receiver pair, with the given buffer size.
326326
///
327327
/// This is currently using a tokio channel pair internally.
328328
pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
@@ -582,7 +582,7 @@ pub mod channel {
582582
impl crate::Receiver for NoReceiver {}
583583
}
584584

585-
/// Error when sending a oneshot or spsc message. For local communication,
585+
/// Error when sending a oneshot or mpsc message. For local communication,
586586
/// the only thing that can go wrong is that the receiver has been dropped.
587587
///
588588
/// For rpc communication, there can be any number of errors, so this is a
@@ -608,7 +608,7 @@ pub mod channel {
608608
}
609609
}
610610

611-
/// Error when receiving a oneshot or spsc message. For local communication,
611+
/// Error when receiving a oneshot or mpsc message. For local communication,
612612
/// the only thing that can go wrong is that the sender has been closed.
613613
///
614614
/// For rpc communication, there can be any number of errors, so this is a
@@ -871,24 +871,24 @@ impl<M, R, S> Client<M, R, S> {
871871
}
872872
}
873873

874-
/// Performs a request for which the server returns a spsc receiver.
874+
/// Performs a request for which the server returns a mpsc receiver.
875875
pub fn server_streaming<Req, Res>(
876876
&self,
877877
msg: Req,
878878
local_response_cap: usize,
879-
) -> impl Future<Output = Result<channel::spsc::Receiver<Res>>> + Send + 'static
879+
) -> impl Future<Output = Result<channel::mpsc::Receiver<Res>>> + Send + 'static
880880
where
881881
S: Service,
882882
M: From<WithChannels<Req, S>> + Send + Sync + Unpin + 'static,
883883
R: From<Req> + Serialize + Send + Sync + 'static,
884-
Req: Channels<S, Tx = channel::spsc::Sender<Res>, Rx = NoReceiver> + Send + 'static,
884+
Req: Channels<S, Tx = channel::mpsc::Sender<Res>, Rx = NoReceiver> + Send + 'static,
885885
Res: RpcMessage,
886886
{
887887
let request = self.request();
888888
async move {
889-
let recv: channel::spsc::Receiver<Res> = match request.await? {
889+
let recv: channel::mpsc::Receiver<Res> = match request.await? {
890890
Request::Local(request) => {
891-
let (tx, rx) = channel::spsc::channel(local_response_cap);
891+
let (tx, rx) = channel::mpsc::channel(local_response_cap);
892892
request.send((msg, tx)).await?;
893893
rx
894894
}
@@ -911,26 +911,26 @@ impl<M, R, S> Client<M, R, S> {
911911
local_update_cap: usize,
912912
) -> impl Future<
913913
Output = Result<(
914-
channel::spsc::Sender<Update>,
914+
channel::mpsc::Sender<Update>,
915915
channel::oneshot::Receiver<Res>,
916916
)>,
917917
>
918918
where
919919
S: Service,
920920
M: From<WithChannels<Req, S>> + Send + Sync + Unpin + 'static,
921921
R: From<Req> + Serialize + 'static,
922-
Req: Channels<S, Tx = channel::oneshot::Sender<Res>, Rx = channel::spsc::Receiver<Update>>,
922+
Req: Channels<S, Tx = channel::oneshot::Sender<Res>, Rx = channel::mpsc::Receiver<Update>>,
923923
Update: RpcMessage,
924924
Res: RpcMessage,
925925
{
926926
let request = self.request();
927927
async move {
928928
let (update_tx, res_rx): (
929-
channel::spsc::Sender<Update>,
929+
channel::mpsc::Sender<Update>,
930930
channel::oneshot::Receiver<Res>,
931931
) = match request.await? {
932932
Request::Local(request) => {
933-
let (req_tx, req_rx) = channel::spsc::channel(local_update_cap);
933+
let (req_tx, req_rx) = channel::mpsc::channel(local_update_cap);
934934
let (res_tx, res_rx) = channel::oneshot::channel();
935935
request.send((msg, res_tx, req_rx)).await?;
936936
(req_tx, res_rx)
@@ -947,32 +947,32 @@ impl<M, R, S> Client<M, R, S> {
947947
}
948948
}
949949

950-
/// Performs a request for which the client can send updates, and the server returns a spsc receiver.
950+
/// Performs a request for which the client can send updates, and the server returns a mpsc receiver.
951951
pub fn bidi_streaming<Req, Update, Res>(
952952
&self,
953953
msg: Req,
954954
local_update_cap: usize,
955955
local_response_cap: usize,
956-
) -> impl Future<Output = Result<(channel::spsc::Sender<Update>, channel::spsc::Receiver<Res>)>>
956+
) -> impl Future<Output = Result<(channel::mpsc::Sender<Update>, channel::mpsc::Receiver<Res>)>>
957957
+ Send
958958
+ 'static
959959
where
960960
S: Service,
961961
M: From<WithChannels<Req, S>> + Send + Sync + Unpin + 'static,
962962
R: From<Req> + Serialize + Send + 'static,
963-
Req: Channels<S, Tx = channel::spsc::Sender<Res>, Rx = channel::spsc::Receiver<Update>>
963+
Req: Channels<S, Tx = channel::mpsc::Sender<Res>, Rx = channel::mpsc::Receiver<Update>>
964964
+ Send
965965
+ 'static,
966966
Update: RpcMessage,
967967
Res: RpcMessage,
968968
{
969969
let request = self.request();
970970
async move {
971-
let (update_tx, res_rx): (channel::spsc::Sender<Update>, channel::spsc::Receiver<Res>) =
971+
let (update_tx, res_rx): (channel::mpsc::Sender<Update>, channel::mpsc::Receiver<Res>) =
972972
match request.await? {
973973
Request::Local(request) => {
974-
let (update_tx, update_rx) = channel::spsc::channel(local_update_cap);
975-
let (res_tx, res_rx) = channel::spsc::channel(local_response_cap);
974+
let (update_tx, update_rx) = channel::mpsc::channel(local_update_cap);
975+
let (res_tx, res_rx) = channel::mpsc::channel(local_response_cap);
976976
request.send((msg, res_tx, update_rx)).await?;
977977
(update_tx, res_rx)
978978
}
@@ -1120,7 +1120,7 @@ pub mod rpc {
11201120
channel::{
11211121
none::NoSender,
11221122
oneshot,
1123-
spsc::{self, DynReceiver, DynSender},
1123+
mpsc::{self, DynReceiver, DynSender},
11241124
RecvError, SendError,
11251125
},
11261126
util::{now_or_never, AsyncReadVarintExt, WriteVarintExt},
@@ -1290,9 +1290,9 @@ pub mod rpc {
12901290
}
12911291
}
12921292

1293-
impl<T: RpcMessage> From<quinn::RecvStream> for spsc::Receiver<T> {
1293+
impl<T: RpcMessage> From<quinn::RecvStream> for mpsc::Receiver<T> {
12941294
fn from(read: quinn::RecvStream) -> Self {
1295-
spsc::Receiver::Boxed(Box::new(QuinnReceiver {
1295+
mpsc::Receiver::Boxed(Box::new(QuinnReceiver {
12961296
recv: read,
12971297
_marker: PhantomData,
12981298
}))
@@ -1320,9 +1320,9 @@ pub mod rpc {
13201320
}
13211321
}
13221322

1323-
impl<T: RpcMessage> From<quinn::SendStream> for spsc::Sender<T> {
1323+
impl<T: RpcMessage> From<quinn::SendStream> for mpsc::Sender<T> {
13241324
fn from(write: quinn::SendStream) -> Self {
1325-
spsc::Sender::Boxed(Arc::new(QuinnSender(tokio::sync::Mutex::new(
1325+
mpsc::Sender::Boxed(Arc::new(QuinnSender(tokio::sync::Mutex::new(
13261326
QuinnSenderState::Open(QuinnSenderInner {
13271327
send: write,
13281328
buffer: SmallVec::new(),

0 commit comments

Comments
 (0)