|
1 | 1 | (ns system.components.riemann-client
|
2 | 2 | (:require [com.stuartsierra.component :as component]
|
3 |
| - [riemann.client :as r])) |
| 3 | + [riemann.client :as r] |
| 4 | + [clojure.tools.logging :as log]) |
| 5 | + (:import [io.riemann.riemann.client OverloadedException])) |
4 | 6 |
|
5 | 7 | (defrecord RiemannClient [host port transport]
|
6 | 8 | component/Lifecycle
|
7 | 9 | (start [component]
|
8 | 10 | (let [client (case transport
|
9 | 11 | :tcp (r/tcp-client {:host host :port port})
|
10 |
| - :udp (r/udp-client {:host host :port port}))] |
11 |
| - (assoc component :client client))) |
| 12 | + :udp (r/udp-client {:host host :port port})) |
| 13 | + a (-> (agent {}) |
| 14 | + (add-watch :key (fn [_k _r _os ns] (try (deref (:promise ns) 5000 ::timeout) |
| 15 | + (catch Exception e (log/error (.getMessage e))))))) |
| 16 | + f (fn [state client event] |
| 17 | + (let [v (try |
| 18 | + (r/send-event client event) |
| 19 | + (catch OverloadedException e (log/error (.getMessage e))))] |
| 20 | + (assoc state :promise v)))] |
| 21 | + (assoc component :client client :send-fn (partial send a f client)))) |
12 | 22 | (stop [component]
|
13 | 23 | (if-let [client (:client component)]
|
14 | 24 | (assoc component :client (r/close! client))
|
15 | 25 | component)))
|
16 | 26 |
|
17 |
| -(defn new-riemann-client [& {:keys [host port transport] :or {host "127.0.0.1" port 5555 transport :tcp}}] |
| 27 | +(defn new-riemann-client |
| 28 | + "Returns a Riemann client. |
| 29 | +
|
| 30 | + `send-fn` is a function that accepts a Riemann struct, which it will |
| 31 | + send in an agent threadpool (asynchronously). The promise that the |
| 32 | + Riemann `send-event` returns will be derefed in a watcher (also on |
| 33 | + the threadpool), and will log errors if any occur." |
| 34 | + [& {:keys [host port transport] :or {host "127.0.0.1" port 5555 transport :tcp}}] |
18 | 35 | (map->RiemannClient {:host host :port port :transport transport}))
|
0 commit comments