Skip to content

Commit ab5c6bf

Browse files
committed
fix
1 parent 3f5b77b commit ab5c6bf

1 file changed

Lines changed: 40 additions & 35 deletions

File tree

src/runtime/output/protocol/pulsar/pulsar_protocol.rs

Lines changed: 40 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -43,48 +43,53 @@ impl OutputProtocol for PulsarOutputProtocol {
4343
fn send(&self, data: BufferOrEvent) -> Result<(), Box<dyn std::error::Error + Send>> {
4444
if let Some(payload) = data.into_buffer() {
4545
PULSAR_OUT_RT.with(|rt_cell| {
46-
PULSAR_PRODUCER.with(|producer_cell| -> Result<(), Box<dyn std::error::Error + Send>> {
47-
let mut rt_opt = rt_cell.borrow_mut();
48-
let mut producer_opt = producer_cell.borrow_mut();
46+
PULSAR_PRODUCER.with(
47+
|producer_cell| -> Result<(), Box<dyn std::error::Error + Send>> {
48+
let mut rt_opt = rt_cell.borrow_mut();
49+
let mut producer_opt = producer_cell.borrow_mut();
4950

50-
if producer_opt.is_none() {
51-
let rt = tokio::runtime::Runtime::new().map_err(|e| {
52-
Box::new(std::io::Error::other(e)) as Box<dyn std::error::Error + Send>
53-
})?;
54-
let url = self.config.url.clone();
55-
let topic = self.config.topic.clone();
56-
57-
let producer: Producer<TokioExecutor> = rt
58-
.block_on(async {
59-
let pulsar = Pulsar::builder(&url, TokioExecutor).build().await?;
60-
let producer = pulsar.producer().with_topic(&topic).build().await?;
61-
Result::<_, pulsar::Error>::Ok(producer)
62-
})
63-
.map_err(|e| {
51+
if producer_opt.is_none() {
52+
let rt = tokio::runtime::Runtime::new().map_err(|e| {
6453
Box::new(std::io::Error::other(e))
6554
as Box<dyn std::error::Error + Send>
6655
})?;
56+
let url = self.config.url.clone();
57+
let topic = self.config.topic.clone();
6758

68-
*rt_opt = Some(rt);
69-
*producer_opt = Some(producer);
70-
}
59+
let producer: Producer<TokioExecutor> = rt
60+
.block_on(async {
61+
let pulsar =
62+
Pulsar::builder(&url, TokioExecutor).build().await?;
63+
let producer =
64+
pulsar.producer().with_topic(&topic).build().await?;
65+
Result::<_, pulsar::Error>::Ok(producer)
66+
})
67+
.map_err(|e| {
68+
Box::new(std::io::Error::other(e))
69+
as Box<dyn std::error::Error + Send>
70+
})?;
7171

72-
let rt = rt_opt.as_ref().unwrap();
73-
let producer = producer_opt.as_mut().unwrap();
72+
*rt_opt = Some(rt);
73+
*producer_opt = Some(producer);
74+
}
7475

75-
rt.block_on(async {
76-
producer
77-
.create_message()
78-
.with_content(payload)
79-
.send_non_blocking()
80-
.await
81-
.map_err(|e| {
82-
Box::new(std::io::Error::other(e))
83-
as Box<dyn std::error::Error + Send>
84-
})
85-
})?;
86-
Ok(())
87-
})
76+
let rt = rt_opt.as_ref().unwrap();
77+
let producer = producer_opt.as_mut().unwrap();
78+
79+
rt.block_on(async {
80+
producer
81+
.create_message()
82+
.with_content(payload)
83+
.send_non_blocking()
84+
.await
85+
.map_err(|e| {
86+
Box::new(std::io::Error::other(e))
87+
as Box<dyn std::error::Error + Send>
88+
})
89+
})?;
90+
Ok(())
91+
},
92+
)
8893
})?;
8994
}
9095
Ok(())

0 commit comments

Comments
 (0)