Skip to content

Commit fd03113

Browse files
committed
feat(mq): add Pulsar input and output support
Made-with: Cursor
1 parent f58af82 commit fd03113

12 files changed

Lines changed: 418 additions & 0 deletions

File tree

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ async-trait = "0.1"
3636
num_cpus = "1.0"
3737
protocol = { path = "./protocol" }
3838
rdkafka = { version = "0.38", features = ["cmake-build", "ssl", "gssapi"] }
39+
pulsar = { version = "6", default-features = false, features = ["tokio-runtime"] }
40+
futures = "0.3"
3941
crossbeam-channel = "0.5"
4042
pest = "2.7"
4143
pest_derive = "2.7"

src/runtime/input/input_provider.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,43 @@ impl InputProvider {
105105
runtime,
106106
)))
107107
}
108+
InputConfig::Pulsar {
109+
url,
110+
topic,
111+
subscription,
112+
subscription_type,
113+
extra,
114+
runtime: _,
115+
} => {
116+
use crate::runtime::input::InputRunner;
117+
use crate::runtime::input::protocol::pulsar::{PulsarConfig, PulsarProtocol};
118+
119+
if url.is_empty() {
120+
return Err(Box::new(std::io::Error::new(
121+
std::io::ErrorKind::InvalidData,
122+
format!(
123+
"Invalid pulsar url in input config (group #{}): empty (topic: {})",
124+
group_idx + 1,
125+
topic
126+
),
127+
)) as Box<dyn std::error::Error + Send>);
128+
}
129+
130+
let pulsar_config = PulsarConfig::new(
131+
url.clone(),
132+
topic.clone(),
133+
subscription.clone(),
134+
subscription_type.clone(),
135+
extra.clone(),
136+
);
137+
let runtime = input_config.input_runtime_config();
138+
Ok(Box::new(InputRunner::new(
139+
PulsarProtocol::new(pulsar_config),
140+
group_idx,
141+
input_idx,
142+
runtime,
143+
)))
144+
}
108145
}
109146
}
110147
}

src/runtime/input/protocol/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,4 @@
1111
// limitations under the License.
1212

