Skip to content

Commit 97a093d

Browse files
committed
zmq bot api
* wire up event subscription via zmq to notify connections of new events * add zmq handler to inject http rpc calls via zmq authenticated by zmq curve pubkey
1 parent e4c559a commit 97a093d

File tree

2 files changed

+180
-5
lines changed

2 files changed

+180
-5
lines changed

sogs/events.py

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
from collections import defaultdict
2+
3+
from oxenmq import AuthLevel
4+
5+
from . import model
6+
from .omq import omq
7+
from .web import app
8+
9+
from binascii import hexlify
10+
from types import Iterable
11+
12+
from oxenc import bt_serialize
13+
14+
from .routes.subrequest import make_subrequest
15+
16+
from flask import g
17+
18+
# pools for event propagation
19+
_pools = defaultdict(list)
20+
21+
status_OK = 'OK'
22+
status_ERR = 'ERROR'
23+
24+
# the events we are able to subscribe to
25+
EVENTS = ('message', 'joined', 'parted', 'banned', 'unbanned', 'deleted', 'uploaded')
26+
27+
28+
def event_name_valid(eventname):
29+
""" return True if this event name is something well formed """
30+
return eventname in EVENTS
31+
32+
33+
def _user_from_conn(conn):
34+
"""
35+
make a model.User from a connection using it's curve pubkey as the session id.
36+
"""
37+
return model.User(session_id='05' + hexlify(conn.pubkey).decode())
38+
39+
40+
def _propagate_event(eventname, *args):
41+
""" propagate an event to everyone who cares about it """
42+
assert event_name_valid(eventname)
43+
global omq, _pools
44+
sent = 0
45+
for conn in _pools[eventname]:
46+
omq.send(conn, f'sogs.event.{eventname}', *(bt_serialize(a) for a in args))
47+
sent += 1
48+
if sent:
49+
app.logger.info(f"sent {eventname} to {sent} subscribers")
50+
51+
52+
_category = omq.add_category('sogs', AuthLevel.basic)
53+
54+
55+
def api(f, *, name=None, minargs=None):
56+
""" set up a request handler for zmq for a function with name of the endpoint """
57+
assert name is not None
58+
59+
def _handle_request(msg):
60+
try:
61+
if minargs and len(msg.data) < minargs:
62+
raise ValueError(f"Not enough arguments, got {len(msg.data)} expected 2 or more")
63+
app.logger.debug(f"zmq request: {name} for {msg.conn}")
64+
g.user = _user_from_conn(msg.conn)
65+
retval = f(*msg.data, conn=msg.conn)
66+
if retval is None:
67+
msg.reply(status_OK)
68+
elif isinstance(retval, tuple):
69+
msg.reply(status_OK, *retval)
70+
else:
71+
msg.reply(status_OK, bt_serialize(retval))
72+
except Exception as ex:
73+
app.logger.error(f"{f.__name__} raised exception: {ex}")
74+
msg.reply(status_ERR, f'{ex}')
75+
finally:
76+
g.user = None
77+
78+
global _category
79+
_category.add_request_command(name, _handle_request)
80+
app.logger.info(f"register zmq api handler: sogs.{name}")
81+
return f
82+
83+
84+
def _collect_bytes(iterable: Iterable[bytes]):
85+
""" collect all bytes from an iterable of bytes and put it into one big bytes instance """
86+
data = bytes()
87+
for part in iterable:
88+
data += part
89+
return data
90+
91+
92+
@api(name='sub', minargs=1)
93+
def handle_subscribe(*events, conn=None):
94+
""" subscribe connection to many events """
95+
sub = set()
96+
for ev in events:
97+
name = ev.decode('ascii')
98+
if not event_name_valid(name):
99+
raise Exception(f"invalid event type: {name}")
100+
sub += name
101+
102+
global _pools
103+
for name in sub:
104+
_pools[name].append(conn)
105+
app.logger.debug(f"sub {conn} to {len(sub)} events")
106+
107+
108+
@api(name='unsub', minargs=1)
109+
def handle_unsubscribe(*events, conn=None):
110+
""" unsub connection to many events """
111+
unsub = set()
112+
for ev in events:
113+
name = ev.decode('ascii')
114+
if not event_name_valid(name):
115+
raise Exception(f"invalid event type: {name}")
116+
unsub += name
117+
118+
global _pools
119+
for name in unsub:
120+
_pools[name].remove(conn)
121+
app.logger.debug(f"unsub {conn} to {len(unsub)} events")
122+
123+
124+
@api(name="request", minargs=2)
125+
def handle_rpc_call(method, path, body=None, *, conn=None):
126+
""" make a sub request via zmq """
127+
ctype = None
128+
# guess content type
129+
if body:
130+
if body[0] in (b'{', b'['):
131+
ctype = 'application/json'
132+
else:
133+
ctype = 'application/octet-stream'
134+
135+
resp = make_subrequest(
136+
method.decode('ascii'), path.decode('ascii'), content_type=ctype, body=body
137+
)
138+
return resp.status_code, _collect_bytes(resp.response)
139+
140+
141+
class _Notify:
142+
""" Holder type for all event notification functions """
143+
144+
145+
notify = _Notify()
146+
147+
# set up event notifiers
148+
for ev in EVENTS:
149+
setattr(notify, ev, lambda *args: _propagate_event(ev, *args))

