Skip to content

Commit abac878

Browse files
committed
fix linter
1 parent 62b8033 commit abac878

15 files changed

+443
-300
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Initial implementation of topic reader
2+
13
## 3.0.1b3 ##
24
* Fix error of check retriable error for idempotent operations (error exist since 2.12.1)
35

tests/conftest.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,9 @@ def topic_path(endpoint, topic_consumer) -> str:
135135
@pytest.mark.asyncio()
136136
async def topic_with_messages(driver, topic_path):
137137
pass
138-
writer = driver.topic_client.topic_writer(topic_path, producer_and_message_group_id="fixture-producer-id")
138+
writer = driver.topic_client.topic_writer(
139+
topic_path, producer_and_message_group_id="fixture-producer-id"
140+
)
139141
await writer.write_with_ack(
140142
ydb.TopicWriterMessage(data="123".encode()),
141143
ydb.TopicWriterMessage(data="456".encode()),

tests/topics/test_topic_reader.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
import pytest
22

3-
from ydb._topic_reader.topic_reader_asyncio import PublicAsyncIOReader
4-
from ydb import TopicReaderSettings
5-
63

74
@pytest.mark.asyncio
85
class TestTopicWriterAsyncIO:
9-
async def test_read_message(self, driver, topic_path, topic_with_messages, topic_consumer):
6+
async def test_read_message(
7+
self, driver, topic_path, topic_with_messages, topic_consumer
8+
):
109
reader = driver.topic_client.topic_reader(topic_consumer, topic_path)
1110

1211
assert await reader.receive_batch() is not None

ydb/_topic_reader/datatypes.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,4 +87,7 @@ def end_offset(self) -> int:
8787
@property
8888
def is_alive(self) -> bool:
8989
state = self._partition_session.state
90-
return state == PartitionSession.State.Active or state == PartitionSession.State.GracefulShutdown
90+
return (
91+
state == PartitionSession.State.Active
92+
or state == PartitionSession.State.GracefulShutdown
93+
)

ydb/_topic_reader/topic_reader.py

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,18 @@
1-
import abc
21
import concurrent.futures
32
import enum
4-
import io
53
import datetime
64
from dataclasses import dataclass
75
from typing import (
86
Union,
97
Optional,
108
List,
11-
Mapping,
12-
Callable,
139
Iterable,
14-
AsyncIterable,
15-
AsyncContextManager,
16-
Any, Dict,
1710
)
1811

19-
from ydb import RetrySettings
20-
from ydb._topic_wrapper.common import OffsetsRange, TokenGetterFuncType
21-
from ydb._topic_wrapper.reader import StreamReadMessage
12+
from ..table import RetrySettings
13+
from .datatypes import ICommittable, PublicBatch, PublicMessage
14+
from .._topic_wrapper.common import OffsetsRange, TokenGetterFuncType
15+
from .._topic_wrapper.reader import StreamReadMessage
2216

2317

2418
class Selector:
@@ -47,7 +41,9 @@ async def sessions_stat(self) -> List["SessionStat"]:
4741
"""
4842
raise NotImplementedError()
4943

50-
def messages(self, *, timeout: Union[float, None] = None) -> Iterable["PublicMessage"]:
44+
def messages(
45+
self, *, timeout: Union[float, None] = None
46+
) -> Iterable[PublicMessage]:
5147
"""
5248
todo?
5349
@@ -59,7 +55,7 @@ def messages(self, *, timeout: Union[float, None] = None) -> Iterable["PublicMes
5955
"""
6056
raise NotImplementedError()
6157

62-
def receive_message(self, *, timeout: Union[float, None] = None) -> "PublicMessage":
58+
def receive_message(self, *, timeout: Union[float, None] = None) -> PublicMessage:
6359
"""
6460
Block until receive new message
6561
It has no async_ version for prevent lost messages, use async_wait_message as signal for new batches available.
@@ -85,7 +81,7 @@ def batches(
8581
max_messages: Union[int, None] = None,
8682
max_bytes: Union[int, None] = None,
8783
timeout: Union[float, None] = None,
88-
) -> Iterable["PublicBatch"]:
84+
) -> Iterable[PublicBatch]:
8985
"""
9086
Block until receive new batch.
9187
It has no async_ version for prevent lost messages, use async_wait_message as signal for new batches available.
@@ -101,7 +97,7 @@ def receive_batch(
10197
max_messages: Union[int, None] = None,
10298
max_bytes: Union[int, None],
10399
timeout: Union[float, None] = None,
104-
) -> Union["PublicBatch", None]:
100+
) -> Union[PublicBatch, None]:
105101
"""
106102
Get one messages batch from reader
107103
It has no async_ version for prevent lost messages, use async_wait_message as signal for new batches available.
@@ -111,7 +107,7 @@ def receive_batch(
111107
"""
112108
raise NotImplementedError()
113109

114-
def commit(self, mess: "ICommittable"):
110+
def commit(self, mess: ICommittable):
115111
"""
116112
Put commit message to internal buffer.
117113
@@ -121,7 +117,7 @@ def commit(self, mess: "ICommittable"):
121117
raise NotImplementedError()
122118

123119
def commit_with_ack(
124-
self, mess: "ICommittable"
120+
self, mess: ICommittable
125121
) -> Union["CommitResult", List["CommitResult"]]:
126122
"""
127123
write commit message to a buffer and wait ack from the server.
@@ -131,7 +127,7 @@ def commit_with_ack(
131127
raise NotImplementedError()
132128

133129
def async_commit_with_ack(
134-
self, mess: "ICommittable"
130+
self, mess: ICommittable
135131
) -> Union["CommitResult", List["CommitResult"]]:
136132
"""
137133
write commit message to a buffer and return Future for wait result.
@@ -184,7 +180,7 @@ def _init_message(self) -> StreamReadMessage.InitRequest:
184180
consumer=self.consumer,
185181
)
186182

187-
def _retry_settings(self)->RetrySettings:
183+
def _retry_settings(self) -> RetrySettings:
188184
return RetrySettings(idempotent=True)
189185

190186

0 commit comments

Comments
 (0)