Skip to content

Commit ec7ae43

Browse files
committed
fix
1 parent 062b3fb commit ec7ae43

3 files changed

Lines changed: 12 additions & 0 deletions

File tree

src/sql/connector/sink/kafka.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ impl SinkProvider for KafkaSinkConnector {
140140
client_configs.remove(opt::CHECKPOINT_INTERVAL_MS);
141141
client_configs.remove(opt::PIPELINE_PARALLELISM);
142142
client_configs.remove(opt::KEY_BY_PARALLELISM);
143+
client_configs.remove(opt::FORMAT);
143144

144145
Ok(ConnectorConfig::KafkaSink(KafkaSinkConfig {
145146
topic,

src/sql/connector/source/kafka.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ impl SourceProvider for KafkaSourceConnector {
163163
client_configs.remove(opt::CHECKPOINT_INTERVAL_MS);
164164
client_configs.remove(opt::PIPELINE_PARALLELISM);
165165
client_configs.remove(opt::KEY_BY_PARALLELISM);
166+
client_configs.remove(opt::FORMAT);
166167

167168
Ok(ConnectorConfig::KafkaSource(KafkaSourceConfig {
168169
topic,

tests/integration/test/wasm/python_sdk/conftest.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,9 +92,19 @@ def __init__(self, client: FsClient):
9292
self._client = client
9393
self._registered: Set[str] = set()
9494

95+
def __contains__(self, name: str) -> bool:
96+
return name in self._registered
97+
9598
def append(self, name: str) -> None:
9699
self._registered.add(name)
97100

101+
def extend(self, names) -> None:
102+
for name in names:
103+
self._registered.add(name)
104+
105+
def remove(self, name: str) -> None:
106+
self._registered.discard(name)
107+
98108
def register(self, name: str) -> None:
99109
self._registered.add(name)
100110

0 commit comments

Comments
 (0)