diff --git a/README.md b/README.md index 04e27db..beeddba 100644 --- a/README.md +++ b/README.md @@ -59,6 +59,15 @@ INFO:saluki: 0 - low:7515, high:7551, num_messages:36 `saluki play mybroker:9092/source_topic mybroker:9092/dest_topic -t 1762209990 1762209992` - This will forward messages between the two given timestamps. +## `howl` - Produce fake run-like messages + +`saluki-howl` emits `ev44` events, `pl72` run starts, and `6s4t` run stops to Kafka, in a format which +looks somewhat like a real run. + +``` +saluki howl mybroker:9092 SOME_PREFIX +``` + # Developer setup `pip install -e .[dev]` diff --git a/pyproject.toml b/pyproject.toml index a74c110..2326bf2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,7 +9,8 @@ dependencies = [ "ess-streaming-data-types", "confluent-kafka>=2.12.1", # for produce_batch in play() "python-dateutil", - "tzdata" + "tzdata", + "numpy", ] readme = {file = "README.md", content-type = "text/markdown"} license-files = ["LICENSE"] diff --git a/src/saluki/howl.py b/src/saluki/howl.py new file mode 100644 index 0000000..27991cf --- /dev/null +++ b/src/saluki/howl.py @@ -0,0 +1,197 @@ +import json +import logging +import time +import uuid + +import numpy as np +from confluent_kafka import Producer +from streaming_data_types import serialise_6s4t, serialise_ev44, serialise_pl72 +from streaming_data_types.run_start_pl72 import DetectorSpectrumMap + +logger = logging.getLogger("saluki") + +RNG = np.random.default_rng() + + +def generate_fake_events( + msg_id: int, + events_per_message: int, + tof_peak: float, + tof_sigma: float, + det_min: int, + det_max: int, + timestamp: float, +) -> bytes: + detector_ids = RNG.integers(low=det_min, high=det_max, size=events_per_message) + tofs = np.maximum(0.0, RNG.normal(loc=tof_peak, scale=tof_sigma, size=events_per_message)) + tofs.sort() + + return serialise_ev44( + source_name="saluki", + reference_time=[timestamp * 1_000_000_000], + message_id=msg_id, + reference_time_index=[0], + 
time_of_flight=tofs, + pixel_id=detector_ids, + ) + + +def generate_run_start(det_max: int) -> bytes: + det_spec_map = DetectorSpectrumMap( + detector_ids=np.arange(0, det_max, dtype=np.int32), + spectrum_numbers=np.arange(0, det_max, dtype=np.int32), + n_spectra=det_max, + ) + return serialise_pl72( + start_time=int(time.time() * 1000), + stop_time=None, + run_name=f"saluki-howl-{uuid.uuid4()}", + instrument_name="saluki-howl", + nexus_structure=json.dumps({}), + job_id=str(uuid.uuid4()), + filename=str(uuid.uuid4()), + detector_spectrum_map=det_spec_map, + ) + + +def generate_run_stop() -> bytes: + return serialise_6s4t( + stop_time=int(time.time() * 1000), + job_id=str(uuid.uuid4()), + ) + + +def make_producer(broker: str) -> Producer: + return Producer( + { + "bootstrap.servers": broker, + "queue.buffering.max.kbytes": 1024 * 1024, + "queue.buffering.max.messages": 100000, + "linger.ms": 10, + "batch.num.messages": 10_000, + "max.in.flight.requests.per.connection": 32, + "acks": 1, + }, + ) + + +def produce_messages( + producer: Producer, + topic_prefix: str, + frame: int, + events_per_message: int, + messages_per_frame: int, + frames_per_run: int, + tof_peak: float, + tof_sigma: float, + det_min: int, + det_max: int, +) -> None: + now = time.time() + ev44 = generate_fake_events( + frame, + events_per_message, + tof_peak, + tof_sigma, + det_min, + det_max, + timestamp=now, + ) + + for _ in range(messages_per_frame): + producer.produce( + topic=f"{topic_prefix}_rawEvents", + key=None, + value=ev44, + timestamp=int(now * 1000), + ) + producer.poll(0) + + if frames_per_run != 0 and frame % frames_per_run == 0: + logger.info(f"Starting new run after {frames_per_run} simulated frames") + producer.produce( + topic=f"{topic_prefix}_runInfo", + key=None, + value=generate_run_stop(), + timestamp=int(now * 1000), + ) + producer.produce( + topic=f"{topic_prefix}_runInfo", + key=None, + value=generate_run_start(det_max), + timestamp=int(now * 1000), + ) + + +def howl( + 
broker: str, + topic_prefix: str, + events_per_message: int, + messages_per_frame: int, + frames_per_second: int, + frames_per_run: int, + tof_peak: float, + tof_sigma: float, + det_min: int, + det_max: int, +) -> None: # pragma: no cover (infinite loop) + """ + Send messages vaguely resembling a run to Kafka. + """ + producer = make_producer(broker) + + target_frame_time = 1 / frames_per_second + + frames = 0 + + ev44_size = len( + generate_fake_events( + 0, + events_per_message, + tof_peak, + tof_sigma, + det_min, + det_max, + timestamp=time.time(), + ) + ) + rate_bytes_per_sec = ev44_size * messages_per_frame * frames_per_second + rate_mbit_per_sec = (rate_bytes_per_sec / 1024**2) * 8 + + logger.info( + f"Attempting to simulate data rate: {rate_mbit_per_sec:.3f} Mbit/s " + f"({rate_mbit_per_sec / 8:.3f} MiB/s)" + ) + logger.info(f"Each ev44 is {ev44_size} bytes") + + producer.produce( + topic=f"{topic_prefix}_runInfo", + key=None, + value=generate_run_start(det_max), + ) + + target_time = time.time() + + while True: + target_time += target_frame_time + frames += 1 + + produce_messages( + producer, + topic_prefix, + frames, + events_per_message, + messages_per_frame, + frames_per_run, + tof_peak, + tof_sigma, + det_min, + det_max, + ) + + sleep_time = target_time - time.time() + + if sleep_time > 0: + time.sleep(sleep_time) + else: + logger.warning(f"saluki-howl running {abs(sleep_time):.3f} seconds behind schedule") diff --git a/src/saluki/listen.py b/src/saluki/listen.py index 82a205d..e7c942e 100644 --- a/src/saluki/listen.py +++ b/src/saluki/listen.py @@ -26,8 +26,10 @@ def listen( { "bootstrap.servers": broker, "group.id": f"saluki-listen-{uuid.uuid4()}", - "auto.offset.reset": "latest", "enable.auto.commit": False, + "fetch.message.max.bytes": 512 * 1024**2, # 512MB + "fetch.max.bytes": 512 * 1024**2, # 512MB + "max.partition.fetch.bytes": 512 * 1024**2, # 512MB } ) c.subscribe([topic]) diff --git a/src/saluki/main.py b/src/saluki/main.py index 
1bcf97b..e54500f 100644 --- a/src/saluki/main.py +++ b/src/saluki/main.py @@ -3,6 +3,7 @@ import sys from saluki.consume import consume +from saluki.howl import howl from saluki.listen import listen from saluki.play import play from saluki.sniff import sniff @@ -15,6 +16,7 @@ _CONSUME = "consume" _PLAY = "play" _SNIFF = "sniff" +_HOWL = "howl" def main() -> None: @@ -52,7 +54,9 @@ def main() -> None: _SNIFF, help="sniff - broker metadata", parents=[common_options] ) sniff_parser.add_argument( - "broker", type=str, help="broker, optionally suffixed with a topic name to filter to" + "broker", + type=str, + help="broker, optionally suffixed with a topic name to filter to", ) consumer_parser = argparse.ArgumentParser(add_help=False) @@ -65,7 +69,9 @@ def main() -> None: ) consumer_mode_parser = sub_parsers.add_parser( - _CONSUME, help="consumer mode", parents=[topic_parser, consumer_parser, common_options] + _CONSUME, + help="consumer mode", + parents=[topic_parser, consumer_parser, common_options], ) consumer_mode_parser.add_argument( "-m", @@ -120,6 +126,43 @@ def main() -> None: nargs=2, ) + howl_parser = sub_parsers.add_parser( + _HOWL, + help="howl mode - produce fake run-like messages", + parents=[common_options], + ) + howl_parser.add_argument("broker", type=str, help="Kafka broker URL") + howl_parser.add_argument("topic_prefix", type=str, help="Topic prefix e.g. 
INSTNAME") + howl_parser.add_argument( + "--events-per-message", + type=int, + help="Events per ev44 to simulate", + default=100, + ) + howl_parser.add_argument( + "--messages-per-frame", + type=int, + help="Number of ev44 per frame to simulate", + default=20, + ) + howl_parser.add_argument( + "--frames-per-second", type=int, help="Frames per second to simulate", default=1 + ) + howl_parser.add_argument( + "--frames-per-run", + type=int, + help="Frames to take before beginning new run (0 to run forever)", + default=0, + ) + howl_parser.add_argument( + "--tof-peak", type=float, help="Time-of-flight peak (ns)", default=10_000_000 + ) + howl_parser.add_argument( + "--tof-sigma", type=float, help="Time-of-flight sigma (ns)", default=2_000_000 + ) + howl_parser.add_argument("--det-min", type=int, help="Minimum detector ID", default=0) + howl_parser.add_argument("--det-max", type=int, help="Maximum detector ID", default=1000) + if len(sys.argv) == 1: parser.print_help() sys.exit(1) @@ -169,6 +212,19 @@ def main() -> None: except RuntimeError: logger.debug(f"Sniffing whole broker {args.broker}") sniff(args.broker) + elif args.command == _HOWL: + howl( + args.broker, + args.topic_prefix, + events_per_message=args.events_per_message, + messages_per_frame=args.messages_per_frame, + frames_per_second=args.frames_per_second, + frames_per_run=args.frames_per_run, + tof_peak=args.tof_peak, + tof_sigma=args.tof_sigma, + det_min=args.det_min, + det_max=args.det_max, + ) if __name__ == "__main__": diff --git a/src/saluki/play.py b/src/saluki/play.py index 1f058e8..1bb7119 100644 --- a/src/saluki/play.py +++ b/src/saluki/play.py @@ -70,7 +70,8 @@ def play( logger.debug(f"finished consuming {num_messages} messages") consumer.close() producer.produce_batch( - dest_topic, [{"key": message.key(), "value": message.value()} for message in msgs] + dest_topic, + [{"key": message.key(), "value": message.value()} for message in msgs], ) logger.debug(f"flushing producer. 
len(p): {len(producer)}") producer.flush(timeout=10) diff --git a/src/saluki/utils.py b/src/saluki/utils.py index d033a57..62845ef 100644 --- a/src/saluki/utils.py +++ b/src/saluki/utils.py @@ -34,7 +34,9 @@ def fallback_deserialiser(payload: bytes) -> str: def deserialise_and_print_messages( - msgs: List[Message], partition: int | None, schemas_to_filter_to: list[str] | None = None + msgs: List[Message], + partition: int | None, + schemas_to_filter_to: list[str] | None = None, ) -> None: for msg in msgs: try: diff --git a/tests/test_howl.py b/tests/test_howl.py new file mode 100644 index 0000000..8a2ff23 --- /dev/null +++ b/tests/test_howl.py @@ -0,0 +1,90 @@ +from unittest.mock import ANY, MagicMock, call + +import numpy as np +from confluent_kafka.cimpl import Producer +from streaming_data_types import deserialise_6s4t, deserialise_ev44, deserialise_pl72 + +from saluki.howl import ( + generate_fake_events, + generate_run_start, + generate_run_stop, + make_producer, + produce_messages, +) + + +def test_generate_run_start(): + pl72 = deserialise_pl72(generate_run_start(50000)) + det_spec_map = pl72.detector_spectrum_map + assert det_spec_map is not None + assert det_spec_map.n_spectra == 50000 + + +def test_generate_run_stop(): + deserialise_6s4t(generate_run_stop()) + + +def test_generate_events(): + ev44 = deserialise_ev44( + generate_fake_events( + msg_id=123, + events_per_message=1, + tof_peak=12345, + tof_sigma=0, + det_min=5, + det_max=6, timestamp=0.0, + ) + ) + assert ev44.message_id == 123 + assert ev44.pixel_id == np.array([5], dtype=np.int32) + assert ev44.time_of_flight == np.array([12345], dtype=np.int32) + + +def test_make_producer(): + # Just test it doesn't crash - can't usefully test much more than that + make_producer("127.0.0.1") + + +def test_produce_event_messages(): + producer = MagicMock(spec=Producer) + + produce_messages( + producer, + "some_prefix", + frame=1, + frames_per_run=10, messages_per_frame=1, + events_per_message=1, + tof_peak=12345, + tof_sigma=0, + det_min=5, + 
det_max=6, + ) + + producer.produce.assert_called_once_with( + topic="some_prefix_rawEvents", key=None, value=ANY, timestamp=ANY + ) + + +def test_produce_runinfo_messages(): + producer = MagicMock(spec=Producer) + + produce_messages( + producer, + "some_prefix", + frame=10, + frames_per_run=10, messages_per_frame=1, + events_per_message=1, + tof_peak=12345, + tof_sigma=0, + det_min=5, + det_max=6, + ) + + # event followed by run stop and run start pair. + producer.produce.assert_has_calls( + [ + call(topic="some_prefix_rawEvents", key=None, value=ANY, timestamp=ANY), + call(topic="some_prefix_runInfo", key=None, value=ANY, timestamp=ANY), + call(topic="some_prefix_runInfo", key=None, value=ANY, timestamp=ANY), + ] + ) diff --git a/tests/test_utils.py b/tests/test_utils.py index 263c280..cf19970 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -78,7 +78,9 @@ def test_deserialising_message_which_raises_does_not_stop_loop(mock_message): assert logger.info.call_count == 1 -def test_deserialising_with_schema_list_ignores_messages_with_schema_not_in_list(mock_message): +def test_deserialising_with_schema_list_ignores_messages_with_schema_not_in_list( + mock_message, +): with patch("saluki.utils.logger") as logger: ok_message = Mock(spec=Message) ok_message.value.return_value = serialise_fc00(config_change=1, streams=[]) # type: ignore @@ -179,7 +181,8 @@ def test_uri_with_no_topic(): @pytest.mark.parametrize( - "timestamp", ["2025-11-19T15:27:11", "2025-11-19T15:27:11Z", "2025-11-19T15:27:11+00:00"] + "timestamp", + ["2025-11-19T15:27:11", "2025-11-19T15:27:11Z", "2025-11-19T15:27:11+00:00"], ) def test_parses_datetime_properly_with_string(timestamp): assert dateutil_parsable_or_unix_timestamp(timestamp) == 1763566031000