Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 126 additions & 14 deletions datadog/dogstatsd/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,12 @@


# pylint: disable=unused-import
from typing import Any, Optional, List, Text, Type, Union
from typing import Any, Optional, List, Text, Type, Union, Iterable, Callable, overload

try:
from typing import SupportsIndex
except ImportError:
SupportsIndex = int # type: ignore[assignment,misc]
# pylint: enable=unused-import

# Datadog libraries
Expand All @@ -45,9 +50,77 @@
from datadog.util.format import normalize_tags, validate_cardinality
from datadog.version import __version__


if TYPE_CHECKING:
from socket import socket as _Socket


class TagList(List[str]):
"""A list subclass that calls on_change() after any mutation."""

def __init__(self, iterable: Iterable[str] = (), on_change: Optional[Callable[[], None]] = None) -> None:
super(TagList, self).__init__(iterable)
self._on_change = on_change

def _notify(self) -> None:
if self._on_change is not None:
self._on_change()

@overload
def __setitem__(self, index: SupportsIndex, value: str) -> None: ...
@overload
def __setitem__(self, index: slice, value: Iterable[str]) -> None: ...
def __setitem__(self, index: Union[SupportsIndex, slice], value: Union[str, Iterable[str]]) -> None: # type: ignore[override]
super(TagList, self).__setitem__(index, value) # type: ignore[assignment,index]
self._notify()

def __delitem__(self, index: Union[SupportsIndex, slice]) -> None:
super(TagList, self).__delitem__(index)
self._notify()

def __iadd__(self, other: Iterable[str]) -> "TagList": # type: ignore[override,misc]
super(TagList, self).__iadd__(other)
self._notify()
return self

def __imul__(self, n: SupportsIndex) -> "TagList": # type: ignore[misc,override]
super(TagList, self).__imul__(n)
self._notify()
return self

def append(self, value: str) -> None: # type: ignore[override]
super(TagList, self).append(value)
self._notify()

def extend(self, iterable: Iterable[str]) -> None: # type: ignore[override]
super(TagList, self).extend(iterable)
self._notify()

def insert(self, index: int, value: str) -> None: # type: ignore[override]
super(TagList, self).insert(index, value)
self._notify()

def remove(self, value: str) -> None: # type: ignore[override]
super(TagList, self).remove(value)
self._notify()

def pop(self, index: SupportsIndex = -1) -> str: # type: ignore[override]
value = super(TagList, self).pop(index)
self._notify()
return value

def clear(self) -> None:
super(TagList, self).__delitem__(slice(None))
self._notify()

def sort(self, *args: Any, **kwargs: Any) -> None:
super(TagList, self).sort(*args, **kwargs)
self._notify()

def reverse(self) -> None:
super(TagList, self).reverse()
self._notify()

# Logging
log = logging.getLogger("datadog.dogstatsd")

