Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 44 additions & 77 deletions apisix/plugins/datadog.lua
Original file line number Diff line number Diff line change
Expand Up @@ -142,90 +142,33 @@ local function generate_tag(entry, const_tags)
end


local function send_metric_over_udp(entry, metadata)
local err_msg
local sock = udp()
local host, port = metadata.value.host, metadata.value.port

local ok, err = sock:setpeername(host, port)
if not ok then
return false, "failed to connect to UDP server: host[" .. host
.. "] port[" .. tostring(port) .. "] err: " .. err
end

-- Build all DogStatsD metrics for one entry as a single newline-delimited packet.
local function build_metrics(entry, metadata)
-- Generate prefix & suffix according dogstatsd udp data format.
local suffix = generate_tag(entry, metadata.value.constant_tags)
local prefix = metadata.value.namespace
if prefix ~= "" then
prefix = prefix .. "."
end

-- request counter
ok, err = sock:send(format("%s:%s|%s%s", prefix .. "request.counter", 1, "c", suffix))
if not ok then
err_msg = "error sending request.counter: " .. err
core.log.error("failed to report request count to dogstatsd server: host[" .. host
.. "] port[" .. tostring(port) .. "] err: " .. err)
end

-- request latency histogram
ok, err = sock:send(format("%s:%s|%s%s", prefix .. "request.latency",
entry.latency, "h", suffix))
if not ok then
err_msg = "error sending request.latency: " .. err
core.log.error("failed to report request latency to dogstatsd server: host["
.. host .. "] port[" .. tostring(port) .. "] err: " .. err)
end
local metrics = {
format("%s:%s|%s%s", prefix .. "request.counter", 1, "c", suffix),
format("%s:%s|%s%s", prefix .. "request.latency", entry.latency, "h", suffix),
}

-- upstream latency
if entry.upstream_latency then
ok, err = sock:send(format("%s:%s|%s%s", prefix .. "upstream.latency",
entry.upstream_latency, "h", suffix))
if not ok then
err_msg = "error sending upstream.latency: " .. err
core.log.error("failed to report upstream latency to dogstatsd server: host["
.. host .. "] port[" .. tostring(port) .. "] err: " .. err)
end
end

-- apisix_latency
ok, err = sock:send(format("%s:%s|%s%s", prefix .. "apisix.latency",
entry.apisix_latency, "h", suffix))
if not ok then
err_msg = "error sending apisix.latency: " .. err
core.log.error("failed to report apisix latency to dogstatsd server: host[" .. host
.. "] port[" .. tostring(port) .. "] err: " .. err)
core.table.insert(metrics, format("%s:%s|%s%s", prefix .. "upstream.latency",
entry.upstream_latency, "h", suffix))
end

-- request body size timer
ok, err = sock:send(format("%s:%s|%s%s", prefix .. "ingress.size",
entry.request.size, "ms", suffix))
if not ok then
err_msg = "error sending ingress.size: " .. err
core.log.error("failed to report req body size to dogstatsd server: host[" .. host
.. "] port[" .. tostring(port) .. "] err: " .. err)
end

-- response body size timer
ok, err = sock:send(format("%s:%s|%s%s", prefix .. "egress.size",
entry.response.size, "ms", suffix))
if not ok then
err_msg = "error sending egress.size: " .. err
core.log.error("failed to report response body size to dogstatsd server: host["
.. host .. "] port[" .. tostring(port) .. "] err: " .. err)
end
core.table.insert(metrics, format("%s:%s|%s%s", prefix .. "apisix.latency",
entry.apisix_latency, "h", suffix))
core.table.insert(metrics, format("%s:%s|%s%s", prefix .. "ingress.size",
entry.request.size, "ms", suffix))
core.table.insert(metrics, format("%s:%s|%s%s", prefix .. "egress.size",
entry.response.size, "ms", suffix))

ok, err = sock:close()
if not ok then
core.log.error("failed to close the UDP connection, host[",
host, "] port[", port, "] ", err)
end

if not err_msg then
return true
end

return false, err_msg
return concat(metrics, "\n")
end


Expand All @@ -240,16 +183,40 @@ local function push_metrics(entries)
metadata = {}
metadata.value = defaults
end
core.log.info("sending batch metrics to dogstatsd: ", metadata.value.host,
":", metadata.value.port)

local host, port = metadata.value.host, metadata.value.port
core.log.info("sending batch metrics to dogstatsd: ", host, ":", port)

-- reuse a single socket for the whole batch instead of one per entry
local sock = udp()
local ok, err = sock:setpeername(host, port)
if not ok then
return false, "failed to connect to UDP server: host[" .. host
.. "] port[" .. tostring(port) .. "] err: " .. err
end

local err_msg, first_fail
for i = 1, #entries do
local ok, err = send_metric_over_udp(entries[i], metadata)
if not ok then
return false, err, i
-- coalesce the per-request metrics into one datagram
local send_ok, send_err = sock:send(build_metrics(entries[i], metadata))
if not send_ok then
err_msg = "failed to send metrics to dogstatsd server: host[" .. host
.. "] port[" .. tostring(port) .. "] err: " .. send_err
first_fail = i
break
end
Comment on lines 199 to 207
end

ok, err = sock:close()
if not ok then
core.log.error("failed to close the UDP connection, host[",
host, "] port[", port, "] ", err)
end

if err_msg then
return false, err_msg, first_fail
end

return true
end

Expand Down
6 changes: 5 additions & 1 deletion t/lib/mock_layer4.lua
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ function _M.dogstatsd()
end
return
end
core.log.warn("message received: ", data)

-- a datagram may carry several newline-delimited metrics
for line in data:gmatch("[^\n]+") do
core.log.warn("message received: ", line)
end
end
end

Expand Down
Loading