1313
pub mod kafka;
14+
pub mod pulsar;
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
// Licensed under the Apache License, Version 2.0 (the "License");
2+
// you may not use this file except in compliance with the License.
3+
// You may obtain a copy of the License at
4+
//
5+
// http://www.apache.org/licenses/LICENSE-2.0
6+
//
7+
// Unless required by applicable law or agreed to in writing, software
8+
// distributed under the License is distributed on an "AS IS" BASIS,
9+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
// See the License for the specific language governing permissions and
11+
// limitations under the License.
12+
13+
use std::collections::HashMap;
14+
15+
/// PulsarConfig - Pulsar consumer configuration
16+
#[derive(Debug, Clone)]
17+
pub struct PulsarConfig {
18+
pub url: String,
19+
pub topic: String,
20+
pub subscription: String,
21+
pub subscription_type: Option<String>,
22+
pub properties: HashMap<String, String>,
23+
}
24+
25+
impl PulsarConfig {
26+
pub fn new(
27+
url: String,
28+
topic: String,
29+
subscription: String,
30+
subscription_type: Option<String>,
31+
properties: HashMap<String, String>,
32+
) -> Self {
33+
Self {
34+
url,
35+
topic,
36+
subscription,
37+
subscription_type,
38+
properties,
39+
}
40+
}
41+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
// Licensed under the Apache License, Version 2.0 (the "License");
2+
// you may not use this file except in compliance with the License.
3+
// You may obtain a copy of the License at
4+
//
5+
// http://www.apache.org/licenses/LICENSE-2.0
6+
//
7+
// Unless required by applicable law or agreed to in writing, software
8+
// distributed under the License is distributed on an "AS IS" BASIS,
9+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
// See the License for the specific language governing permissions and
11+
// limitations under the License.
12+
13+
pub mod config;
14+
pub mod pulsar_protocol;
15+
16+
pub use config::PulsarConfig;
17+
pub use pulsar_protocol::PulsarProtocol;
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
// Licensed under the Apache License, Version 2.0 (the "License");
2+
// you may not use this file except in compliance with the License.
3+
// You may obtain a copy of the License at
4+
//
5+
// http://www.apache.org/licenses/LICENSE-2.0
6+
//
7+
// Unless required by applicable law or agreed to in writing, software
8+
// distributed under the License is distributed on an "AS IS" BASIS,
9+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
// See the License for the specific language governing permissions and
11+
// limitations under the License.
12+
13+
use super::config::PulsarConfig;
14+
use crate::runtime::buffer_and_event::BufferOrEvent;
15+
use crate::runtime::input::input_protocol::InputProtocol;
16+
use futures::StreamExt;
17+
use pulsar::consumer::SubType;
18+
use pulsar::{Consumer, Pulsar, TokioExecutor};
19+
use std::cell::RefCell;
20+
use std::time::Duration;
21+
22+
thread_local! {
23+
static PULSAR_RT: RefCell<Option<tokio::runtime::Runtime>> = RefCell::new(None);
24+
static PULSAR_CONSUMER: RefCell<Option<Consumer<Vec<u8>, TokioExecutor>>> = RefCell::new(None);
25+
}
26+
27+
pub struct PulsarProtocol {
28+
config: PulsarConfig,
29+
}
30+
31+
impl PulsarProtocol {
32+
pub fn new(config: PulsarConfig) -> Self {
33+
Self { config }
34+
}
35+
}
36+
37+
impl InputProtocol for PulsarProtocol {
38+
fn name(&self) -> String {
39+
format!("pulsar-{}", self.config.topic)
40+
}
41+
42+
fn init(&self) -> Result<(), Box<dyn std::error::Error + Send>> {
43+
// Lazy init is done in poll() on the worker thread which owns the runtime/consumer.
44+
Ok(())
45+
}
46+
47+
fn poll(
48+
&self,
49+
timeout: Duration,
50+
) -> Result<Option<BufferOrEvent>, Box<dyn std::error::Error + Send>> {
51+
PULSAR_RT.with(|rt_cell| {
52+
PULSAR_CONSUMER.with(|consumer_cell| {
53+
let mut rt_opt = rt_cell.borrow_mut();
54+
let mut consumer_opt = consumer_cell.borrow_mut();
55+
56+
if consumer_opt.is_none() {
57+
let rt = tokio::runtime::Runtime::new()
58+
.map_err(|e| Box::new(std::io::Error::other(e)) as Box<dyn std::error::Error + Send>)?;
59+
let url = self.config.url.clone();
60+
let topic = self.config.topic.clone();
61+
let subscription = self.config.subscription.clone();
62+
let sub_type = self.config.subscription_type.as_deref().unwrap_or("Exclusive");
63+
let sub_type_enum = match sub_type.to_lowercase().as_str() {
64+
"shared" => SubType::Shared,
65+
"key_shared" => SubType::KeyShared,
66+
"failover" => SubType::Failover,
67+
_ => SubType::Exclusive,
68+
};
69+
70+
let consumer: Consumer<Vec<u8>, _> = rt
71+
.block_on(async {
72+
let pulsar = Pulsar::builder(&url, TokioExecutor).build().await?;
73+
let mut builder = pulsar
74+
.consumer()
75+
.with_topic(&topic)
76+
.with_subscription(&subscription)
77+
.with_subscription_type(sub_type_enum);
78+
let consumer = builder.build().await?;
79+
Result::<_, pulsar::Error>::Ok(consumer)
80+
})
81+
.map_err(|e| Box::new(std::io::Error::other(e)) as Box<dyn std::error::Error + Send>)?;
82+
83+
*rt_opt = Some(rt);
84+
*consumer_opt = Some(consumer);
85+
}
86+
87+
let rt = rt_opt.as_ref().unwrap();
88+
let consumer = consumer_opt.as_mut().unwrap();
89+
90+
let timeout_ms = timeout.as_millis() as u64;
91+
let topic = self.config.topic.clone();
92+
let result = rt.block_on(async {
93+
let next_fut = consumer.next();
94+
match tokio::time::timeout(Duration::from_millis(timeout_ms), next_fut).await {
95+
Ok(Some(Ok(msg))) => {
96+
let payload = msg.deserialize().unwrap_or_else(|_| msg.payload.data.clone());
97+
let _ = consumer.ack(&msg).await;
98+
Some(Ok(payload))
99+
}
100+
Ok(Some(Err(e))) => Some(Err(e)),
101+
Ok(None) | Err(_) => None,
102+
}
103+
});
104+
105+
match result {
106+
Some(Ok(payload)) => Ok(Some(BufferOrEvent::new_buffer(
107+
payload,
108+
Some(topic),
109+
false,
110+
false,
111+
))),
112+
Some(Err(e)) => Err(Box::new(std::io::Error::other(e)) as Box<dyn std::error::Error + Send>),
113+
None => Ok(None),
114+
}
115+
})
116+
})
117+
}
118+
}

src/runtime/output/output_provider.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,33 @@ impl OutputProvider {
8888
let runtime = output_config.output_runtime_config();
8989
Ok(Box::new(OutputRunner::new(protocol, output_idx, runtime)))
9090
}
91+
OutputConfig::Pulsar {
92+
url,
93+
topic,
94+
extra,
95+
runtime: _,
96+
} => {
97+
use crate::runtime::output::output_runner::OutputRunner;
98+
use crate::runtime::output::protocol::pulsar::{
99+
PulsarOutputProtocol, PulsarProducerConfig,
100+
};
101+
102+
if url.is_empty() {
103+
return Err(Box::new(std::io::Error::new(
104+
std::io::ErrorKind::InvalidData,
105+
format!(
106+
"Invalid pulsar url in output config: empty (topic: {})",
107+
topic
108+
),
109+
)) as Box<dyn std::error::Error + Send>);
110+
}
111+
112+
let pulsar_config =
113+
PulsarProducerConfig::new(url.clone(), topic.clone(), extra.clone());
114+
let protocol = PulsarOutputProtocol::new(pulsar_config);
115+
let runtime = output_config.output_runtime_config();
116+
Ok(Box::new(OutputRunner::new(protocol, output_idx, runtime)))
117+
}
91118
}
92119
}
93120
}

