Skip to content

Commit df80387

Browse files
committed
update code format
1 parent fd03113 commit df80387

2 files changed

Lines changed: 27 additions & 15 deletions

File tree

src/runtime/input/protocol/pulsar/pulsar_protocol.rs

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,17 @@ impl InputProtocol for PulsarProtocol {
5454
let mut consumer_opt = consumer_cell.borrow_mut();
5555

5656
if consumer_opt.is_none() {
57-
let rt = tokio::runtime::Runtime::new()
58-
.map_err(|e| Box::new(std::io::Error::other(e)) as Box<dyn std::error::Error + Send>)?;
57+
let rt = tokio::runtime::Runtime::new().map_err(|e| {
58+
Box::new(std::io::Error::other(e)) as Box<dyn std::error::Error + Send>
59+
})?;
5960
let url = self.config.url.clone();
6061
let topic = self.config.topic.clone();
6162
let subscription = self.config.subscription.clone();
62-
let sub_type = self.config.subscription_type.as_deref().unwrap_or("Exclusive");
63+
let sub_type = self
64+
.config
65+
.subscription_type
66+
.as_deref()
67+
.unwrap_or("Exclusive");
6368
let sub_type_enum = match sub_type.to_lowercase().as_str() {
6469
"shared" => SubType::Shared,
6570
"key_shared" => SubType::KeyShared,
@@ -78,7 +83,9 @@ impl InputProtocol for PulsarProtocol {
7883
let consumer = builder.build().await?;
7984
Result::<_, pulsar::Error>::Ok(consumer)
8085
})
81-
.map_err(|e| Box::new(std::io::Error::other(e)) as Box<dyn std::error::Error + Send>)?;
86+
.map_err(|e| {
87+
Box::new(std::io::Error::other(e)) as Box<dyn std::error::Error + Send>
88+
})?;
8289

8390
*rt_opt = Some(rt);
8491
*consumer_opt = Some(consumer);
@@ -93,7 +100,9 @@ impl InputProtocol for PulsarProtocol {
93100
let next_fut = consumer.next();
94101
match tokio::time::timeout(Duration::from_millis(timeout_ms), next_fut).await {
95102
Ok(Some(Ok(msg))) => {
96-
let payload = msg.deserialize().unwrap_or_else(|_| msg.payload.data.clone());
103+
let payload = msg
104+
.deserialize()
105+
.unwrap_or_else(|_| msg.payload.data.clone());
97106
let _ = consumer.ack(&msg).await;
98107
Some(Ok(payload))
99108
}
@@ -109,7 +118,9 @@ impl InputProtocol for PulsarProtocol {
109118
false,
110119
false,
111120
))),
112-
Some(Err(e)) => Err(Box::new(std::io::Error::other(e)) as Box<dyn std::error::Error + Send>),
121+
Some(Err(e)) => {
122+
Err(Box::new(std::io::Error::other(e)) as Box<dyn std::error::Error + Send>)
123+
}
113124
None => Ok(None),
114125
}
115126
})

src/runtime/output/protocol/pulsar/pulsar_protocol.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -48,22 +48,22 @@ impl OutputProtocol for PulsarOutputProtocol {
4848
let mut producer_opt = producer_cell.borrow_mut();
4949

5050
if producer_opt.is_none() {
51-
let rt = tokio::runtime::Runtime::new()
52-
.map_err(|e| Box::new(std::io::Error::other(e)) as Box<dyn std::error::Error + Send>)?;
51+
let rt = tokio::runtime::Runtime::new().map_err(|e| {
52+
Box::new(std::io::Error::other(e)) as Box<dyn std::error::Error + Send>
53+
})?;
5354
let url = self.config.url.clone();
5455
let topic = self.config.topic.clone();
5556

5657
let producer: Producer<TokioExecutor> = rt
5758
.block_on(async {
5859
let pulsar = Pulsar::builder(&url, TokioExecutor).build().await?;
59-
let producer = pulsar
60-
.producer()
61-
.with_topic(&topic)
62-
.build()
63-
.await?;
60+
let producer = pulsar.producer().with_topic(&topic).build().await?;
6461
Result::<_, pulsar::Error>::Ok(producer)
6562
})
66-
.map_err(|e| Box::new(std::io::Error::other(e)) as Box<dyn std::error::Error + Send>)?;
63+
.map_err(|e| {
64+
Box::new(std::io::Error::other(e))
65+
as Box<dyn std::error::Error + Send>
66+
})?;
6767

6868
*rt_opt = Some(rt);
6969
*producer_opt = Some(producer);
@@ -79,7 +79,8 @@ impl OutputProtocol for PulsarOutputProtocol {
7979
.send()
8080
.await
8181
.map_err(|e| {
82-
Box::new(std::io::Error::other(e)) as Box<dyn std::error::Error + Send>
82+
Box::new(std::io::Error::other(e))
83+
as Box<dyn std::error::Error + Send>
8384
})
8485
})?;
8586
})

0 commit comments

Comments
 (0)