From 0458f7a41aaafffea0bc9b2fffbac7b6d5ad51e1 Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Mon, 14 Jul 2025 11:36:24 -0400 Subject: [PATCH 1/3] feat(logs): Handle inbound filters for logs This adds support for the release inbound filter on logs. We currently do not support client ip because logs do not have an ip attribute. --- relay-event-schema/src/protocol/ourlog.rs | 16 ++ relay-filter/src/interface.rs | 44 +++++- relay-server/src/processing/logs/mod.rs | 5 + relay-server/src/processing/logs/process.rs | 19 +++ tests/integration/test_filters.py | 153 ++++++++++++++++++++ 5 files changed, 235 insertions(+), 2 deletions(-) diff --git a/relay-event-schema/src/protocol/ourlog.rs b/relay-event-schema/src/protocol/ourlog.rs index 2b054518b66..4b092e59641 100644 --- a/relay-event-schema/src/protocol/ourlog.rs +++ b/relay-event-schema/src/protocol/ourlog.rs @@ -2,6 +2,7 @@ use relay_protocol::{Annotated, Empty, FromValue, IntoValue, Object, SkipSeriali use std::collections::BTreeMap; use std::fmt::{self, Display}; +use relay_protocol::{Getter, Val}; use serde::{Deserialize, Serialize, Serializer}; use crate::processor::ProcessValue; @@ -39,6 +40,21 @@ pub struct OurLog { pub other: Object, } +impl Getter for OurLog { + fn get_value(&self, path: &str) -> Option> { + println!("path {:?}", path); + Some(match path.strip_prefix("event.")? { + "release" => self + .attributes + .value()? + .get_value("sentry.release")? + .as_str()? + .into(), + _ => return None, + }) + } +} + /// Relay specific metadata embedded into the log item. /// /// This metadata is purely an internal protocol extension used by Relay, diff --git a/relay-filter/src/interface.rs b/relay-filter/src/interface.rs index 24802ace2e5..16ebed9a6e9 100644 --- a/relay-filter/src/interface.rs +++ b/relay-filter/src/interface.rs @@ -3,8 +3,8 @@ use url::Url; use relay_event_schema::protocol::{ - Csp, Event, EventType, Exception, LogEntry, Replay, SessionAggregates, SessionUpdate, Span, - Values, + Csp, Event, EventType, Exception, LogEntry, OurLog, Replay, SessionAggregates, SessionUpdate, + Span, Values, }; /// A data item to which filters can be applied. @@ -92,6 +92,46 @@ impl Filterable for Event { } } +impl Filterable for OurLog { + fn csp(&self) -> Option<&Csp> { + None + } + + fn exceptions(&self) -> Option<&Values> { + None + } + + fn ip_addr(&self) -> Option<&str> { + None + } + + fn logentry(&self) -> Option<&LogEntry> { + None + } + + fn release(&self) -> Option<&str> { + let attributes = self.attributes.value()?; + let release = attributes.get_value("sentry.release")?; + release.as_str() + } + + fn transaction(&self) -> Option<&str> { + None + } + + fn url(&self) -> Option { + None + } + + fn user_agent(&self) -> Option<&str> { + None + } + + fn header(&self, header_name: &str) -> Option<&str> { + None + } +} + impl Filterable for Replay { fn csp(&self) -> Option<&Csp> { None diff --git a/relay-server/src/processing/logs/mod.rs b/relay-server/src/processing/logs/mod.rs index 0c67b8cb903..0ed9b81d2c1 100644 --- a/relay-server/src/processing/logs/mod.rs +++ b/relay-server/src/processing/logs/mod.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use relay_event_schema::processor::ProcessingAction; use relay_event_schema::protocol::OurLog; +use relay_filter::FilterStatKey; use relay_pii::PiiConfigError; use relay_quotas::{DataCategory, RateLimits}; @@ -33,6 +34,9 @@ pub enum Error { /// A duplicated item container for logs. #[error("duplicate log container")] DuplicateContainer, + /// Events filtered because of inbound filters + #[error("filtered log")] + Filtered(FilterStatKey), /// Events filtered because of a missing feature flag. #[error("logs feature flag missing")] FilterFeatureFlag, @@ -59,6 +63,7 @@ impl OutcomeError for Error { fn consume(self) -> (Option, Self::Error) { let outcome = match &self { Self::DuplicateContainer => Some(Outcome::Invalid(DiscardReason::DuplicateItem)), + Self::Filtered(filter_stat_key) => Some(Outcome::Filtered(filter_stat_key.clone())), Self::FilterFeatureFlag => None, Self::FilterSampling => None, Self::RateLimited(limits) => { diff --git a/relay-server/src/processing/logs/process.rs b/relay-server/src/processing/logs/process.rs index 3e4bb8e2a8d..c82433cc09e 100644 --- a/relay-server/src/processing/logs/process.rs +++ b/relay-server/src/processing/logs/process.rs @@ -115,6 +115,10 @@ fn expand_log_container(item: &Item, received_at: DateTime) -> Result, meta: &RequestMeta, ctx: Context<'_>) -> Result<()> { + filter(log, meta, ctx).inspect_err(|err| { + relay_log::debug!("failed to filter log: {err}"); + })?; + scrub(log, ctx).inspect_err(|err| { relay_log::debug!("failed to scrub pii from log: {err}"); })?; @@ -126,6 +130,21 @@ fn process_log(log: &mut Annotated, meta: &RequestMeta, ctx: Context<'_> Ok(()) } +fn filter(log: &mut Annotated, meta: &RequestMeta, ctx: Context<'_>) -> Result<()> { + if let Some(log) = log.value() { + if let Err(filter_stat_key) = relay_filter::should_filter( + log, + meta.client_addr(), + &ctx.project_info.config.filter_settings, + ctx.global_config.filters(), + ) { + return Err(Error::Filtered(filter_stat_key)); + } + } + + Ok(()) +} + fn scrub(log: &mut Annotated, ctx: Context<'_>) -> Result<()> { let pii_config = ctx .project_info diff --git a/tests/integration/test_filters.py b/tests/integration/test_filters.py index b36431c00d0..7f0e26bd965 100644 --- a/tests/integration/test_filters.py +++ b/tests/integration/test_filters.py @@ -2,8 +2,12 @@ import json from pathlib import Path from time import sleep +from unittest import mock + +from .asserts import matches import pytest +from google.protobuf.json_format import MessageToDict from sentry_relay.consts import DataCategory from sentry_sdk.envelope import Envelope, Item, PayloadRef @@ -630,3 +634,152 @@ def test_filters_are_applied_to_profiles( profile = json.loads(profile["payload"]) assert profile["release"] == "foobar@1.0" outcomes_consumer.assert_empty() + + +@pytest.mark.parametrize( + ["filter_config", "should_filter"], + [ + pytest.param({}, False, id="log accepted"), + pytest.param( + {"releases": {"releases": ["foobar@1.0"]}}, True, id="log filtered" + ), + ], +) +def test_filters_are_applied_to_logs( + mini_sentry, + relay_with_processing, + outcomes_consumer, + ourlogs_consumer, + filter_config, + should_filter, + # envelope, + # data_category, +): + outcomes_consumer = outcomes_consumer() + ourlogs_consumer = ourlogs_consumer() + + relay = relay_with_processing() + + project_id = 42 + project_config = mini_sentry.add_full_project_config(project_id) + project_config["config"].setdefault("features", []).extend( + ["organizations:ourlogs-ingestion"] + ) + + filter_settings = project_config["config"]["filterSettings"] + for key in filter_config.keys(): + filter_settings[key] = filter_config[key] + + timestamp = datetime.datetime.now(datetime.timezone.utc).replace(microsecond=0) + + payloads = [ + { + "timestamp": timestamp.timestamp(), + "trace_id": "5b8efff798038103d269b633813fc60c", + "span_id": "eee19b7ec3c1b175", + "level": "error", + "body": "foo", + "attributes": { + "sentry.release": {"value": "foobar@1.0", "type": "string"}, + }, + }, + { + "timestamp": timestamp.timestamp(), + "trace_id": "5b8efff798038103d269b633813fc60c", + "span_id": "2cd0a5ed6775452c", + "level": "error", + "body": "bar", + }, + ] + envelope = Envelope() + envelope.add_item( + Item( + type="log", + payload=PayloadRef(json={"items": payloads}), + content_type="application/vnd.sentry.items.log+json", + headers={"item_count": len(payloads)}, + ) + ) + + relay.send_envelope(project_id, envelope) + + if should_filter: + outcomes = outcomes_consumer.get_outcomes() + assert outcomes == [ + { + "category": DataCategory.LOG_ITEM.value, + "org_id": 1, + "project_id": 42, + "key_id": 123, + "outcome": 1, # Filtered + "reason": "release-version", + "quantity": 1, + "timestamp": timestamp.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + }, + { + "category": DataCategory.LOG_BYTE.value, + "key_id": 123, + "org_id": 1, + "outcome": 1, + "project_id": 42, + "quantity": matches(lambda x: x >= 0), + "reason": "release-version", + "timestamp": timestamp.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + }, + { + "category": DataCategory.LOG_ITEM.value, + "key_id": 123, + "org_id": 1, + "outcome": 0, + "project_id": 42, + "quantity": 1, + "timestamp": timestamp.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + }, + { + "category": DataCategory.LOG_BYTE.value, + "key_id": 123, + "org_id": 1, + "outcome": 0, + "project_id": 42, + "quantity": matches(lambda x: x >= 0), + "timestamp": timestamp.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + }, + ] + outcomes_consumer.assert_empty() + + log = ourlogs_consumer.get_ourlog() + log = MessageToDict(log) + assert log["attributes"]["sentry.body"] == {"stringValue": "bar"} + assert log["traceId"] == "5b8efff798038103d269b633813fc60c" + ourlogs_consumer.assert_empty() + else: + outcomes = outcomes_consumer.get_outcomes() + assert outcomes == [ + { + "category": DataCategory.LOG_ITEM.value, + "key_id": 123, + "org_id": 1, + "outcome": 0, + "project_id": 42, + "quantity": 2, + "timestamp": timestamp.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + }, + { + "category": DataCategory.LOG_BYTE.value, + "key_id": 123, + "org_id": 1, + "outcome": 0, + "project_id": 42, + "quantity": matches(lambda x: x >= 0), + "timestamp": timestamp.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + }, + ] + outcomes_consumer.assert_empty() + + logs = ourlogs_consumer.get_ourlogs() + logs = [MessageToDict(log) for log in logs] + assert logs[0]["attributes"]["sentry.body"] == {"stringValue": "foo"} + assert logs[0]["traceId"] == "5b8efff798038103d269b633813fc60c" + assert logs[1]["attributes"]["sentry.body"] == {"stringValue": "bar"} + assert logs[1]["traceId"] == "5b8efff798038103d269b633813fc60c" + ourlogs_consumer.assert_empty() From a1c4524f226193be0dc247acc5416eaecbf9a532 Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Mon, 14 Jul 2025 11:38:57 -0400 Subject: [PATCH 2/3] update changlog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e25b738c834..e205ffc62ec 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ **Features**: - Add mechanism to allow ingestion only from trusted relays. ([#4772](https://github.com/getsentry/relay/pull/4772)) +- Handle inbound filters for logs. (#4938) **Bug Fixes**: From b6dd5f97c98c1bf7077f6ed6e7e34cf53cb82890 Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Mon, 14 Jul 2025 12:05:27 -0400 Subject: [PATCH 3/3] remove println --- relay-event-schema/src/protocol/ourlog.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/relay-event-schema/src/protocol/ourlog.rs b/relay-event-schema/src/protocol/ourlog.rs index 4b092e59641..2a2b29800d1 100644 --- a/relay-event-schema/src/protocol/ourlog.rs +++ b/relay-event-schema/src/protocol/ourlog.rs @@ -42,7 +42,6 @@ pub struct OurLog { impl Getter for OurLog { fn get_value(&self, path: &str) -> Option> { - println!("path {:?}", path); Some(match path.strip_prefix("event.")? { "release" => self .attributes