Skip to content

feat(logs): Handle inbound filters for logs #4938

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
**Features**:

- Add mechanism to allow ingestion only from trusted relays. ([#4772](https://github.com/getsentry/relay/pull/4772))
- Handle inbound filters for logs. ([#4938](https://github.com/getsentry/relay/pull/4938))

**Bug Fixes**:

Expand Down
15 changes: 15 additions & 0 deletions relay-event-schema/src/protocol/ourlog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use relay_protocol::{Annotated, Empty, FromValue, IntoValue, Object, SkipSeriali
use std::collections::BTreeMap;
use std::fmt::{self, Display};

use relay_protocol::{Getter, Val};
use serde::{Deserialize, Serialize, Serializer};

use crate::processor::ProcessValue;
Expand Down Expand Up @@ -39,6 +40,20 @@ pub struct OurLog {
pub other: Object<Value>,
}

impl Getter for OurLog {
fn get_value(&self, path: &str) -> Option<Val<'_>> {
Some(match path.strip_prefix("event.")? {
"release" => self
.attributes
.value()?
.get_value("sentry.release")?
.as_str()?
.into(),
_ => return None,
})
}
}

/// Relay specific metadata embedded into the log item.
///
/// This metadata is purely an internal protocol extension used by Relay,
Expand Down
44 changes: 42 additions & 2 deletions relay-filter/src/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
use url::Url;

use relay_event_schema::protocol::{
Csp, Event, EventType, Exception, LogEntry, Replay, SessionAggregates, SessionUpdate, Span,
Values,
Csp, Event, EventType, Exception, LogEntry, OurLog, Replay, SessionAggregates, SessionUpdate,
Span, Values,
};

/// A data item to which filters can be applied.
Expand Down Expand Up @@ -92,6 +92,46 @@ impl Filterable for Event {
}
}

/// Inbound filters for standalone log items.
///
/// Logs only expose a release, read from the `sentry.release` attribute;
/// all other filterable fields do not exist on logs and resolve to `None`.
impl Filterable for OurLog {
    fn csp(&self) -> Option<&Csp> {
        None
    }

    fn exceptions(&self) -> Option<&Values<Exception>> {
        None
    }

    fn ip_addr(&self) -> Option<&str> {
        None
    }

    fn logentry(&self) -> Option<&LogEntry> {
        None
    }

    fn release(&self) -> Option<&str> {
        self.attributes
            .value()?
            .get_value("sentry.release")?
            .as_str()
    }

    fn transaction(&self) -> Option<&str> {
        None
    }

    fn url(&self) -> Option<Url> {
        None
    }

    fn user_agent(&self) -> Option<&str> {
        None
    }

    // Underscore-prefixed to avoid an `unused_variables` warning: logs carry
    // no request headers, so the name is never inspected.
    fn header(&self, _header_name: &str) -> Option<&str> {
        None
    }
}

impl Filterable for Replay {
fn csp(&self) -> Option<&Csp> {
None
Expand Down
5 changes: 5 additions & 0 deletions relay-server/src/processing/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::sync::Arc;

use relay_event_schema::processor::ProcessingAction;
use relay_event_schema::protocol::OurLog;
use relay_filter::FilterStatKey;
use relay_pii::PiiConfigError;
use relay_quotas::{DataCategory, RateLimits};

Expand Down Expand Up @@ -33,6 +34,9 @@ pub enum Error {
/// A duplicated item container for logs.
#[error("duplicate log container")]
DuplicateContainer,
/// Logs filtered because of inbound filters.
#[error("filtered log")]
Filtered(FilterStatKey),
/// Events filtered because of a missing feature flag.
#[error("logs feature flag missing")]
FilterFeatureFlag,
Expand All @@ -59,6 +63,7 @@ impl OutcomeError for Error {
fn consume(self) -> (Option<Outcome>, Self::Error) {
let outcome = match &self {
Self::DuplicateContainer => Some(Outcome::Invalid(DiscardReason::DuplicateItem)),
Self::Filtered(filter_stat_key) => Some(Outcome::Filtered(filter_stat_key.clone())),
Self::FilterFeatureFlag => None,
Self::FilterSampling => None,
Self::RateLimited(limits) => {
Expand Down
19 changes: 19 additions & 0 deletions relay-server/src/processing/logs/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ fn expand_log_container(item: &Item, received_at: DateTime<Utc>) -> Result<Conta
}

fn process_log(log: &mut Annotated<OurLog>, meta: &RequestMeta, ctx: Context<'_>) -> Result<()> {
filter(log, meta, ctx).inspect_err(|err| {
relay_log::debug!("failed to filter log: {err}");
})?;

scrub(log, ctx).inspect_err(|err| {
relay_log::debug!("failed to scrub pii from log: {err}");
})?;
Expand All @@ -126,6 +130,21 @@ fn process_log(log: &mut Annotated<OurLog>, meta: &RequestMeta, ctx: Context<'_>
Ok(())
}

fn filter(log: &mut Annotated<OurLog>, meta: &RequestMeta, ctx: Context<'_>) -> Result<()> {
if let Some(log) = log.value() {
if let Err(filter_stat_key) = relay_filter::should_filter(
log,
meta.client_addr(),
&ctx.project_info.config.filter_settings,
ctx.global_config.filters(),
) {
return Err(Error::Filtered(filter_stat_key));
}
}

Ok(())
}

fn scrub(log: &mut Annotated<OurLog>, ctx: Context<'_>) -> Result<()> {
let pii_config = ctx
.project_info
Expand Down
153 changes: 153 additions & 0 deletions tests/integration/test_filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@
import json
from pathlib import Path
from time import sleep
from unittest import mock

from .asserts import matches

import pytest
from google.protobuf.json_format import MessageToDict

from sentry_relay.consts import DataCategory
from sentry_sdk.envelope import Envelope, Item, PayloadRef
Expand Down Expand Up @@ -630,3 +634,152 @@ def test_filters_are_applied_to_profiles(
profile = json.loads(profile["payload"])
assert profile["release"] == "[email protected]"
outcomes_consumer.assert_empty()


@pytest.mark.parametrize(
    ["filter_config", "should_filter"],
    [
        pytest.param({}, False, id="log accepted"),
        pytest.param(
            {"releases": {"releases": ["[email protected]"]}}, True, id="log filtered"
        ),
    ],
)
def test_filters_are_applied_to_logs(
    mini_sentry,
    relay_with_processing,
    outcomes_consumer,
    ourlogs_consumer,
    filter_config,
    should_filter,
):
    """Inbound filters are applied to standalone log items.

    Sends two logs in one container -- one carrying a ``sentry.release``
    attribute, one without -- and asserts that a release filter drops only the
    matching log, emitting filtered outcomes for both the log-item and
    log-byte categories, while the other log is ingested normally.
    """
    outcomes_consumer = outcomes_consumer()
    ourlogs_consumer = ourlogs_consumer()

    relay = relay_with_processing()

    project_id = 42
    project_config = mini_sentry.add_full_project_config(project_id)
    project_config["config"].setdefault("features", []).extend(
        ["organizations:ourlogs-ingestion"]
    )

    # Merge the parametrized filter configuration into the project's settings.
    project_config["config"]["filterSettings"].update(filter_config)

    timestamp = datetime.datetime.now(datetime.timezone.utc).replace(microsecond=0)

    payloads = [
        # Matches the release filter (when configured).
        {
            "timestamp": timestamp.timestamp(),
            "trace_id": "5b8efff798038103d269b633813fc60c",
            "span_id": "eee19b7ec3c1b175",
            "level": "error",
            "body": "foo",
            "attributes": {
                "sentry.release": {"value": "[email protected]", "type": "string"},
            },
        },
        # Has no release attribute, so it is never filtered.
        {
            "timestamp": timestamp.timestamp(),
            "trace_id": "5b8efff798038103d269b633813fc60c",
            "span_id": "2cd0a5ed6775452c",
            "level": "error",
            "body": "bar",
        },
    ]
    envelope = Envelope()
    envelope.add_item(
        Item(
            type="log",
            payload=PayloadRef(json={"items": payloads}),
            content_type="application/vnd.sentry.items.log+json",
            headers={"item_count": len(payloads)},
        )
    )

    relay.send_envelope(project_id, envelope)

    if should_filter:
        # One log filtered (item + byte outcomes), one accepted (item + byte).
        outcomes = outcomes_consumer.get_outcomes()
        assert outcomes == [
            {
                "category": DataCategory.LOG_ITEM.value,
                "org_id": 1,
                "project_id": 42,
                "key_id": 123,
                "outcome": 1,  # Filtered
                "reason": "release-version",
                "quantity": 1,
                "timestamp": timestamp.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
            },
            {
                "category": DataCategory.LOG_BYTE.value,
                "key_id": 123,
                "org_id": 1,
                "outcome": 1,
                "project_id": 42,
                "quantity": matches(lambda x: x >= 0),
                "reason": "release-version",
                "timestamp": timestamp.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
            },
            {
                "category": DataCategory.LOG_ITEM.value,
                "key_id": 123,
                "org_id": 1,
                "outcome": 0,
                "project_id": 42,
                "quantity": 1,
                "timestamp": timestamp.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
            },
            {
                "category": DataCategory.LOG_BYTE.value,
                "key_id": 123,
                "org_id": 1,
                "outcome": 0,
                "project_id": 42,
                "quantity": matches(lambda x: x >= 0),
                "timestamp": timestamp.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
            },
        ]
        outcomes_consumer.assert_empty()

        # Only the log without the release attribute reaches the consumer.
        log = ourlogs_consumer.get_ourlog()
        log = MessageToDict(log)
        assert log["attributes"]["sentry.body"] == {"stringValue": "bar"}
        assert log["traceId"] == "5b8efff798038103d269b633813fc60c"
        ourlogs_consumer.assert_empty()
    else:
        # No filter configured: both logs are accepted.
        outcomes = outcomes_consumer.get_outcomes()
        assert outcomes == [
            {
                "category": DataCategory.LOG_ITEM.value,
                "key_id": 123,
                "org_id": 1,
                "outcome": 0,
                "project_id": 42,
                "quantity": 2,
                "timestamp": timestamp.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
            },
            {
                "category": DataCategory.LOG_BYTE.value,
                "key_id": 123,
                "org_id": 1,
                "outcome": 0,
                "project_id": 42,
                "quantity": matches(lambda x: x >= 0),
                "timestamp": timestamp.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
            },
        ]
        outcomes_consumer.assert_empty()

        logs = ourlogs_consumer.get_ourlogs()
        logs = [MessageToDict(log) for log in logs]
        assert logs[0]["attributes"]["sentry.body"] == {"stringValue": "foo"}
        assert logs[0]["traceId"] == "5b8efff798038103d269b633813fc60c"
        assert logs[1]["attributes"]["sentry.body"] == {"stringValue": "bar"}
        assert logs[1]["traceId"] == "5b8efff798038103d269b633813fc60c"
        ourlogs_consumer.assert_empty()