From 9bf636c7592562afc7a3ddeb6f31807955823fd0 Mon Sep 17 00:00:00 2001 From: "Anthr@X" Date: Mon, 7 Apr 2025 04:18:30 +1000 Subject: [PATCH 1/7] add aws sqs alert --- elastalert/alerters/sqs.py | 60 +++++++++++++ tests/sqs_test.py | 177 +++++++++++++++++++++++++++++++++++++ 2 files changed, 237 insertions(+) create mode 100644 elastalert/alerters/sqs.py create mode 100644 tests/sqs_test.py diff --git a/elastalert/alerters/sqs.py b/elastalert/alerters/sqs.py new file mode 100644 index 000000000..8d451a737 --- /dev/null +++ b/elastalert/alerters/sqs.py @@ -0,0 +1,60 @@ +import boto3 +import json + +from elastalert.alerts import Alerter +from elastalert.util import elastalert_logger, EAException + + +class SqsAlerter(Alerter): + """Send alert using AWS SQS service""" + + required_options = frozenset(["sqs_queue_url"]) + + def __init__(self, *args): + super(SqsAlerter, self).__init__(*args) + self.sqs_queue_url = self.rule.get("sqs_queue_url", None) + self.sqs_aws_access_key_id = self.rule.get("sqs_aws_access_key_id") + self.sqs_aws_secret_access_key = self.rule.get("sqs_aws_secret_access_key") + self.sqs_aws_region = self.rule.get("sqs_aws_region", "us-east-1") + self.profile = self.rule.get("sqs_aws_profile", None) + + def alert(self, matches): + # Create the alert as a JSON object + alert_data = { + "rule_name": self.rule["name"], + "matches": matches, + } + alert_text = self.create_alert_body(matches) + # SQS message body limit is 256000 KB, set to 128KB to be safe + if len(alert_text) > 128000: + alert_text = alert_text[:128000] + alert_text += "\n*message was cropped according to SQS limits!*" + alert_data["text"] = alert_text + body = json.dumps(alert_data) # Convert the alert data to JSON + + # If the body is still too long, remove the text field + if len(body) > 256000: + alert_data["text"] = "Text message omitted due to SQS size limit." + body = json.dumps(alert_data) + try: + if self.profile is None: + session = boto3.Session( + aws_access_key_id=self.sqs_aws_access_key_id, + aws_secret_access_key=self.sqs_aws_secret_access_key, + region_name=self.sqs_aws_region, + ) + else: + session = boto3.Session(profile_name=self.profile) + + sqs_client = session.client("sqs") + + sqs_client.send_message( + QueueUrl=self.sqs_queue_url, + MessageBody=body, # Send the JSON body directly + ) + except Exception as e: + raise EAException("Error sending Amazon SQS: %s" % e) + elastalert_logger.info("Sent Amazon SQS message to %s" % (self.sqs_queue_url)) + + def get_info(self): + return {"type": "sqs"} diff --git a/tests/sqs_test.py b/tests/sqs_test.py new file mode 100644 index 000000000..290d5b806 --- /dev/null +++ b/tests/sqs_test.py @@ -0,0 +1,177 @@ +import pytest + +from elastalert.alerters.sqs import SqsAlerter +from elastalert.loaders import FileRulesLoader +from elastalert.util import EAException + + +def test_sqs_getinfo(): + rule = { + "name": "Test Rule", + "type": "any", + "sqs_queue_url": "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue", + "sqs_aws_access_key_id": "access key id", + "sqs_aws_secret_access_key": "secret access key", + "alert": [], + } + rules_loader = FileRulesLoader({}) + rules_loader.load_modules(rule) + alert = SqsAlerter(rule) + + expected_data = {"type": "sqs"} + actual_data = alert.get_info() + assert expected_data == actual_data + + +@pytest.mark.parametrize( + "sqs_queue_url, expected_data", + [ + ("", "Missing required option(s): sqs_queue_url"), + ("https://sqs.us-east-1.amazonaws.com/123456789012/my-queue", {"type": "sqs"}), + ], +) +def test_sqs_required_error(sqs_queue_url, expected_data): + try: + rule = {"name": "Test Rule", "type": "any", "alert": []} + + if sqs_queue_url: + rule["sqs_queue_url"] = sqs_queue_url + + rules_loader = FileRulesLoader({}) + rules_loader.load_modules(rule) + alert = SqsAlerter(rule) + + actual_data = alert.get_info() + assert expected_data == actual_data + except Exception as ea: + assert expected_data in str(ea) + + +def test_sqs_ea_exception(): + with pytest.raises(EAException) as ea: + rule = { + "name": "Test Rule", + "type": "any", + "sqs_queue_url": "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue", + "sqs_aws_access_key_id": "access key id", + "sqs_aws_secret_access_key": "secret access key", + "alert": [], + } + match = { + "@timestamp": "2021-01-10T00:00:00", + "sender_ip": "1.1.1.1", + "hostname": "aProbe", + } + rules_loader = FileRulesLoader({}) + rules_loader.load_modules(rule) + alert = SqsAlerter(rule) + alert.alert([match]) + + assert "Error sending Amazon SQS: " in str(ea) + + +def test_sqs_message_size_limit(): + rule = { + "name": "Test Rule", + "type": "any", + "sqs_queue_url": "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue", + "sqs_aws_access_key_id": "access key id", + "sqs_aws_secret_access_key": "secret access key", + "alert": [], + } + + # Create a large match object to test message size limiting + match = { + "@timestamp": "2021-01-10T00:00:00", + "sender_ip": "1.1.1.1", + "hostname": "aProbe", + "large_field": "x" * 200000, # Create a field larger than the 128KB limit + } + + rules_loader = FileRulesLoader({}) + rules_loader.load_modules(rule) + alert = SqsAlerter(rule) + + # Mock the create_alert_body method to return a large string + original_method = alert.create_alert_body + alert.create_alert_body = lambda matches: "x" * 150000 + + # Mock boto3 session and SQS client to avoid actual AWS calls + class MockSQSClient: + def send_message(self, QueueUrl, MessageBody): + # Verify message body contains truncation message + assert len(MessageBody) <= 256000 + assert ( + "message was cropped according to SQS limits" in MessageBody + or "Text message omitted due to SQS size limit" in MessageBody + ) + return {} + + class MockSession: + def __init__( + self, + aws_access_key_id=None, + aws_secret_access_key=None, + region_name=None, + profile_name=None, + **kwargs + ): + pass + + def client(self, service_name): + return MockSQSClient() + + import boto3 + + original_session = boto3.Session + boto3.Session = MockSession + + try: + # This should not raise an exception + alert.alert([match]) + finally: + # Restore original methods + alert.create_alert_body = original_method + boto3.Session = original_session + + +def test_sqs_aws_profile(): + rule = { + "name": "Test Rule", + "type": "any", + "sqs_queue_url": "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue", + "sqs_aws_profile": "test-profile", + "alert": [], + } + + rules_loader = FileRulesLoader({}) + rules_loader.load_modules(rule) + alert = SqsAlerter(rule) + + # Mock boto3 session to verify profile is used + session_args = {} + + class MockSession: + def __init__(self, **kwargs): + nonlocal session_args + session_args = kwargs + + def client(self, service_name): + class MockClient: + def send_message(self, QueueUrl, MessageBody): + return {} + + return MockClient() + + import boto3 + + original_session = boto3.Session + boto3.Session = MockSession + + try: + match = {"@timestamp": "2021-01-10T00:00:00"} + alert.alert([match]) + assert "profile_name" in session_args + assert session_args["profile_name"] == "test-profile" + finally: + boto3.Session = original_session From 0ffb870015b92d435c720d9c11162da2b5073664 Mon Sep 17 00:00:00 2001 From: "Anthr@X" Date: Mon, 7 Apr 2025 04:24:44 +1000 Subject: [PATCH 2/7] move sqs_test.py --- tests/{ => alerters}/sqs_test.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename tests/{ => alerters}/sqs_test.py (100%) diff --git a/tests/sqs_test.py b/tests/alerters/sqs_test.py similarity index 100% rename from tests/sqs_test.py rename to tests/alerters/sqs_test.py From 5d19ca8e926e99813be50d7634eae447c9a38f8c Mon Sep 17 00:00:00 2001 From: "Anthr@X" Date: Mon, 2 Mar 2026 13:07:01 +1100 Subject: [PATCH 3/7] Add SQS Alerter and update schema for AWS SQS integration - Introduced SqsAlerter class for handling alerts via AWS SQS. - Updated schema.yaml to include SQS configuration properties. - Adjusted SqsAlerter to ensure message body adheres to SQS size limits. --- elastalert/alerters/sqs.py | 6 +++--- elastalert/loaders.py | 2 ++ elastalert/schema.yaml | 7 +++++++ 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/elastalert/alerters/sqs.py b/elastalert/alerters/sqs.py index 8d451a737..e7b3340ee 100644 --- a/elastalert/alerters/sqs.py +++ b/elastalert/alerters/sqs.py @@ -25,17 +25,17 @@ def alert(self, matches): "matches": matches, } alert_text = self.create_alert_body(matches) - # SQS message body limit is 256000 KB, set to 128KB to be safe + # SQS message body limit is 256 KB; crop text at 128 KB to be safe if len(alert_text) > 128000: alert_text = alert_text[:128000] alert_text += "\n*message was cropped according to SQS limits!*" alert_data["text"] = alert_text - body = json.dumps(alert_data) # Convert the alert data to JSON + body = json.dumps(alert_data, default=str) # If the body is still too long, remove the text field if len(body) > 256000: alert_data["text"] = "Text message omitted due to SQS size limit." - body = json.dumps(alert_data) + body = json.dumps(alert_data, default=str) try: if self.profile is None: session = boto3.Session( diff --git a/elastalert/loaders.py b/elastalert/loaders.py index 9e664f73c..a27d59930 100644 --- a/elastalert/loaders.py +++ b/elastalert/loaders.py @@ -51,6 +51,7 @@ from elastalert.alerters.pagerduty import PagerDutyAlerter from elastalert.alerters.slack import SlackAlerter from elastalert.alerters.sns import SnsAlerter +from elastalert.alerters.sqs import SqsAlerter from elastalert.alerters.teams import MsTeamsAlerter from elastalert.alerters.powerautomate import MsPowerAutomateAlerter from elastalert.alerters.yzj import YzjAlerter @@ -114,6 +115,7 @@ class RulesLoader(object): 'debug': elastalert.alerters.debug.DebugAlerter, 'command': elastalert.alerters.command.CommandAlerter, 'sns': SnsAlerter, + 'sqs': SqsAlerter, 'ms_teams': MsTeamsAlerter, 'ms_power_automate': MsPowerAutomateAlerter, 'slack': SlackAlerter, diff --git a/elastalert/schema.yaml b/elastalert/schema.yaml index 64241ec05..7a22a315a 100644 --- a/elastalert/schema.yaml +++ b/elastalert/schema.yaml @@ -442,6 +442,13 @@ properties: sns_aws_region: {type: string} sns_aws_profile: {type: string} + ### AWS SQS + sqs_queue_url: {type: string} + sqs_aws_access_key_id: {type: string} + sqs_aws_secret_access_key: {type: string} + sqs_aws_region: {type: string} + sqs_aws_profile: {type: string} + ### Chatwork chatwork_apikey: {type: string} chatwork_room_id: {type: string} From 1b355248544e9786293b1d586245b170f434ada8 Mon Sep 17 00:00:00 2001 From: "Anthr@X" Date: Mon, 2 Mar 2026 13:13:59 +1100 Subject: [PATCH 4/7] Add AWS SQS alerter Add new SqsAlerter for sending alerts via AWS SQS queues, modeled after the existing SnsAlerter. Supports authentication via access key/secret or named AWS profile, and handles SQS 256 KB message size limit with automatic text truncation. - New alerter class in elastalert/alerters/sqs.py - Registered 'sqs' alert type in loaders.py - Added SQS options to schema.yaml - Added unit tests in tests/alerters/sqs_test.py Made-with: Cursor --- elastalert/alerters/sqs.py | 60 +++++++++++++ elastalert/loaders.py | 2 + elastalert/schema.yaml | 7 ++ tests/alerters/sqs_test.py | 177 +++++++++++++++++++++++++++++++++++++ 4 files changed, 246 insertions(+) create mode 100644 elastalert/alerters/sqs.py create mode 100644 tests/alerters/sqs_test.py diff --git a/elastalert/alerters/sqs.py b/elastalert/alerters/sqs.py new file mode 100644 index 000000000..e7b3340ee --- /dev/null +++ b/elastalert/alerters/sqs.py @@ -0,0 +1,60 @@ +import boto3 +import json + +from elastalert.alerts import Alerter +from elastalert.util import elastalert_logger, EAException + + +class SqsAlerter(Alerter): + """Send alert using AWS SQS service""" + + required_options = frozenset(["sqs_queue_url"]) + + def __init__(self, *args): + super(SqsAlerter, self).__init__(*args) + self.sqs_queue_url = self.rule.get("sqs_queue_url", None) + self.sqs_aws_access_key_id = self.rule.get("sqs_aws_access_key_id") + self.sqs_aws_secret_access_key = self.rule.get("sqs_aws_secret_access_key") + self.sqs_aws_region = self.rule.get("sqs_aws_region", "us-east-1") + self.profile = self.rule.get("sqs_aws_profile", None) + + def alert(self, matches): + # Create the alert as a JSON object + alert_data = { + "rule_name": self.rule["name"], + "matches": matches, + } + alert_text = self.create_alert_body(matches) + # SQS message body limit is 256 KB; crop text at 128 KB to be safe + if len(alert_text) > 128000: + alert_text = alert_text[:128000] + alert_text += "\n*message was cropped according to SQS limits!*" + alert_data["text"] = alert_text + body = json.dumps(alert_data, default=str) + + # If the body is still too long, remove the text field + if len(body) > 256000: + alert_data["text"] = "Text message omitted due to SQS size limit." + body = json.dumps(alert_data, default=str) + try: + if self.profile is None: + session = boto3.Session( + aws_access_key_id=self.sqs_aws_access_key_id, + aws_secret_access_key=self.sqs_aws_secret_access_key, + region_name=self.sqs_aws_region, + ) + else: + session = boto3.Session(profile_name=self.profile) + + sqs_client = session.client("sqs") + + sqs_client.send_message( + QueueUrl=self.sqs_queue_url, + MessageBody=body, # Send the JSON body directly + ) + except Exception as e: + raise EAException("Error sending Amazon SQS: %s" % e) + elastalert_logger.info("Sent Amazon SQS message to %s" % (self.sqs_queue_url)) + + def get_info(self): + return {"type": "sqs"} diff --git a/elastalert/loaders.py b/elastalert/loaders.py index 9e664f73c..a27d59930 100644 --- a/elastalert/loaders.py +++ b/elastalert/loaders.py @@ -51,6 +51,7 @@ from elastalert.alerters.pagerduty import PagerDutyAlerter from elastalert.alerters.slack import SlackAlerter from elastalert.alerters.sns import SnsAlerter +from elastalert.alerters.sqs import SqsAlerter from elastalert.alerters.teams import MsTeamsAlerter from elastalert.alerters.powerautomate import MsPowerAutomateAlerter from elastalert.alerters.yzj import YzjAlerter @@ -114,6 +115,7 @@ class RulesLoader(object): 'debug': elastalert.alerters.debug.DebugAlerter, 'command': elastalert.alerters.command.CommandAlerter, 'sns': SnsAlerter, + 'sqs': SqsAlerter, 'ms_teams': MsTeamsAlerter, 'ms_power_automate': MsPowerAutomateAlerter, 'slack': SlackAlerter, diff --git a/elastalert/schema.yaml b/elastalert/schema.yaml index 64241ec05..7a22a315a 100644 --- a/elastalert/schema.yaml +++ b/elastalert/schema.yaml @@ -442,6 +442,13 @@ properties: sns_aws_region: {type: string} sns_aws_profile: {type: string} + ### AWS SQS + sqs_queue_url: {type: string} + sqs_aws_access_key_id: {type: string} + sqs_aws_secret_access_key: {type: string} + sqs_aws_region: {type: string} + sqs_aws_profile: {type: string} + ### Chatwork chatwork_apikey: {type: string} chatwork_room_id: {type: string} diff --git a/tests/alerters/sqs_test.py b/tests/alerters/sqs_test.py new file mode 100644 index 000000000..290d5b806 --- /dev/null +++ b/tests/alerters/sqs_test.py @@ -0,0 +1,177 @@ +import pytest + +from elastalert.alerters.sqs import SqsAlerter +from elastalert.loaders import FileRulesLoader +from elastalert.util import EAException + + +def test_sqs_getinfo(): + rule = { + "name": "Test Rule", + "type": "any", + "sqs_queue_url": "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue", + "sqs_aws_access_key_id": "access key id", + "sqs_aws_secret_access_key": "secret access key", + "alert": [], + } + rules_loader = FileRulesLoader({}) + rules_loader.load_modules(rule) + alert = SqsAlerter(rule) + + expected_data = {"type": "sqs"} + actual_data = alert.get_info() + assert expected_data == actual_data + + +@pytest.mark.parametrize( + "sqs_queue_url, expected_data", + [ + ("", "Missing required option(s): sqs_queue_url"), + ("https://sqs.us-east-1.amazonaws.com/123456789012/my-queue", {"type": "sqs"}), + ], +) +def test_sqs_required_error(sqs_queue_url, expected_data): + try: + rule = {"name": "Test Rule", "type": "any", "alert": []} + + if sqs_queue_url: + rule["sqs_queue_url"] = sqs_queue_url + + rules_loader = FileRulesLoader({}) + rules_loader.load_modules(rule) + alert = SqsAlerter(rule) + + actual_data = alert.get_info() + assert expected_data == actual_data + except Exception as ea: + assert expected_data in str(ea) + + +def test_sqs_ea_exception(): + with pytest.raises(EAException) as ea: + rule = { + "name": "Test Rule", + "type": "any", + "sqs_queue_url": "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue", + "sqs_aws_access_key_id": "access key id", + "sqs_aws_secret_access_key": "secret access key", + "alert": [], + } + match = { + "@timestamp": "2021-01-10T00:00:00", + "sender_ip": "1.1.1.1", + "hostname": "aProbe", + } + rules_loader = FileRulesLoader({}) + rules_loader.load_modules(rule) + alert = SqsAlerter(rule) + alert.alert([match]) + + assert "Error sending Amazon SQS: " in str(ea) + + +def test_sqs_message_size_limit(): + rule = { + "name": "Test Rule", + "type": "any", + "sqs_queue_url": "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue", + "sqs_aws_access_key_id": "access key id", + "sqs_aws_secret_access_key": "secret access key", + "alert": [], + } + + # Create a large match object to test message size limiting + match = { + "@timestamp": "2021-01-10T00:00:00", + "sender_ip": "1.1.1.1", + "hostname": "aProbe", + "large_field": "x" * 200000, # Create a field larger than the 128KB limit + } + + rules_loader = FileRulesLoader({}) + rules_loader.load_modules(rule) + alert = SqsAlerter(rule) + + # Mock the create_alert_body method to return a large string + original_method = alert.create_alert_body + alert.create_alert_body = lambda matches: "x" * 150000 + + # Mock boto3 session and SQS client to avoid actual AWS calls + class MockSQSClient: + def send_message(self, QueueUrl, MessageBody): + # Verify message body contains truncation message + assert len(MessageBody) <= 256000 + assert ( + "message was cropped according to SQS limits" in MessageBody + or "Text message omitted due to SQS size limit" in MessageBody + ) + return {} + + class MockSession: + def __init__( + self, + aws_access_key_id=None, + aws_secret_access_key=None, + region_name=None, + profile_name=None, + **kwargs + ): + pass + + def client(self, service_name): + return MockSQSClient() + + import boto3 + + original_session = boto3.Session + boto3.Session = MockSession + + try: + # This should not raise an exception + alert.alert([match]) + finally: + # Restore original methods + alert.create_alert_body = original_method + boto3.Session = original_session + + +def test_sqs_aws_profile(): + rule = { + "name": "Test Rule", + "type": "any", + "sqs_queue_url": "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue", + "sqs_aws_profile": "test-profile", + "alert": [], + } + + rules_loader = FileRulesLoader({}) + rules_loader.load_modules(rule) + alert = SqsAlerter(rule) + + # Mock boto3 session to verify profile is used + session_args = {} + + class MockSession: + def __init__(self, **kwargs): + nonlocal session_args + session_args = kwargs + + def client(self, service_name): + class MockClient: + def send_message(self, QueueUrl, MessageBody): + return {} + + return MockClient() + + import boto3 + + original_session = boto3.Session + boto3.Session = MockSession + + try: + match = {"@timestamp": "2021-01-10T00:00:00"} + alert.alert([match]) + assert "profile_name" in session_args + assert session_args["profile_name"] == "test-profile" + finally: + boto3.Session = original_session From 6c4c6ef54051a5942161959ffabfe77e33e5bfb1 Mon Sep 17 00:00:00 2001 From: "Anthr@X" Date: Fri, 6 Mar 2026 01:34:48 +1100 Subject: [PATCH 5/7] Add AWS SQS alerter documentation and update CHANGELOG - Documented the new AWS SQS alerter in alerts.rst, detailing its usage and configuration options. - Updated elastalert.rst to include AWS SQS in the list of supported alert types. - Enhanced the SqsAlerter implementation to infer AWS region from the SQS queue URL and adjusted message size handling. - Updated unit tests to verify new functionality, including region inference and message size limits. --- CHANGELOG.md | 2 +- docs/source/alerts.rst | 41 +++++++++++++++++++++++++++ docs/source/elastalert.rst | 1 + elastalert/alerters/sqs.py | 44 +++++++++++++++++++++++------ tests/alerters/sqs_test.py | 57 ++++++++++++++++++++++++++++++++++---- 5 files changed, 130 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 962a8b40b..b22746470 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,7 @@ - None ## New features -- None +- Add AWS SQS alerter for sending alerts to Amazon Simple Queue Service queues - @AnthraX1 ## Other changes - None diff --git a/docs/source/alerts.rst b/docs/source/alerts.rst index 3a6d43786..9f5227425 100644 --- a/docs/source/alerts.rst +++ b/docs/source/alerts.rst @@ -544,6 +544,47 @@ Example When to use aws_profile usage:: sns_topic_arn: 'arn:aws:sns:us-east-1:123456789:somesnstopic' sns_aws_profile: 'default' +AWS SQS (Amazon Simple Queue Service) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The AWS SQS alerter will send an alert message to an AWS SQS queue as a JSON object containing the rule name, matches, and alert text. +The AWS SQS alerter uses boto3 and can use credentials in the rule yaml, in a standard AWS credential and config files, or +via environment variables. See http://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-started.html for details. + +Messages that exceed the SQS 1 MB size limit will be automatically truncated. + +If ``sqs_aws_region`` is not set in the rule, the region will be automatically inferred from the SQS queue URL (for example, ``https://sqs.eu-west-1.amazonaws.com/...`` will use ``eu-west-1``). + +AWS SQS requires one option: + +``sqs_queue_url``: The URL of the SQS queue. For example, ``https://sqs.us-east-1.amazonaws.com/123456789012/my-queue`` + +Optional: + +``sqs_aws_access_key_id``: An access key to connect to SQS with. + +``sqs_aws_secret_access_key``: The secret key associated with the access key. + +``sqs_aws_region``: The AWS region in which the SQS resource is located. Default is us-east-1 + +``sqs_aws_profile``: The AWS profile to use. If none specified, the default will be used. + +Example when not using aws_profile:: + + alert: + - sqs + sqs_queue_url: 'https://sqs.us-east-1.amazonaws.com/123456789012/my-queue' + sqs_aws_access_key_id: 'XXXXXXXXXXXXXXXXXX' + sqs_aws_secret_access_key: 'YYYYYYYYYYYYYYYYYYYY' + sqs_aws_region: 'us-east-1' + +Example when using aws_profile:: + + alert: + - sqs + sqs_queue_url: 'https://sqs.us-east-1.amazonaws.com/123456789012/my-queue' + sqs_aws_profile: 'default' + Chatwork ~~~~~~~~ diff --git a/docs/source/elastalert.rst b/docs/source/elastalert.rst index 62147c5b5..70ffc45e0 100755 --- a/docs/source/elastalert.rst +++ b/docs/source/elastalert.rst @@ -31,6 +31,7 @@ Currently, we have support built in for these alert types: - Alertmanager - AWS SES (Amazon Simple Email Service) - AWS SNS (Amazon Simple Notification Service) +- AWS SQS (Amazon Simple Queue Service) - Chatwork - Command - Datadog diff --git a/elastalert/alerters/sqs.py b/elastalert/alerters/sqs.py index e7b3340ee..486ddd80b 100644 --- a/elastalert/alerters/sqs.py +++ b/elastalert/alerters/sqs.py @@ -1,10 +1,26 @@ import boto3 import json +from urllib.parse import urlparse from elastalert.alerts import Alerter from elastalert.util import elastalert_logger, EAException +def _get_region_from_sqs_url(queue_url, default_region="us-east-1"): + """Infer the AWS region from an SQS queue URL like + https://sqs.us-east-1.amazonaws.com/123456789012/my-queue. + Falls back to default_region if it cannot be determined. + """ + try: + host = urlparse(queue_url).hostname or "" + parts = host.split(".") + if len(parts) >= 3 and parts[0] == "sqs": + return parts[1] + except Exception: + pass + return default_region + + class SqsAlerter(Alerter): """Send alert using AWS SQS service""" @@ -15,7 +31,12 @@ def __init__(self, *args): self.sqs_queue_url = self.rule.get("sqs_queue_url", None) self.sqs_aws_access_key_id = self.rule.get("sqs_aws_access_key_id") self.sqs_aws_secret_access_key = self.rule.get("sqs_aws_secret_access_key") - self.sqs_aws_region = self.rule.get("sqs_aws_region", "us-east-1") + explicit_region = self.rule.get("sqs_aws_region") + if explicit_region: + self.sqs_aws_region = explicit_region + else: + # If no region is configured explicitly, derive it from the queue URL. + self.sqs_aws_region = _get_region_from_sqs_url(self.sqs_queue_url or "") self.profile = self.rule.get("sqs_aws_profile", None) def alert(self, matches): @@ -25,18 +46,20 @@ def alert(self, matches): "matches": matches, } alert_text = self.create_alert_body(matches) - # SQS message body limit is 256 KB; crop text at 128 KB to be safe - if len(alert_text) > 128000: - alert_text = alert_text[:128000] + # SQS message body limit is 1 MB; crop text at ~800KB to be safe + if len(alert_text) > 800000: + alert_text = alert_text[:800000] alert_text += "\n*message was cropped according to SQS limits!*" alert_data["text"] = alert_text body = json.dumps(alert_data, default=str) # If the body is still too long, remove the text field - if len(body) > 256000: + if len(body) > 1048576: alert_data["text"] = "Text message omitted due to SQS size limit." body = json.dumps(alert_data, default=str) try: + # Always create the session in the configured region. SQS does not + # infer the region from the queue URL; the client region must match. if self.profile is None: session = boto3.Session( aws_access_key_id=self.sqs_aws_access_key_id, @@ -44,17 +67,20 @@ def alert(self, matches): region_name=self.sqs_aws_region, ) else: - session = boto3.Session(profile_name=self.profile) + session = boto3.Session( + profile_name=self.profile, + region_name=self.sqs_aws_region, + ) sqs_client = session.client("sqs") - sqs_client.send_message( + response = sqs_client.send_message( QueueUrl=self.sqs_queue_url, - MessageBody=body, # Send the JSON body directly + MessageBody=body, ) except Exception as e: raise EAException("Error sending Amazon SQS: %s" % e) - elastalert_logger.info("Sent Amazon SQS message to %s" % (self.sqs_queue_url)) + elastalert_logger.info("Sent Amazon SQS message to %s, MessageId: %s" % (self.sqs_queue_url, response.get("MessageId"))) def get_info(self): return {"type": "sqs"} diff --git a/tests/alerters/sqs_test.py b/tests/alerters/sqs_test.py index 290d5b806..b4602cb35 100644 --- a/tests/alerters/sqs_test.py +++ b/tests/alerters/sqs_test.py @@ -85,22 +85,23 @@ def test_sqs_message_size_limit(): "@timestamp": "2021-01-10T00:00:00", "sender_ip": "1.1.1.1", "hostname": "aProbe", - "large_field": "x" * 200000, # Create a field larger than the 128KB limit + "large_field": "x" * 200000, # Create a field contributing to a large body } rules_loader = FileRulesLoader({}) rules_loader.load_modules(rule) alert = SqsAlerter(rule) - # Mock the create_alert_body method to return a large string + # Mock the create_alert_body method to return a large string that exceeds + # the 800 KB truncation threshold so we hit the truncation path. original_method = alert.create_alert_body - alert.create_alert_body = lambda matches: "x" * 150000 + alert.create_alert_body = lambda matches: "x" * 900000 # Mock boto3 session and SQS client to avoid actual AWS calls class MockSQSClient: def send_message(self, QueueUrl, MessageBody): # Verify message body contains truncation message - assert len(MessageBody) <= 256000 + assert len(MessageBody) <= 1048576 assert ( "message was cropped according to SQS limits" in MessageBody or "Text message omitted due to SQS size limit" in MessageBody @@ -148,7 +149,7 @@ def test_sqs_aws_profile(): rules_loader.load_modules(rule) alert = SqsAlerter(rule) - # Mock boto3 session to verify profile is used + # Mock boto3 session to verify profile and region are used session_args = {} class MockSession: @@ -173,5 +174,51 @@ def send_message(self, QueueUrl, MessageBody): alert.alert([match]) assert "profile_name" in session_args assert session_args["profile_name"] == "test-profile" + # Region should be inferred from the queue URL (us-east-1) + assert "region_name" in session_args + assert session_args["region_name"] == "us-east-1" + finally: + boto3.Session = original_session + + +def test_sqs_region_inferred_from_url_when_not_set(): + # If sqs_aws_region is not provided, the region should be inferred from + # the SQS queue URL host. + rule = { + "name": "Test Rule", + "type": "any", + "sqs_queue_url": "https://sqs.eu-west-1.amazonaws.com/123456789012/my-queue", + "sqs_aws_profile": "test-profile", + "alert": [], + } + + rules_loader = FileRulesLoader({}) + rules_loader.load_modules(rule) + alert = SqsAlerter(rule) + + session_args = {} + + class MockSession: + def __init__(self, **kwargs): + nonlocal session_args + session_args = kwargs + + def client(self, service_name): + class MockClient: + def send_message(self, QueueUrl, MessageBody): + return {} + + return MockClient() + + import boto3 + + original_session = boto3.Session + boto3.Session = MockSession + + try: + match = {"@timestamp": "2021-01-10T00:00:00"} + alert.alert([match]) + assert session_args.get("profile_name") == "test-profile" + assert session_args.get("region_name") == "eu-west-1" finally: boto3.Session = original_session From e6778760bfbbbdadd5ca29df4992a5041f4f2b5e Mon Sep 17 00:00:00 2001 From: "Anthr@X" Date: Sat, 7 Mar 2026 21:56:02 +1100 Subject: [PATCH 6/7] Update CHANGELOG and refactor SQS region inference logic - Updated CHANGELOG to include a reference to the AWS SQS alerter pull request. - Refactored the _get_region_from_sqs_url function in sqs.py to remove unnecessary try-except block for improved error handling. --- CHANGELOG.md | 2 +- elastalert/alerters/sqs.py | 11 ++++------- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5b4edfee1..96656dc82 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,7 @@ - None ## New features -- Add AWS SQS alerter for sending alerts to Amazon Simple Queue Service queues - @AnthraX1 +- Add AWS SQS alerter for sending alerts to Amazon Simple Queue Service queues - [#1750]https://github.com/jertel/elastalert2/pull/1750 - @AnthraX1 ## Other changes - [Docs] Clarified Slack webhook URL documentation as it related to legacy vs app webhooks - [#1745](https://github.com/jertel/elastalert2/pull/1745) - @jertel diff --git a/elastalert/alerters/sqs.py b/elastalert/alerters/sqs.py index 486ddd80b..9c7d1522d 100644 --- a/elastalert/alerters/sqs.py +++ b/elastalert/alerters/sqs.py @@ -11,13 +11,10 @@ def _get_region_from_sqs_url(queue_url, default_region="us-east-1"): https://sqs.us-east-1.amazonaws.com/123456789012/my-queue. Falls back to default_region if it cannot be determined. """ - try: - host = urlparse(queue_url).hostname or "" - parts = host.split(".") - if len(parts) >= 3 and parts[0] == "sqs": - return parts[1] - except Exception: - pass + host = urlparse(queue_url).hostname or "" + parts = host.split(".") + if len(parts) >= 3 and parts[0] == "sqs": + return parts[1] return default_region From ca61341ede4d2c45d286d16c3d77bf34fa7c3cf4 Mon Sep 17 00:00:00 2001 From: "Anthr@X" Date: Sat, 7 Mar 2026 21:59:46 +1100 Subject: [PATCH 7/7] Update CHANGELOG to format AWS SQS alerter reference correctly --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 96656dc82..b9a5ce82c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,7 @@ - None ## New features -- Add AWS SQS alerter for sending alerts to Amazon Simple Queue Service queues - [#1750]https://github.com/jertel/elastalert2/pull/1750 - @AnthraX1 +- Add AWS SQS alerter for sending alerts to Amazon Simple Queue Service queues - [#1750](https://github.com/jertel/elastalert2/pull/1750) - @AnthraX1 ## Other changes - [Docs] Clarified Slack webhook URL documentation as it related to legacy vs app webhooks - [#1745](https://github.com/jertel/elastalert2/pull/1745) - @jertel