src/runtime/output/protocol/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,4 @@
1515
// Provides implementations of various output protocols
1616

1717
pub mod kafka;
18+
pub mod pulsar;
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
// Licensed under the Apache License, Version 2.0 (the "License");
2+
// you may not use this file except in compliance with the License.
3+
// You may obtain a copy of the License at
4+
//
5+
// http://www.apache.org/licenses/LICENSE-2.0
6+
//
7+
// Unless required by applicable law or agreed to in writing, software
8+
// distributed under the License is distributed on an "AS IS" BASIS,
9+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
// See the License for the specific language governing permissions and
11+
// limitations under the License.
12+
13+
pub mod producer_config;
14+
pub mod pulsar_protocol;
15+
16+
pub use producer_config::PulsarProducerConfig;
17+
pub use pulsar_protocol::PulsarOutputProtocol;
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
// Licensed under the Apache License, Version 2.0 (the "License");
2+
// you may not use this file except in compliance with the License.
3+
// You may obtain a copy of the License at
4+
//
5+
// http://www.apache.org/licenses/LICENSE-2.0
6+
//
7+
// Unless required by applicable law or agreed to in writing, software
8+
// distributed under the License is distributed on an "AS IS" BASIS,
9+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
// See the License for the specific language governing permissions and
11+
// limitations under the License.
12+
13+
use std::collections::HashMap;
14+
15+
/// PulsarProducerConfig - Pulsar producer configuration
16+
#[derive(Debug, Clone)]
17+
pub struct PulsarProducerConfig {
18+
pub url: String,
19+
pub topic: String,
20+
pub properties: HashMap<String, String>,
21+
}
22+
23+
impl PulsarProducerConfig {
24+
pub fn new(url: String, topic: String, properties: HashMap<String, String>) -> Self {
25+
Self {
26+
url,
27+
topic,
28+
properties,
29+
}
30+
}
31+
}

0 commit comments

Comments
 (0)