diff --git a/Cargo.lock b/Cargo.lock index 05033c65a..50160f666 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -97,6 +97,54 @@ version = "0.2.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" +[[package]] +name = "amq-protocol" +version = "8.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a56406d1cfd9933844a083206bf10fe3748c753bf43c64d4f4453d87181c14f6" +dependencies = [ + "amq-protocol-tcp", + "amq-protocol-types", + "amq-protocol-uri", + "cookie-factory", + "nom 8.0.0", + "serde", +] + +[[package]] +name = "amq-protocol-tcp" +version = "8.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c95840a64f5529882346cf7710d0c8d6b6e6fb4ecd3fed0883d1ae97c42efc14" +dependencies = [ + "amq-protocol-uri", + "tcp-stream", + "tracing", +] + +[[package]] +name = "amq-protocol-types" +version = "8.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a7bc568496f6fd598f44bf5c7e9f8157111634a54c114cb9c087e05d282c91a" +dependencies = [ + "cookie-factory", + "nom 8.0.0", + "serde", + "serde_json", +] + +[[package]] +name = "amq-protocol-uri" +version = "8.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1d80bcd4af9f0fd2bad6def25c66d55130178749cc98000fd3de2b44991b5ed" +dependencies = [ + "amq-protocol-types", + "percent-encoding", + "url", +] + [[package]] name = "amqp_serde" version = "0.4.2" @@ -181,6 +229,45 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" +[[package]] +name = "asn1-rs" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56624a96882bb8c26d61312ae18cb45868e5a9992ea73c58e45c3101e56a1e60" +dependencies = [ + "asn1-rs-derive", + "asn1-rs-impl", + "displaydoc", + "nom 7.1.3", + "num-traits", + "rusticata-macros", + "thiserror 2.0.12", + "time", +] + +[[package]] +name = "asn1-rs-derive" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3109e49b1e4909e9db6515a30c633684d68cdeaa252f215214cb4fa1a5bfee2c" +dependencies = [ + "proc-macro2", + "quote 1.0.40", + "syn 2.0.101", + "synstructure 0.13.2", +] + +[[package]] +name = "asn1-rs-impl" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b18050c2cd6fe86c3a76584ef5e0baf286d038cda203eb6223df2cc413565f7" +dependencies = [ + "proc-macro2", + "quote 1.0.40", + "syn 2.0.101", +] + [[package]] name = "async-attributes" version = "1.1.2" @@ -245,6 +332,31 @@ dependencies = [ "tokio 1.45.1", ] +[[package]] +name = "async-global-executor" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13f937e26114b93193065fd44f507aa2e9169ad0cdabbb996920b1fe1ddea7ba" +dependencies = [ + "async-channel 2.3.1", + "async-executor", + "async-io", + "async-lock 3.4.0", + "blocking", + "futures-lite 2.6.0", +] + +[[package]] +name = "async-global-executor-trait" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9af57045d58eeb1f7060e7025a1631cbc6399e0a1d10ad6735b3d0ea7f8346ce" +dependencies = [ + "async-global-executor 3.1.0", + "async-trait", + "executor-trait", +] + [[package]] name = "async-io" version = "2.4.1" @@ -303,6 +415,18 @@ dependencies = [ "tracing", ] +[[package]] +name = "async-reactor-trait" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35ce4c5a0627f1b37c346233c34ef2263e6a58812f7c071b71d3712511421422" +dependencies = [ + "async-io", + "async-trait", + "futures-core", + "reactor-trait", +] + [[package]] name = "async-recursion" version = "1.1.1" @@ -340,7 +464,7 @@ checksum = "730294c1c08c2e0f85759590518f6333f0d5a0a766a27d519c1b244c3dfd8a24" dependencies = [ "async-attributes", "async-channel 1.9.0", - "async-global-executor", + "async-global-executor 2.4.1", "async-io", "async-lock 3.4.0", "async-process", @@ -510,7 +634,7 @@ dependencies = [ "anyhow", "arrayvec", "log", - "nom", + "nom 7.1.3", "num-rational", "v_frame", ] @@ -1256,6 +1380,15 @@ dependencies = [ "generic-array 0.14.7", ] +[[package]] +name = "block-padding" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8894febbff9f758034a5b8e12d87918f56dfc64a8e1fe757d65e29041538d93" +dependencies = [ + "generic-array 0.14.7", +] + [[package]] name = "blocking" version = "1.6.1" @@ -1386,6 +1519,15 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2698f953def977c68f935bb0dfa959375ad4638570e969e2f1e9f433cbf1af6" +[[package]] +name = "cbc" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26b52a9543ae338f279b96b0b9fed9c8093744685043739079ce85cd58f289a6" +dependencies = [ + "cipher", +] + [[package]] name = "cc" version = "1.2.26" @@ -1403,7 +1545,7 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" dependencies = [ - "nom", + "nom 7.1.3", ] [[package]] @@ -1487,6 +1629,18 @@ dependencies = [ "cc", ] +[[package]] +name = "cms" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b77c319abfd5219629c45c34c89ba945ed3c5e49fcde9d16b6c3885f118a730" +dependencies = [ + "const-oid 0.9.6", + "der 0.7.10", + "spki 0.7.3", + "x509-cert", +] + [[package]] name = "coarsetime" version = "0.1.36" @@ -1536,7 +1690,7 @@ dependencies = [ "async-trait", "json5", "lazy_static", - "nom", + "nom 7.1.3", "pathdiff", "ron", "rust-ini", @@ -2014,7 +2168,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "79b71cca7d95d7681a4b3b9cdf63c8dbc3730d0584c2c74e31416d64a90493f4" dependencies = [ "const-oid 0.6.2", - "der_derive", + "der_derive 0.4.1", ] [[package]] @@ -2035,10 +2189,26 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e7c1832837b905bbfb5101e07cc24c8deddf52f93225eee6ead5f4d63d53ddcb" dependencies = [ "const-oid 0.9.6", + "der_derive 0.7.3", + "flagset", "pem-rfc7468 0.7.0", "zeroize", ] +[[package]] +name = "der-parser" +version = "10.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07da5016415d5a3c4dd39b11ed26f915f52fc4e0dc197d87908bc916e51bc1a6" +dependencies = [ + "asn1-rs", + "displaydoc", + "nom 7.1.3", + "num-bigint", + "num-traits", + "rusticata-macros", +] + [[package]] name = "der_derive" version = "0.4.1" @@ -2051,6 +2221,17 @@ dependencies = [ "synstructure 0.12.6", ] +[[package]] +name = "der_derive" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8034092389675178f570469e6c3b0465d3d30b4505c294a6550db47f3c17ad18" +dependencies = [ + "proc-macro2", + "quote 1.0.40", + "syn 2.0.101", +] + [[package]] name = "deranged" version = "0.4.0" @@ -2096,6 +2277,15 @@ dependencies = [ "syn 2.0.101", ] +[[package]] +name = "des" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffdd80ce8ce993de27e9f063a444a4d53ce8e8db4c1f00cc03af5ad5a9867a1e" +dependencies = [ + "cipher", +] + [[package]] name = "devise" version = "0.4.2" @@ -2384,7 +2574,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cea14ef9355e3beab063703aa9dab15afd25f0667c341310c1e5274bb1d0da18" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -2414,6 +2604,15 @@ dependencies = [ "pin-project-lite 0.2.16", ] +[[package]] +name = "executor-trait" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13c39dff9342e4e0e16ce96be751eb21a94e94a87bb2f6e63ad1961c2ce109bf" +dependencies = [ + "async-trait", +] + [[package]] name = "exr" version = "1.73.0" @@ -2522,6 +2721,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "flagset" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7ac824320a75a52197e8f2d787f6a38b6718bb6897a35142d749af3c0e8f4fe" + [[package]] name = "flate2" version = "1.1.1" @@ -2541,6 +2746,17 @@ dependencies = [ "num-traits", ] +[[package]] +name = "flume" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095" +dependencies = [ + "futures-core", + "futures-sink", + "spin", +] + [[package]] name = "fnv" version = "1.0.7" @@ -3747,6 +3963,7 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "879f10e63c20629ecabbb64a8010319738c66a5cd0c29b02d63d272b03751d01" dependencies = [ + "block-padding", "generic-array 0.14.7", ] @@ -3806,7 +4023,7 @@ checksum = "e04d7f318608d35d4b61ddd75cbdaee86b023ebe2bd5a66ee0915f0bf93095a9" dependencies = [ "hermit-abi 0.5.1", "libc", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -4111,6 +4328,25 @@ dependencies = [ "log", ] +[[package]] +name = "lapin" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "792d823427d788859cf36d471a9983e52e07e1267fb50ec07ecb240ef4ecd8af" +dependencies = [ + "amq-protocol", + "async-global-executor-trait", + "async-reactor-trait", + "async-trait", + "executor-trait", + "flume", + "futures-core", + "futures-io", + "reactor-trait", + "tracing", + "waker-fn", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -4148,7 +4384,7 @@ dependencies = [ "idna 0.3.0", "mime", "native-tls", - "nom", + "nom 7.1.3", "once_cell", "quoted_printable", "socket2 0.4.10", @@ -4655,7 +4891,7 @@ dependencies = [ "md-5", "mongodb-internal-macros", "once_cell", - "pbkdf2", + "pbkdf2 0.11.0", "percent-encoding", "rand 0.8.5", "rustc_version_runtime", @@ -4758,6 +4994,15 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nom" +version = "8.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df9761775871bdef83bee530e60050f7e54b1105350d6884eb0fb4f46c2f9405" +dependencies = [ + "memchr", +] + [[package]] name = "noop_proc_macro" version = "0.3.0" @@ -4938,6 +5183,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "oid-registry" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12f40cff3dde1b6087cc5d5f5d4d65712f34016a03ed60e9c08dcc392736b5b7" +dependencies = [ + "asn1-rs", +] + [[package]] name = "once_cell" version = "1.21.3" @@ -5027,6 +5281,28 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "p12-keystore" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3cae83056e7cb770211494a0ecf66d9fa7eba7d00977e5bb91f0e925b40b937f" +dependencies = [ + "cbc", + "cms", + "der 0.7.10", + "des", + "hex", + "hmac", + "pkcs12", + "pkcs5", + "rand 0.9.1", + "rc2", + "sha1", + "sha2", + "thiserror 2.0.12", + "x509-parser", +] + [[package]] name = "p256" version = "0.11.1" @@ -5112,6 +5388,16 @@ dependencies = [ "digest", ] +[[package]] +name = "pbkdf2" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8ed6a7761f76e3b9f92dfb0a60a6a6477c61024b775147ff0973a02653abaf2" +dependencies = [ + "digest", + "hmac", +] + [[package]] name = "pear" version = "0.2.9" @@ -5382,6 +5668,36 @@ dependencies = [ "zeroize", ] +[[package]] +name = "pkcs12" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "695b3df3d3cc1015f12d70235e35b6b79befc5fa7a9b95b951eab1dd07c9efc2" +dependencies = [ + "cms", + "const-oid 0.9.6", + "der 0.7.10", + "digest", + "spki 0.7.3", + "x509-cert", + "zeroize", +] + +[[package]] +name = "pkcs5" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e847e2c91a18bfa887dd028ec33f2fe6f25db77db3619024764914affe8b69a6" +dependencies = [ + "aes", + "cbc", + "der 0.7.10", + "pbkdf2 0.12.2", + "scrypt", + "sha2", + "spki 0.7.3", +] + [[package]] name = "pkcs8" version = "0.9.0" @@ -5936,6 +6252,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "rc2" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62c64daa8e9438b84aaae55010a93f396f8e60e3911590fcba770d04643fc1dd" +dependencies = [ + "cipher", +] + [[package]] name = "rdrand" version = "0.4.0" @@ -5945,6 +6270,18 @@ dependencies = [ "rand_core 0.3.1", ] +[[package]] +name = "reactor-trait" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b2eb4ff39ed91c79034b2856a73b07bc8afd752c981bc4433bcbeff531e275a" +dependencies = [ + "async-io", + "async-trait", + "futures-core", + "futures-io", +] + [[package]] name = "redis" version = "0.23.3" @@ -5993,7 +6330,7 @@ dependencies = [ "cookie-factory", "crc16", "log", - "nom", + "nom 7.1.3", ] [[package]] @@ -6228,12 +6565,15 @@ dependencies = [ "bincode", "fred", "futures", + "lapin", "log", "lru 0.7.8", "lru_time_cache", "once_cell", "querystring", + "rand 0.6.5", "redis-kiss", + "revolt-broker", "revolt-config", "revolt-database", "revolt-models", @@ -6247,6 +6587,19 @@ dependencies = [ "ulid 0.5.0", ] +[[package]] +name = "revolt-broker" +version = "0.8.8" +dependencies = [ + "async-std", + "lapin", + "log", + "rand 0.9.1", + "revolt-config", + "rmp-serde", + "serde", +] + [[package]] name = "revolt-config" version = "0.8.8" @@ -6296,6 +6649,7 @@ dependencies = [ "indexmap 1.9.3", "isahc", "iso8601-timestamp", + "lapin", "linkify 0.8.1", "log", "lru 0.11.1", @@ -6305,6 +6659,7 @@ dependencies = [ "rand 0.8.5", "redis-kiss", "regex", + "revolt-broker", "revolt-config", "revolt-models", "revolt-parser", @@ -6315,6 +6670,7 @@ dependencies = [ "revolt_okapi", "revolt_optional_struct", "revolt_rocket_okapi", + "rmp-serde", "rocket", "schemars", "serde", @@ -6341,6 +6697,7 @@ dependencies = [ "futures", "impl_ops", "iso8601-timestamp", + "lapin", "lettre", "linkify 0.6.0", "log", @@ -6352,6 +6709,7 @@ dependencies = [ "redis-kiss", "regex", "reqwest 0.11.27", + "revolt-broker", "revolt-config", "revolt-database", "revolt-models", @@ -6359,6 +6717,7 @@ dependencies = [ "revolt-presence", "revolt-result", "revolt_rocket_okapi", + "rmp-serde", "rocket", "rocket_authifier", "rocket_cors", @@ -6892,6 +7251,15 @@ dependencies = [ "semver", ] +[[package]] +name = "rusticata-macros" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "faf0c4a6ece9950b9abdb62b1cfcf2a68b3b67a10ba445b3bb85be2a293d0632" +dependencies = [ + "nom 7.1.3", +] + [[package]] name = "rustix" version = "0.38.44" @@ -6902,7 +7270,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.4.15", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -6915,7 +7283,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.9.4", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -6958,6 +7326,19 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-connector" +version = "0.21.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b68f41384a0c195494f07c9bf5d020dcfca869d3b1492c9391bbc267bd9904fd" +dependencies = [ + "log", + "rustls 0.23.27", + "rustls-native-certs 0.8.1", + "rustls-pki-types", + "rustls-webpki 0.103.3", +] + [[package]] name = "rustls-native-certs" version = "0.6.3" @@ -7072,6 +7453,15 @@ version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" +[[package]] +name = "salsa20" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97a22f5af31f73a954c10289c93e8a50cc23d971e80ee446f1f6f7137a088213" +dependencies = [ + "cipher", +] + [[package]] name = "schannel" version = "0.1.27" @@ -7134,6 +7524,17 @@ dependencies = [ "tendril", ] +[[package]] +name = "scrypt" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0516a385866c09368f0b5bcd1caff3366aace790fcd46e2bb032697bb172fd1f" +dependencies = [ + "pbkdf2 0.12.2", + "salsa20", + "sha2", +] + [[package]] name = "sct" version = "0.7.1" @@ -7725,6 +8126,9 @@ name = "spin" version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] [[package]] name = "spki" @@ -8029,6 +8433,18 @@ version = "0.12.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61c41af27dd6d1e27b1b16b489db798443478cef1f06a660c96db617ba5de3b1" +[[package]] +name = "tcp-stream" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0217559d8c5a883d66838180671a74007e785689364ebe9627d517af936eaa9" +dependencies = [ + "cfg-if", + "p12-keystore", + "rustls-connector", + "rustls-pemfile 2.2.0", +] + [[package]] name = "tempfile" version = "3.20.0" @@ -8039,7 +8455,7 @@ dependencies = [ "getrandom 0.3.3", "once_cell", "rustix 1.0.7", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -9236,7 +9652,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] @@ -9640,6 +10056,34 @@ dependencies = [ "tap", ] +[[package]] +name = "x509-cert" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1301e935010a701ae5f8655edc0ad17c44bad3ac5ce8c39185f75453b720ae94" +dependencies = [ + "const-oid 0.9.6", + "der 0.7.10", + "spki 0.7.3", +] + +[[package]] +name = "x509-parser" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4569f339c0c402346d4a75a9e39cf8dad310e287eef1ff56d4c68e5067f53460" +dependencies = [ + "asn1-rs", + "data-encoding", + "der-parser", + "lazy_static", + "nom 7.1.3", + "oid-registry", + "rusticata-macros", + "thiserror 2.0.12", + "time", +] + [[package]] name = "xmlparser" version = "0.13.6" diff --git a/crates/bonfire/Cargo.toml b/crates/bonfire/Cargo.toml index e447825d4..ce73ae641 100644 --- a/crates/bonfire/Cargo.toml +++ b/crates/bonfire/Cargo.toml @@ -9,6 +9,7 @@ edition = "2021" [dependencies] # util log = "*" +rand = "*" sentry = "0.31.5" lru = "0.7.6" ulid = "0.5.0" @@ -38,6 +39,7 @@ async-std = { version = "1.8.0", features = [ # core authifier = { version = "1.0.15" } revolt-result = { path = "../core/result" } +revolt-broker = { path = "../core/broker" } revolt-models = { path = "../core/models" } revolt-config = { path = "../core/config" } revolt-database = { path = "../core/database" } @@ -46,3 +48,6 @@ revolt-presence = { path = "../core/presence", features = ["redis-is-patched"] } # redis fred = { version = "8.0.1", features = ["subscriber-client"] } + +# rabbit +lapin = { version = "3.0.0" } diff --git a/crates/bonfire/src/client/core.rs b/crates/bonfire/src/client/core.rs new file mode 100644 index 000000000..647da9526 --- /dev/null +++ b/crates/bonfire/src/client/core.rs @@ -0,0 +1,129 @@ +use async_std::{net::TcpStream, sync::Mutex}; +use async_tungstenite::WebSocketStream; +use futures::{join, SinkExt, StreamExt, TryStreamExt}; +use revolt_config::report_internal_error; +use revolt_database::{ + events::{client::EventV1, server::ClientMessage}, + iso8601_timestamp::Timestamp, + Database, User, UserHint, +}; +use revolt_presence::{create_session, delete_session}; +use revolt_result::create_error; + +use crate::{ + client::{ + subscriber::client_subscriber, + worker::{client_worker, WorkerRef}, + }, + config::ProtocolConfiguration, + events::state::State, +}; + +/// Core event loop of gateway clients +pub async fn client_core( + db: &'static Database, + ws: WebSocketStream, + mut config: ProtocolConfiguration, +) { + // Split the socket for simultaneously read and write. + let (mut write, mut read) = ws.split(); + + // If the user has not provided authentication, request information. + if config.get_session_token().is_none() { + while let Ok(Some(message)) = read.try_next().await { + if let Ok(ClientMessage::Authenticate { token }) = config.decode(&message) { + config.set_session_token(token); + break; + } + } + } + + // Try to authenticate the user. + let Some(token) = config.get_session_token().as_ref() else { + write + .send(config.encode(&EventV1::Error { + data: create_error!(InvalidSession), + })) + .await + .ok(); + return; + }; + + let (user, session_id) = match User::from_token(db, token, UserHint::Any).await { + Ok(user) => user, + Err(err) => { + write + .send(config.encode(&EventV1::Error { data: err })) + .await + .ok(); + return; + } + }; + + info!( + "Authenticated user {}#{}", + user.username, user.discriminator + ); + + db.update_session_last_seen(&session_id, Timestamp::now_utc()) + .await + .ok(); + + // Create local state. + let mut state = State::from(user, session_id); + let user_id = state.cache.user_id.clone(); + + // Notify socket we have authenticated. + if report_internal_error!(write.send(config.encode(&EventV1::Authenticated)).await).is_err() { + return; + } + + // Download required data to local cache and send Ready payload. + let ready_payload = match report_internal_error!( + state + .generate_ready_payload(db, config.get_ready_payload_fields()) + .await + ) { + Ok(ready_payload) => ready_payload, + Err(_) => return, + }; + + if report_internal_error!(write.send(config.encode(&ready_payload)).await).is_err() { + return; + } + + // Create presence session. + let (first_session, session_id) = create_session(&user_id, 0).await; + + // If this was the first session, notify other users that we just went online. + if first_session { + state.broadcast_presence_change(true).await; + } + + { + let worker_ref = WorkerRef::from(&state); + let write = Mutex::new(write); + let (reload, reloaded) = async_channel::bounded(1); + let (cancel_1, cancelled_1) = async_channel::bounded(1); + let (cancel_2, cancelled_2) = async_channel::bounded(1); + + join!( + async { + client_subscriber(&write, cancelled_1, reloaded, &config, db, &mut state).await; + cancel_2.send(()).await.ok(); + }, + async { + client_worker(read, &write, cancelled_2, reload, &config, worker_ref).await; + cancel_1.send(()).await.ok(); + } + ); + } + + // Clean up presence session. + let last_session = delete_session(&user_id, session_id).await; + + // If this was the last session, notify other users that we just went offline. + if last_session { + state.broadcast_presence_change(false).await; + } +} diff --git a/crates/bonfire/src/client/mod.rs b/crates/bonfire/src/client/mod.rs new file mode 100644 index 000000000..1ef92b72a --- /dev/null +++ b/crates/bonfire/src/client/mod.rs @@ -0,0 +1,3 @@ +pub mod core; +pub mod subscriber; +pub mod worker; diff --git a/crates/bonfire/src/client/subscriber.rs b/crates/bonfire/src/client/subscriber.rs new file mode 100644 index 000000000..f3de104cc --- /dev/null +++ b/crates/bonfire/src/client/subscriber.rs @@ -0,0 +1,112 @@ +use async_channel::Receiver; +use async_std::{net::TcpStream, sync::Mutex}; +use async_tungstenite::WebSocketStream; +use authifier::AuthifierEvent; +use futures::{pin_mut, select, stream::SplitSink, FutureExt, SinkExt}; +use revolt_broker::event_stream; +use revolt_database::{events::client::EventV1, Database}; +use sentry::Level; + +use crate::{ + config::ProtocolConfiguration, + events::state::{State, SubscriptionStateChange}, +}; + +/// Event subscriber loop +pub async fn client_subscriber( + write: &Mutex, async_tungstenite::tungstenite::Message>>, + cancelled: Receiver<()>, + reloaded: Receiver<()>, + protocol_config: &ProtocolConfiguration, + db: &'static Database, + state: &mut State, +) { + let mut consumer = event_stream::Consumer::new().await; + consumer.set_topics(state.subscribed.read().await.clone()); + + let mut cancel = false; + + loop { + // Reload consumer if subscriptions change + if !matches!(state.apply_state().await, SubscriptionStateChange::None) { + consumer.set_topics(state.subscribed.read().await.clone()); + } + + // Read incoming events + loop { + let reloaded = reloaded.recv().fuse(); + let cancelled = cancelled.recv().fuse(); + let delivery = consumer.next().fuse(); + pin_mut!(delivery, reloaded, cancelled); + + select! { + _ = reloaded => { + break; + } + _ = cancelled => { + cancel = true; + break; + } + event = delivery => { + if let Some(mut event) = event { + // Handle the event + if let EventV1::Auth(auth) = &event { + if let AuthifierEvent::DeleteSession { session_id, .. } = auth { + if &state.session_id == session_id { + event = EventV1::Logout; + } + } else if let AuthifierEvent::DeleteAllSessions { + exclude_session_id, .. + } = auth + { + if let Some(excluded) = exclude_session_id { + if &state.session_id != excluded { + event = EventV1::Logout; + } + } else { + event = EventV1::Logout; + } + } + } else { + let should_send = state.handle_incoming_event_v1(db, &mut event).await; + if !should_send { + continue; + } + } + + let result = write.lock().await.send(protocol_config.encode(&event)).await; + if let Err(e) = result { + use async_tungstenite::tungstenite::Error; + if !matches!(e, Error::AlreadyClosed | Error::ConnectionClosed) { + let err = format!("Error while sending an event: {e:?}"); + warn!("{}", err); + sentry::capture_message(&err, Level::Warning); + } + + cancel = true; + break; + } + + if let EventV1::Logout = event { + info!("User {} received log out event!", state.user_id); + cancel = true; + break; + } + + break; + } else { + cancel = true; + break; + } + } + } + } + + // Break out if cancelled + if cancel { + break; + } + } + + consumer.dispose_channel().await; +} diff --git a/crates/bonfire/src/client/worker.rs b/crates/bonfire/src/client/worker.rs new file mode 100644 index 000000000..394b9af50 --- /dev/null +++ b/crates/bonfire/src/client/worker.rs @@ -0,0 +1,124 @@ +use std::{collections::HashSet, sync::Arc}; + +use async_channel::{Receiver, Sender}; +use async_std::{ + net::TcpStream, + sync::{Mutex, RwLock}, +}; +use async_tungstenite::WebSocketStream; +use futures::{ + pin_mut, select, + stream::{SplitSink, SplitStream}, + FutureExt, SinkExt, TryStreamExt, +}; +use revolt_database::events::{client::EventV1, server::ClientMessage}; +use sentry::Level; + +use crate::{config::ProtocolConfiguration, events::state::State}; + +pub struct WorkerRef { + user_id: String, + active_servers: Arc>>, + subscribed: Arc>>, +} + +impl WorkerRef { + pub fn from(state: &State) -> WorkerRef { + WorkerRef { + user_id: state.user_id.clone(), + active_servers: state.active_servers.clone(), + subscribed: state.subscribed.clone(), + } + } +} + +/// Incoming message handling +pub async fn client_worker( + mut read: SplitStream>, + write: &Mutex, async_tungstenite::tungstenite::Message>>, + cancelled: Receiver<()>, + reload: Sender<()>, + config: &ProtocolConfiguration, + state: WorkerRef, +) { + loop { + let read = read.try_next().fuse(); + let cancelled = cancelled.recv().fuse(); + pin_mut!(read, cancelled); + + select! { + _ = cancelled => { return; }, + msg = read => { + let msg = match msg { + Ok(Some(msg)) => msg, + Ok(None) => { + warn!("Received a None message!"); + return; + } + Err(e) => { + use async_tungstenite::tungstenite::Error; + if !matches!(e, Error::AlreadyClosed | Error::ConnectionClosed | Error::Protocol(_)) { + let err = format!("Error while reading an event: {e:?}"); + warn!("{}", err); + sentry::capture_message(&err, Level::Warning); + } + + return; + } + }; + + let Ok(payload) = config.decode(&msg) else { + continue; + }; + + match payload { + ClientMessage::BeginTyping { channel } => { + if !state.subscribed.read().await.contains(&channel) { + continue; + } + + EventV1::ChannelStartTyping { + id: channel.clone(), + user: state.user_id.clone(), + } + .p(channel.clone()) + .await; + } + ClientMessage::EndTyping { channel } => { + if !state.subscribed.read().await.contains(&channel) { + continue; + } + + EventV1::ChannelStopTyping { + id: channel.clone(), + user: state.user_id.clone(), + } + .p(channel.clone()) + .await; + } + ClientMessage::Subscribe { server_id } => { + let mut servers = state.active_servers.lock().await; + let has_item = servers.contains_key(&server_id); + servers.insert(server_id, ()); + + if !has_item { + // Poke the listener to adjust subscriptions + reload.send(()).await.ok(); + } + } + ClientMessage::Ping { data, responded } => { + if responded.is_none() { + write + .lock() + .await + .send(config.encode(&EventV1::Pong { data })) + .await + .ok(); + } + } + _ => {} + } + } + } + } +} diff --git a/crates/bonfire/src/events/state.rs b/crates/bonfire/src/events/state.rs index eae398019..d737ec28e 100644 --- a/crates/bonfire/src/events/state.rs +++ b/crates/bonfire/src/events/state.rs @@ -66,6 +66,7 @@ impl Default for Cache { pub struct State { pub cache: Cache, + pub user_id: String, pub session_id: String, pub private_topic: String, pub state: SubscriptionStateChange, @@ -87,6 +88,7 @@ impl State { ..Default::default() }; + let user_id = user.id.clone(); cache.users.insert(user.id.clone(), user); State { @@ -96,6 +98,7 @@ impl State { Duration::from_secs(900), 5, ))), + user_id, session_id, private_topic, state: SubscriptionStateChange::Reset, diff --git a/crates/bonfire/src/main.rs b/crates/bonfire/src/main.rs index f4e8a3870..6b4b7bff0 100644 --- a/crates/bonfire/src/main.rs +++ b/crates/bonfire/src/main.rs @@ -6,10 +6,11 @@ use revolt_presence::clear_region; #[macro_use] extern crate log; -pub mod config; -pub mod events; +pub mod client; +mod config; mod database; +mod events; mod websocket; #[async_std::main] diff --git a/crates/bonfire/src/websocket.rs b/crates/bonfire/src/websocket.rs index 913245e9c..6207c6adb 100644 --- a/crates/bonfire/src/websocket.rs +++ b/crates/bonfire/src/websocket.rs @@ -1,40 +1,11 @@ -use std::{collections::HashSet, net::SocketAddr, sync::Arc}; +use std::net::SocketAddr; -use async_tungstenite::WebSocketStream; -use authifier::AuthifierEvent; -use fred::{ - error::RedisErrorKind, - interfaces::{ClientLike, EventInterface, PubsubInterface}, - types::RedisConfig, -}; -use futures::{ - channel::oneshot, - join, pin_mut, select, - stream::{SplitSink, SplitStream}, - FutureExt, SinkExt, StreamExt, TryStreamExt, -}; -use redis_kiss::{PayloadType, REDIS_PAYLOAD_TYPE, REDIS_URI}; -use revolt_config::report_internal_error; -use revolt_database::{ - events::{client::EventV1, server::ClientMessage}, - iso8601_timestamp::Timestamp, - Database, User, UserHint, -}; -use revolt_presence::{create_session, delete_session}; +use async_std::net::TcpStream; +use futures::channel::oneshot; +use revolt_database::Database; -use async_std::{ - net::TcpStream, - sync::{Mutex, RwLock}, - task::spawn, -}; -use revolt_result::create_error; -use sentry::Level; - -use crate::config::{ProtocolConfiguration, WebsocketHandshakeCallback}; -use crate::events::state::{State, SubscriptionStateChange}; - -type WsReader = SplitStream>; -type WsWriter = SplitSink, async_tungstenite::tungstenite::Message>; +use crate::client::core::client_core; +use crate::config::WebsocketHandshakeCallback; /// Start a new WebSocket client worker given access to the database, /// the relevant TCP stream and the remote address of the client. @@ -54,7 +25,7 @@ pub async fn client(db: &'static Database, stream: TcpStream, addr: SocketAddr) }; // Verify we've received a valid config, otherwise we should just drop the connection. - let Ok(mut config) = receiver.await else { + let Ok(config) = receiver.await else { return; }; @@ -64,446 +35,5 @@ pub async fn client(db: &'static Database, stream: TcpStream, addr: SocketAddr) config.get_protocol_format() ); - // Split the socket for simultaneously read and write. - let (mut write, mut read) = ws.split(); - - // If the user has not provided authentication, request information. - if config.get_session_token().is_none() { - while let Ok(Some(message)) = read.try_next().await { - if let Ok(ClientMessage::Authenticate { token }) = config.decode(&message) { - config.set_session_token(token); - break; - } - } - } - - // Try to authenticate the user. - let Some(token) = config.get_session_token().as_ref() else { - write - .send(config.encode(&EventV1::Error { - data: create_error!(InvalidSession), - })) - .await - .ok(); - return; - }; - - let (user, session_id) = match User::from_token(db, token, UserHint::Any).await { - Ok(user) => user, - Err(err) => { - write - .send(config.encode(&EventV1::Error { data: err })) - .await - .ok(); - return; - } - }; - - info!("User {addr:?} authenticated as @{}", user.username); - - db.update_session_last_seen(&session_id, Timestamp::now_utc()) - .await - .ok(); - - // Create local state. - let mut state = State::from(user, session_id); - let user_id = state.cache.user_id.clone(); - - // Notify socket we have authenticated. - if report_internal_error!(write.send(config.encode(&EventV1::Authenticated)).await).is_err() { - return; - } - - // Download required data to local cache and send Ready payload. - let ready_payload = match report_internal_error!( - state - .generate_ready_payload(db, config.get_ready_payload_fields()) - .await - ) { - Ok(ready_payload) => ready_payload, - Err(_) => return, - }; - - if report_internal_error!(write.send(config.encode(&ready_payload)).await).is_err() { - return; - } - - // Create presence session. - let (first_session, session_id) = create_session(&user_id, 0).await; - - // If this was the first session, notify other users that we just went online. - if first_session { - state.broadcast_presence_change(true).await; - } - - { - // Setup channels and mutexes - let write = Mutex::new(write); - let subscribed = state.subscribed.clone(); - let active_servers = state.active_servers.clone(); - let (topic_signal_s, topic_signal_r) = async_channel::unbounded(); - - // TODO: this needs to be rewritten - // Create channels through which the tasks can signal to each other they need to clean up - let (kill_signal_1_s, kill_signal_1_r) = async_channel::bounded(1); - let (kill_signal_2_s, kill_signal_2_r) = async_channel::bounded(1); - - // Create a PubSub connection to poll on. - let listener = listener_with_kill_signal( - db, - &mut state, - addr, - &config, - topic_signal_r, - kill_signal_1_r, - &write, - kill_signal_2_s, - ); - - // Read from WebSocket stream. - let worker = worker_with_kill_signal( - addr, - subscribed, - active_servers, - user_id.clone(), - &config, - topic_signal_s, - kill_signal_2_r, - read, - &write, - kill_signal_1_s, - ); - - join!(listener, worker); - } - // Clean up presence session. - let last_session = delete_session(&user_id, session_id).await; - - // If this was the last session, notify other users that we just went offline. - if last_session { - state.broadcast_presence_change(false).await; - } -} - -#[allow(clippy::too_many_arguments)] -async fn listener_with_kill_signal( - db: &'static Database, - state: &mut State, - addr: SocketAddr, - config: &ProtocolConfiguration, - topic_signal_r: async_channel::Receiver<()>, - kill_signal_r: async_channel::Receiver<()>, - write: &Mutex, - kill_signal_s: async_channel::Sender<()>, -) { - listener( - db, - state, - addr, - config, - topic_signal_r, - kill_signal_r, - write, - ) - .await; - kill_signal_s.send(()).await.ok(); -} - -async fn listener( - db: &'static Database, - state: &mut State, - addr: SocketAddr, - config: &ProtocolConfiguration, - topic_signal_r: async_channel::Receiver<()>, - kill_signal_r: async_channel::Receiver<()>, - write: &Mutex, -) { - let redis_config = RedisConfig::from_url(&REDIS_URI).unwrap(); - let subscriber = match report_internal_error!( - fred::types::Builder::from_config(redis_config).build_subscriber_client() - ) { - Ok(subscriber) => subscriber, - Err(_) => return, - }; - - if report_internal_error!(subscriber.init().await).is_err() { - return; - } - - // Handle Redis connection dropping - let (clean_up_s, clean_up_r) = async_channel::bounded(1); - let clean_up_s = Arc::new(Mutex::new(clean_up_s)); - subscriber.on_error(move |err| { - if let RedisErrorKind::Canceled = err.kind() { - let clean_up_s = clean_up_s.clone(); - spawn(async move { - clean_up_s.lock().await.send(()).await.ok(); - }); - } - - Ok(()) - }); - - let mut message_rx = subscriber.message_rx(); - 'out: loop { - // Check for state changes for subscriptions. - match state.apply_state().await { - SubscriptionStateChange::Reset => { - if report_internal_error!(subscriber.unsubscribe_all().await).is_err() { - break 'out; - } - - let subscribed = state.subscribed.read().await; - for id in subscribed.iter() { - if report_internal_error!(subscriber.subscribe(id).await).is_err() { - break 'out; - } - } - - #[cfg(debug_assertions)] - info!("{addr:?} has reset their subscriptions"); - } - SubscriptionStateChange::Change { add, remove } => { - for id in remove { - #[cfg(debug_assertions)] - info!("{addr:?} unsubscribing from {id}"); - - if report_internal_error!(subscriber.unsubscribe(id).await).is_err() { - break 'out; - } - } - - for id in add { - #[cfg(debug_assertions)] - info!("{addr:?} subscribing to {id}"); - - if report_internal_error!(subscriber.subscribe(id).await).is_err() { - break 'out; - } - } - } - SubscriptionStateChange::None => {} - } - - let t1 = message_rx.recv().fuse(); - let t2 = topic_signal_r.recv().fuse(); - let t3 = kill_signal_r.recv().fuse(); - let t4 = clean_up_r.recv().fuse(); - - pin_mut!(t1, t2, t3, t4); - - select! { - _ = t4 => { - break 'out; - }, - _ = t3 => { - break 'out; - }, - _ = t2 => {}, - message = t1 => { - // Handle incoming events. - let message = match report_internal_error!(message) { - Ok(message) => message, - Err(_) => break 'out - }; - - let event = match *REDIS_PAYLOAD_TYPE { - PayloadType::Json => message - .value - .as_str() - .and_then(|s| report_internal_error!(serde_json::from_str::(s.as_ref())).ok()), - PayloadType::Msgpack => message - .value - .as_bytes() - .and_then(|b| report_internal_error!(rmp_serde::from_slice::(b)).ok()), - PayloadType::Bincode => message - .value - .as_bytes() - .and_then(|b| report_internal_error!(bincode::deserialize::(b)).ok()), - }; - - let Some(mut event) = event else { - let err = format!( - "Failed to deserialise event for {}: `{:?}`", - message.channel, - message - .value - ); - - error!("{}", err); - sentry::capture_message(&err, Level::Error); - break 'out; - }; - - if let EventV1::Auth(auth) = &event { - if let AuthifierEvent::DeleteSession { session_id, .. } = auth { - if &state.session_id == session_id { - event = EventV1::Logout; - } - } else if let AuthifierEvent::DeleteAllSessions { - exclude_session_id, .. - } = auth - { - if let Some(excluded) = exclude_session_id { - if &state.session_id != excluded { - event = EventV1::Logout; - } - } else { - event = EventV1::Logout; - } - } - } else { - let should_send = state.handle_incoming_event_v1(db, &mut event).await; - if !should_send { - continue; - } - } - - let result = write.lock().await.send(config.encode(&event)).await; - if let Err(e) = result { - use async_tungstenite::tungstenite::Error; - if !matches!(e, Error::AlreadyClosed | Error::ConnectionClosed) { - let err = format!("Error while sending an event to {addr:?}: {e:?}"); - warn!("{}", err); - sentry::capture_message(&err, Level::Warning); - } - - break 'out; - } - - if let EventV1::Logout = event { - info!("User {addr:?} received log out event!"); - break 'out; - } - } - } - } - - report_internal_error!(subscriber.quit().await).ok(); -} - -#[allow(clippy::too_many_arguments)] -async fn worker_with_kill_signal( - addr: SocketAddr, - subscribed: Arc>>, - active_servers: Arc>>, - user_id: String, - config: &ProtocolConfiguration, - topic_signal_s: async_channel::Sender<()>, - kill_signal_r: async_channel::Receiver<()>, - read: WsReader, - write: &Mutex, - kill_signal_s: async_channel::Sender<()>, -) { - worker( - addr, - subscribed, - active_servers, - user_id, - config, - topic_signal_s, - kill_signal_r, - read, - write, - ) - .await; - kill_signal_s.send(()).await.ok(); -} - -#[allow(clippy::too_many_arguments)] -async fn worker( - addr: SocketAddr, - subscribed: Arc>>, - active_servers: Arc>>, - user_id: String, - config: &ProtocolConfiguration, - topic_signal_s: async_channel::Sender<()>, - kill_signal_r: async_channel::Receiver<()>, - mut read: WsReader, - write: &Mutex, -) { - loop { - let t1 = read.try_next().fuse(); - let t2 = kill_signal_r.recv().fuse(); - - pin_mut!(t1, t2); - - select! { - _ = t2 => { - return; - }, - result = t1 => { - let msg = match result { - Ok(Some(msg)) => msg, - Ok(None) => { - warn!("Received a None message!"); - sentry::capture_message("Received a None message!", Level::Warning); - return; - } - Err(e) => { - use async_tungstenite::tungstenite::Error; - if !matches!(e, Error::AlreadyClosed | Error::ConnectionClosed) { - let err = format!("Error while reading an event from {addr:?}: {e:?}"); - warn!("{}", err); - sentry::capture_message(&err, Level::Warning); - } - - return; - } - }; - - let Ok(payload) = config.decode(&msg) else { - continue; - }; - - match payload { - ClientMessage::BeginTyping { channel } => { - if !subscribed.read().await.contains(&channel) { - continue; - } - - EventV1::ChannelStartTyping { - id: channel.clone(), - user: user_id.clone(), - } - .p(channel.clone()) - .await; - } - ClientMessage::EndTyping { channel } => { - if !subscribed.read().await.contains(&channel) { - continue; - } - - EventV1::ChannelStopTyping { - id: channel.clone(), - user: user_id.clone(), - } - .p(channel.clone()) - .await; - } - ClientMessage::Subscribe { server_id } => { - let mut servers = active_servers.lock().await; - let has_item = servers.contains_key(&server_id); - servers.insert(server_id, ()); - - if !has_item { - // Poke the listener to adjust subscriptions - topic_signal_s.send(()).await.ok(); - } - } - ClientMessage::Ping { data, responded } => { - if responded.is_none() { - write - .lock() - .await - .send(config.encode(&EventV1::Pong { data })) - .await - .ok(); - } - } - _ => {} - } - } - } - } + client_core(db, ws, config).await; } diff --git a/crates/core/broker/Cargo.toml b/crates/core/broker/Cargo.toml new file mode 100644 index 000000000..57fc8c015 --- /dev/null +++ b/crates/core/broker/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "revolt-broker" +version = "0.8.8" +edition = "2024" +license = "AGPL-3.0-or-later" +authors = ["Paul Makles "] +description = "Revolt Backend: Event Broker" + +[dependencies] +# Utility +log = "0.4" +rand = "0.9.1" + +# RabbitMQ/AMQP client +lapin = "3.0.0" + +# Async runtime +async-std = { version = "1.8.0" } + +# Serialisation +serde = "1" +rmp-serde = "1.3.0" + +# Core +revolt-config = { version = "0.8.8", path = "../config" } diff --git a/crates/core/broker/src/event_stream/consumer.rs b/crates/core/broker/src/event_stream/consumer.rs new file mode 100644 index 000000000..59d52ff10 --- /dev/null +++ b/crates/core/broker/src/event_stream/consumer.rs @@ -0,0 +1,180 @@ +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; + +use async_std::stream::StreamExt; +use lapin::{ + Channel, Connection, + options::BasicAckOptions, + types::{AMQPValue, FieldArray, FieldTable, LongLongInt}, +}; +use log::info; +use rand::Rng; +use revolt_config::{capture_internal_error, config}; +use serde::de::DeserializeOwned; + +use crate::event_stream::{create_channel, get_connection}; + +pub struct Consumer { + #[allow(dead_code)] + conn: Arc, + channel: Channel, + tag: String, + topics: HashSet, + topics_changed: bool, + consumer: Option, + offset: Option, +} + +impl Consumer { + /// Create a new event stream consumer + pub async fn new() -> Consumer { + let config = config().await; + let conn = get_connection().await; + let channel = create_channel(&conn, config.rabbit.event_stream).await; + + Consumer { + conn, + channel, + tag: rand::rng() + .sample_iter::(&rand::distr::StandardUniform) + .take(32) + .collect(), + topics: HashSet::new(), + topics_changed: false, + consumer: None, + offset: None, + } + } + + /// Update the set of topics + pub fn set_topics(&mut self, topics: HashSet) { + self.topics = topics; + self.topics_changed = true; + } + + /// Get the current consumer + pub async fn ensure_consumer(&mut self) { + if self.topics_changed { + info!("Topics changed, disposing the consumer."); + self.dispose_consumer().await; + self.topics_changed = false; + } + + if self.consumer.is_none() { + info!("Creating a new consumer, tag={}", self.tag); + let config = config().await; + + // Build arguments for consumer + let mut args: FieldTable = Default::default(); + + // Configure stream filter to select topics we are listening for + { + let mut filter: FieldArray = Default::default(); + for topic in &self.topics { + filter.push(AMQPValue::LongString(topic.as_str().into())); + } + + args.insert("x-stream-filter".into(), AMQPValue::FieldArray(filter)); + } + + // Set stream offset if applicable + if let Some(offset) = self.offset { + args.insert("x-stream-offset".into(), AMQPValue::LongLongInt(offset)); + } + + // Create the consumer + self.consumer = Some( + self.channel + .basic_consume( + &config.rabbit.event_stream.queue, + &self.tag, + Default::default(), + args, + ) + .await + .unwrap(), + ); + } + } + + /// Close the active consumer if one exists + pub async fn dispose_consumer(&mut self) { + if let Some(consumer) = self.consumer.as_ref() { + if consumer.state().is_active() { + if let Err(err) = self + .channel + .basic_cancel(&self.tag, Default::default()) + .await + { + eprintln!("Failed to close consumer! {:?}", err); + } + + // is this necessary? + // else { + // Read the consumer to the end + // while let Some(delivery) = consumer.next().await { + // let delivery = delivery.expect("error in consumer"); + // delivery.ack(BasicAckOptions::default()).await.expect("ack"); + // } + // } + } + + self.consumer = None; + } + } + + /// Close the active channel + pub async fn dispose_channel(&mut self) { + // Close the channel -- don't do this actually + capture_internal_error!(self.channel.close(0, "closing channel").await); + } + + /// Get the next item + pub async fn next(&mut self) -> Option { + self.ensure_consumer().await; + + let consumer = self.consumer.as_mut().unwrap(); + + while let Some(Ok(delivery)) = consumer.next().await { + // Acknowledgement is required + delivery.ack(BasicAckOptions::default()).await.expect("ack"); + + // Parse the delivery headers + let headers: HashMap = delivery + .properties + .headers() + .as_ref() + .map(|table| { + table + .into_iter() + .map(|(k, v)| (k.to_string(), v.clone())) + .collect() + }) + .unwrap_or_default(); + + // Keep track of the current offset + let stream_offset = headers + .get("x-stream-offset") + .expect("`x-stream-offset` not present in message!"); + + self.offset = Some(stream_offset.as_long_long_int().unwrap() + 1); + + // Client-side topic filtering (broker uses Bloom filter so may have false-positives) + let filter_value = headers + .get("x-stream-filter-value") + .expect("`x-stream-filter-value` not present in message!") + .as_long_string() + .expect("`string`") + .to_string(); + + if self.topics.contains(&filter_value) { + // Deserialise the data + return Some(rmp_serde::from_slice(&delivery.data).expect("`data`")); + } + } + + None + } +} diff --git a/crates/core/broker/src/event_stream/mod.rs b/crates/core/broker/src/event_stream/mod.rs new file mode 100644 index 000000000..e29cd0ea5 --- /dev/null +++ b/crates/core/broker/src/event_stream/mod.rs @@ -0,0 +1,7 @@ +mod consumer; +mod pool; +mod publish; + +pub use consumer::Consumer; +pub use pool::{create_channel, get_connection}; +pub use publish::publish_event; diff --git a/crates/core/broker/src/event_stream/pool.rs b/crates/core/broker/src/event_stream/pool.rs new file mode 100644 index 000000000..6870d7b6d --- /dev/null +++ b/crates/core/broker/src/event_stream/pool.rs @@ -0,0 +1,99 @@ +use async_std::sync::Mutex; +use lapin::{ + Connection, + options::QueueDeclareOptions, + types::{AMQPValue, FieldTable}, +}; +use log::{debug, warn}; +use revolt_config::{RabbitEventStream, config}; +use std::sync::Arc; + +use crate::create_client; + +/// Get a handle to the event stream +pub async fn get_connection() -> Arc { + let config = config().await; + + static CONNECTIONS: Mutex>> = Mutex::new(Vec::new()); + + let mut connections = CONNECTIONS.lock().await; + connections.retain(|item| { + if item.status().connected() { + true + } else { + warn!( + "Dropping connection with status {:?}", + item.status().state() + ); + + false + } + }); + + debug!( + "Connections: {}, Clients: {:?}", + connections.len(), + connections + .iter() + .map(Arc::strong_count) + .collect::>() + ); + + for conn in connections.iter() { + if Arc::strong_count(conn) < config.rabbit.event_stream.channels_per_conn { + return conn.clone(); + } + } + + let conn = Arc::new(create_client().await); + + connections.push(conn.clone()); + conn +} + +/// Create a channel +pub async fn create_channel( + conn: &lapin::Connection, + event_stream: RabbitEventStream, +) -> lapin::Channel { + let channel = conn.create_channel().await.unwrap(); + + let mut args: FieldTable = Default::default(); + + args.insert( + // set queue type to stream + "x-queue-type".into(), + AMQPValue::LongString("stream".into()), + ); + + args.insert( + // max. size of the stream + "x-max-length-bytes".into(), + AMQPValue::LongLongInt(event_stream.stream_max_length_bytes), + ); + + args.insert( + // size of the Bloom filter + "x-stream-filter-size-bytes".into(), + AMQPValue::LongLongInt(event_stream.filter_size_bytes), + ); + + channel + .queue_declare( + &event_stream.queue, + QueueDeclareOptions { + durable: true, + ..Default::default() + }, + args, + ) + .await + .unwrap(); + + channel + .basic_qos(event_stream.qos_prefetch, Default::default()) + .await + .unwrap(); + + channel +} diff --git a/crates/core/broker/src/event_stream/publish.rs b/crates/core/broker/src/event_stream/publish.rs new file mode 100644 index 000000000..668d6b40e --- /dev/null +++ b/crates/core/broker/src/event_stream/publish.rs @@ -0,0 +1,37 @@ +use lapin::{ + Error, + protocol::basic::AMQPProperties, + publisher_confirm::PublisherConfirm, + types::{AMQPValue, FieldTable}, +}; +use revolt_config::config; +use serde::Serialize; + +use crate::event_stream::{create_channel, get_connection}; + +/// Publish an event to the message broker +pub async fn publish_event( + channel: &str, + data: &T, +) -> Result { + let config = config().await; + + let mut headers: FieldTable = Default::default(); + headers.insert( + "x-stream-filter-value".into(), + AMQPValue::LongString(channel.into()), + ); + + let conn = get_connection().await; + + create_channel(&conn, config.rabbit.event_stream.clone()) + .await + .basic_publish( + &config.rabbit.event_stream.exchange, + &config.rabbit.event_stream.queue, + Default::default(), + &rmp_serde::to_vec_named(data).unwrap(), + AMQPProperties::default().with_headers(headers), + ) + .await +} diff --git a/crates/core/broker/src/lib.rs b/crates/core/broker/src/lib.rs new file mode 100644 index 000000000..fb2b5c4a5 --- /dev/null +++ b/crates/core/broker/src/lib.rs @@ -0,0 +1,18 @@ +use revolt_config::config; + +pub mod event_stream; + +/// Create a lapin client +pub async fn create_client() -> lapin::Connection { + let config = config().await; + + lapin::Connection::connect( + &format!( + "amqp://{}:{}@{}:{}/%2f", + config.rabbit.username, config.rabbit.password, config.rabbit.host, config.rabbit.port + ), + Default::default(), + ) + .await + .unwrap() +} diff --git a/crates/core/config/Revolt.toml b/crates/core/config/Revolt.toml index e3a04f2ae..c1e868bcd 100644 --- a/crates/core/config/Revolt.toml +++ b/crates/core/config/Revolt.toml @@ -28,6 +28,20 @@ port = 5672 username = "rabbituser" password = "rabbitpass" +[rabbit.event_stream] +# Configuration for event brokerage +# Using default/direct exchange +exchange = "" +queue = "revolt.events" +# Number of channels that can be opened per single TCP connection +channels_per_conn = 128 +# Maximum size of the stream +stream_max_length_bytes = 5_000_000_000 +# Size of the Bloom filter +filter_size_bytes = 26 +# Number of messages to prefetch +qos_prefetch = 100 + [api] [api.registration] diff --git a/crates/core/config/src/lib.rs b/crates/core/config/src/lib.rs index 7c0504997..f315afbc6 100644 --- a/crates/core/config/src/lib.rs +++ b/crates/core/config/src/lib.rs @@ -108,12 +108,24 @@ pub struct Database { pub redis: String, } +#[derive(Deserialize, Debug, Clone)] +pub struct RabbitEventStream { + pub exchange: String, + pub queue: String, + pub channels_per_conn: usize, + pub stream_max_length_bytes: i64, + pub filter_size_bytes: i64, + pub qos_prefetch: u16, +} + #[derive(Deserialize, Debug, Clone)] pub struct Rabbit { pub host: String, pub port: u16, pub username: String, pub password: String, + + pub event_stream: RabbitEventStream, } #[derive(Deserialize, Debug, Clone)] diff --git a/crates/core/database/Cargo.toml b/crates/core/database/Cargo.toml index 7f2e09730..854ce60e4 100644 --- a/crates/core/database/Cargo.toml +++ b/crates/core/database/Cargo.toml @@ -37,6 +37,7 @@ revolt-permissions = { version = "0.8.8", path = "../permissions", features = [ "bson", ] } revolt-parser = { version = "0.8.8", path = "../parser" } +revolt-broker = { version = "0.8.8", path = "../broker" } # Utility log = "0.4" @@ -56,6 +57,7 @@ isahc = { optional = true, version = "1.7", features = ["json"] } # Serialisation serde_json = "1" +rmp-serde = "1.0.0" revolt_optional_struct = "0.2.0" serde = { version = "1", features = ["derive"] } iso8601-timestamp = { version = "0.2.10", features = ["serde", "bson"] } @@ -101,3 +103,4 @@ authifier = { version = "1.0.15", features = ["rocket_impl"] } # RabbitMQ amqprs = { version = "1.7.0" } +lapin = { version = "3.0.0" } diff --git a/crates/core/database/src/events/client.rs b/crates/core/database/src/events/client.rs index a887e9f12..b3bfb4e59 100644 --- a/crates/core/database/src/events/client.rs +++ b/crates/core/database/src/events/client.rs @@ -1,4 +1,5 @@ use authifier::AuthifierEvent; +use revolt_broker::event_stream; use revolt_result::Error; use serde::{Deserialize, Serialize}; @@ -253,14 +254,16 @@ pub enum EventV1 { impl EventV1 { /// Publish helper wrapper pub async fn p(self, channel: String) { - #[cfg(not(debug_assertions))] - redis_kiss::p(channel, self).await; - #[cfg(debug_assertions)] info!("Publishing event to {channel}: {self:?}"); + let result = event_stream::publish_event(&channel, &self).await; + + #[cfg(not(debug_assertions))] + result.ok(); + #[cfg(debug_assertions)] - redis_kiss::publish(channel, self).await.unwrap(); + result.unwrap(); } /// Publish user event diff --git a/crates/delta/Cargo.toml b/crates/delta/Cargo.toml index d07b44bb9..2bf59e473 100644 --- a/crates/delta/Cargo.toml +++ b/crates/delta/Cargo.toml @@ -32,6 +32,7 @@ ulid = "0.4.1" nanoid = "0.4.0" # serde +rmp-serde = "1.0.0" serde_json = "1.0.57" serde = { version = "1.0.115", features = ["derive"] } validator = { version = "0.16", features = ["derive"] } @@ -65,10 +66,12 @@ revolt_rocket_okapi = { version = "0.10.0", features = ["swagger"] } # rabbit amqprs = { version = "1.7.0" } +lapin = { version = "3.0.0" } # core authifier = "1.0.15" revolt-config = { path = "../core/config" } +revolt-broker = { path = "../core/broker" } revolt-database = { path = "../core/database", features = [ "rocket-impl", "redis-is-patched", diff --git a/crates/delta/src/util/test.rs b/crates/delta/src/util/test.rs index 067f8bd0a..88a18f8e8 100644 --- a/crates/delta/src/util/test.rs +++ b/crates/delta/src/util/test.rs @@ -1,10 +1,13 @@ +use std::collections::HashMap; + use authifier::{ models::{Account, EmailVerification, Session}, Authifier, }; use futures::StreamExt; +use lapin::types::AMQPValue; use rand::Rng; -use redis_kiss::redis::aio::PubSub; +use revolt_broker::event_stream; use revolt_database::{ events::client::EventV1, Channel, Database, Member, Message, Server, User, AMQP, }; @@ -19,7 +22,7 @@ pub struct TestHarness { authifier: Authifier, pub db: Database, pub amqp: AMQP, - sub: PubSub, + consumer: lapin::Consumer, event_buffer: Vec<(String, EventV1)>, } @@ -31,11 +34,19 @@ impl TestHarness { .await .expect("valid rocket instance"); - let mut sub = redis_kiss::open_pubsub_connection() - .await - .expect("`PubSub`"); + let tag: String = rand::thread_rng() + .sample_iter::(&rand::distributions::Standard) + .take(32) + .collect(); + + static QUEUE_NAME: &str = "revolt.events"; + let conn = event_stream::get_connection().await; - sub.psubscribe("*").await.unwrap(); + let consumer = event_stream::create_channel(&conn, config.rabbit.event_stream) + .await + .basic_consume(QUEUE_NAME, &tag, Default::default(), Default::default()) + .await + .unwrap(); let db = client .rocket() @@ -68,7 +79,7 @@ impl TestHarness { authifier, db, amqp, - sub, + consumer, event_buffer: vec![], } } @@ -227,16 +238,33 @@ impl TestHarness { } } - let mut stream = self.sub.on_message(); - while let Some(item) = stream.next().await { - let msg_topic = item.get_channel_name(); - let payload: EventV1 = redis_kiss::decode_payload(&item).unwrap(); - - if topic == msg_topic && predicate(&payload) { + while let Some(Ok(delivery)) = self.consumer.next().await { + let headers: HashMap = delivery + .properties + .headers() + .as_ref() + .map(|table| { + table + .into_iter() + .map(|(k, v)| (k.to_string(), v.clone())) + .collect() + }) + .unwrap_or_default(); + + let filter_value = headers + .get("x-stream-filter-value") + .expect("`x-stream-filter-value` not present in message!") + .as_long_string() + .unwrap() + .to_string(); + + let payload: EventV1 = rmp_serde::from_slice(&delivery.data).unwrap(); + + if topic == filter_value && predicate(&payload) { return payload; } - self.event_buffer.push((msg_topic.to_string(), payload)); + self.event_buffer.push((filter_value, payload)); } // WARNING: if predicate is never satisfied, this will never return