sogs/mule.py

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import traceback
22
import oxenmq
3-
from oxenc import bt_deserialize
43
import time
54
from datetime import timedelta
65
import functools
@@ -9,6 +8,7 @@
98
from . import cleanup
109
from . import config
1110
from . import omq as o
11+
from .events import notify
1212

1313
# This is the uwsgi "mule" that handles things not related to serving HTTP requests:
1414
# - it holds the oxenmq instance (with its own interface into sogs)
@@ -52,6 +52,10 @@ def setup_omq():
5252
for addr in listen:
5353
omq.listen(addr, curve=True, allow_connection=allow_conn)
5454
app.logger.info(f"OxenMQ listening on {addr}")
55+
if not listen:
56+
app.logger.warn(
57+
"OxenMQ did not listen on any curve addresses, the bot API is not accessable anywhere."
58+
)
5559

5660
# Internal socket for workers to talk to us:
5761
omq.listen(config.OMQ_INTERNAL, curve=False, allow_connection=admin_conn)
@@ -64,6 +68,10 @@ def setup_omq():
6468
worker.add_command("message_posted", message_posted)
6569
worker.add_command("messages_deleted", messages_deleted)
6670
worker.add_command("message_edited", message_edited)
71+
worker.add_command("user_joined", user_joined)
72+
worker.add_command("user_banned", user_banned)
73+
worker.add_command("user_unbanned", user_unbanned)
74+
worker.add_command("file_uploaded", file_uploaded)
6775

6876
app.logger.debug("Mule starting omq")
6977
omq.start()
@@ -88,14 +96,32 @@ def wrapper(*args, **kwargs):
8896

8997
@log_exceptions
9098
def message_posted(m: oxenmq.Message):
91-
id = bt_deserialize(m.data()[0])
92-
app.logger.warning(f"FIXME: mule -- message posted stub, id={id}")
99+
notify.message(*m.data())
93100

94101

95102
@log_exceptions
96103
def messages_deleted(m: oxenmq.Message):
97-
ids = bt_deserialize(m.data()[0])
98-
app.logger.warning(f"FIXME: mule -- message delete stub, deleted messages: {ids}")
104+
notify.deleted(*m.data())
105+
106+
107+
@log_exceptions
108+
def user_banned(m: oxenmq.Message):
109+
notify.banned(*m.data())
110+
111+
112+
@log_exceptions
113+
def user_unbanned(m: oxenmq.Message):
114+
notify.unbannd(*m.data())
115+
116+
117+
@log_exceptions
118+
def user_joined(m: oxenmq.Message):
119+
notify.joined(*m.data())
120+
121+
122+
@log_exceptions
123+
def file_uploaded(m: oxenmq.Message):
124+
notify.uploaded(*m.data())
99125

100126

101127
@log_exceptions

0 commit comments

Comments
 (0)