-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
99 lines (79 loc) · 2.69 KB
/
main.py
File metadata and controls
99 lines (79 loc) · 2.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import queue
import threading
import time

import grpc

import plugin_pb2
import plugin_pb2_grpc
# Event fields checked on each server message, in the order they are tested.
_EVENT_FIELDS = ("insert", "update", "delete", "truncate")


def _describe_event(msg):
    """Return ``(log_line, pg_lsn)`` for the event set on ``msg``, else ``None``.

    The log line is ``"<KIND> <schema>.<table>"``; every kind except
    ``truncate`` also appends ``": <json_payload>"``.
    """
    for field in _EVENT_FIELDS:
        if not msg.HasField(field):
            continue
        ev = getattr(msg, field)
        line = f"{field.upper()} {ev.schema}.{ev.table}"
        if field != "truncate":
            line += f": {ev.json_payload}"
        return line, ev.pg_lsn
    return None


def connect():
    """Run one streaming session against the plugin service at 127.0.0.1:50051.

    Opens an insecure channel, starts the bidirectional ``Session`` RPC,
    prints every insert/update/delete/truncate event received, and queues a
    ``ClientAck`` carrying the event's ``pg_lsn`` for each one. Messages with
    none of those fields set are ignored.

    Returns when the server stream ends or fails with ``grpc.RpcError``
    (whose details are printed). The channel is always closed before
    returning; any other exception propagates to the caller after cleanup.
    """
    channel = grpc.insecure_channel("127.0.0.1:50051")
    ack_queue = queue.Queue()
    # Unique sentinel: tells the outgoing generator that the session is over.
    stop = object()

    def request_stream():
        """Yield queued ack messages until the stop sentinel is dequeued."""
        while True:
            # Blocking get: no polling, and no lock is held while the
            # yielded message is being sent.
            item = ack_queue.get()
            if item is stop:
                return
            yield item

    try:
        stub = plugin_pb2_grpc.PluginServiceStub(channel)
        # Open streaming RPC
        stream = stub.Session(request_stream())
        print("Connected, listening for events...")
        for msg in stream:
            described = _describe_event(msg)
            if described is None:
                continue
            line, lsn = described
            print(line)
            ack_queue.put(
                plugin_pb2.ClientMessage(
                    ack=plugin_pb2.ClientAck(pg_lsn=lsn)
                )
            )
    except grpc.RpcError as e:
        print("Stream error:", e.details())
    finally:
        print("Stream closed, reconnecting...")
        # Wake the request generator so it finishes instead of waiting
        # forever on a queue nobody will fill again.
        ack_queue.put(stop)
        channel.close()
def wait(ms):
    """Block the calling thread for ``ms`` milliseconds."""
    seconds = ms / 1000
    time.sleep(seconds)
if __name__ == "__main__":
    # Supervisor loop: run a session, and whenever it returns pause
    # briefly before opening the next one.
    reconnect_delay_ms = 2000
    while True:
        connect()
        wait(reconnect_delay_ms)