Expand Down Expand Up @@ -442,9 +515,18 @@ def __init__(
value = os.environ.get(var, "")
if value:
env_tags.append("{name}:{value}".format(name=tag_name, value=value))

# This lock is used for all cases where client configuration is being changed: buffering,
# aggregation, sender mode.
self._config_lock = RLock()

if constant_tags is None:
constant_tags = []
self.constant_tags = constant_tags + env_tags

self._constant_tags_str = ""
self._constant_tags = TagList()
self.constant_tags = TagList(constant_tags + env_tags)

if namespace is not None:
namespace = text(namespace)
self.namespace = namespace
Expand Down Expand Up @@ -476,10 +558,6 @@ def __init__(

self._reset_buffer()

# This lock is used for all cases where client configuration is being changed: buffering,
# aggregation, sender mode.
self._config_lock = RLock()

self._disable_buffering = disable_buffering
self._disable_aggregation = disable_aggregation

Expand Down Expand Up @@ -1258,9 +1336,18 @@ def _serialize_metric(
parts.append("|@")
parts.append(text(sample_rate))

if tags:
with self._config_lock:
constant_tags_str = self._constant_tags_str

if tags or constant_tags_str:
parts.append("|#")
parts.append(",".join(normalize_tags(tags)))
if tags:
parts.append(",".join(normalize_tags(tags)))
if constant_tags_str:
parts.append(",")
parts.append(constant_tags_str)
else:
parts.append(constant_tags_str)

if self._container_id:
parts.append("|c:")
Expand Down Expand Up @@ -1314,8 +1401,6 @@ def _report(self, metric, metric_type, value, tags, sample_rate, timestamp=0, sa

validate_cardinality(cardinality)

# Resolve the full tag list
tags = self._add_constant_tags(tags)
payload = self._serialize_metric(
metric, metric_type, value, tags, sample_rate, timestamp, cardinality
)
Expand Down Expand Up @@ -1632,13 +1717,40 @@ def service_check(

self._send(string)

@staticmethod
def _normalize_and_join_tags(tags):
# type: (List[str]) -> str
"""Normalize a tag list and join into a comma-separated string."""
if tags:
return ",".join(normalize_tags(tags))

return ""

def _rebuild_constant_tags_str(self):
# type: () -> None
with self._config_lock:
self._constant_tags_str = self._normalize_and_join_tags(self._constant_tags)

@property
def constant_tags(self):
# type: () -> TagList
return self._constant_tags

@constant_tags.setter
def constant_tags(self, value):
# type: (Union[TagList, List[str]]) -> None
with self._config_lock:
self._constant_tags = TagList(value or [], on_change=self._rebuild_constant_tags_str)
self._rebuild_constant_tags_str()

def _add_constant_tags(self, tags):
# type: (Optional[List[str]]) -> Optional[List[str]]
if self.constant_tags:
if tags:
return tags + self.constant_tags
with self._config_lock:
if self._constant_tags:
if tags:
return tags + self._constant_tags

return self.constant_tags
return list(self._constant_tags)
return tags

def _is_origin_detection_enabled(self, container_id, origin_detection_enabled):
Expand Down
51 changes: 51 additions & 0 deletions tests/unit/dogstatsd/test_statsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,57 @@ def test_gauge_constant_tags_with_metric_level_tags_twice(self):
),
)

def test_constant_tags_cache_invalidated_on_mutation(self):
dogstatsd = DogStatsd(telemetry_min_flush_interval=0, disable_telemetry=True)
dogstatsd.socket = FakeSocket()

dogstatsd.constant_tags = ['original:tag']
dogstatsd.gauge('gauge', 1)
dogstatsd.flush()
self.assertEqual('gauge:1|g|#original:tag\n', dogstatsd.socket.recv())

# append
dogstatsd.constant_tags.append('new:tag')
dogstatsd.gauge('gauge', 2)
dogstatsd.flush()
self.assertEqual('gauge:2|g|#original:tag,new:tag\n', dogstatsd.socket.recv())

# sort
dogstatsd.constant_tags.sort()
dogstatsd.gauge('gauge', 3)
dogstatsd.flush()
self.assertEqual('gauge:3|g|#new:tag,original:tag\n', dogstatsd.socket.recv())

# remove
dogstatsd.constant_tags.remove('new:tag')
dogstatsd.gauge('gauge', 4)
dogstatsd.flush()
self.assertEqual('gauge:4|g|#original:tag\n', dogstatsd.socket.recv())

# __setitem__
dogstatsd.constant_tags[0] = 'replaced:tag'
dogstatsd.gauge('gauge', 5)
dogstatsd.flush()
self.assertEqual('gauge:5|g|#replaced:tag\n', dogstatsd.socket.recv())

# extend
dogstatsd.constant_tags.extend(['a:1', 'b:2'])
dogstatsd.gauge('gauge', 6)
dogstatsd.flush()
self.assertEqual('gauge:6|g|#replaced:tag,a:1,b:2\n', dogstatsd.socket.recv())

# pop
dogstatsd.constant_tags.pop()
dogstatsd.gauge('gauge', 7)
dogstatsd.flush()
self.assertEqual('gauge:7|g|#replaced:tag,a:1\n', dogstatsd.socket.recv())

# clear
dogstatsd.constant_tags.clear()
dogstatsd.gauge('gauge', 8)
dogstatsd.flush()
self.assertEqual('gauge:8|g\n', dogstatsd.socket.recv())

def test_socket_error(self):
self.statsd.socket = BrokenSocket()
with mock.patch("datadog.dogstatsd.base.log") as mock_log:
Expand Down
Loading