@@ -73,22 +73,21 @@ func (a *Address) Value() (string, int) {
73
73
}
74
74
75
75
type CLI struct {
76
- Listen Address `kong:"short='a',help='NetFlow/IPFIX listen address:port',default='0.0.0.0:2055'"`
77
- Reuse bool `kong:"help='Enable SO_REUSEPORT for NetFlow/IPFIX listen port'"`
78
-
79
- Metrics Address `kong:"short='m',help='Metrics listen address',default='0.0.0.0:8080'"`
80
-
81
- ListenZmq string `kong:"short='z',help='proto://IP:Port to listen on for ZMQ connections',default='tcp://*:5556'"`
82
- Topic string `kong:"help='ZMQ Topic',default='flow'"`
83
- SourceId SourceId `kong:"help='NetFlow SourceId (0-255)',default=0"`
84
- Compress bool `kong:"help='Compress ZMQ JSON data',xor='zmq-data'"`
85
- Protobuf bool `kong:"help='Use ProtoBuff instead of JSQN for ZMQ',xor='zmq-data'"`
86
- TLV bool `kong:"help='Use TLV instead of JSQN for ZMQ (needed for ntopng 6.4 and later)',xor='zmq-data'"`
87
-
88
- Workers int `kong:"short='w',help='Number of NetFlow workers',default=2"`
89
- LogLevel string `kong:"short='l',help='Log level [error|warn|info|debug|trace]',default='info',enum='error,warn,info,debug,trace'"`
90
- LogFormat string `kong:"short='f',help='Log format [default|json]',default='default',enum='default,json'"`
91
- Version bool `kong:"short='v',help='Print version and copyright info'"`
76
+ Listen Address `short:"a" help:"NetFlow/IPFIX listen address:port" default:"0.0.0.0:2055"`
77
+ Reuse bool `help:"Enable SO_REUSEPORT for NetFlow/IPFIX listen port"`
78
+
79
+ Metrics Address `short:"m" help:"Metrics listen address" default:"0.0.0.0:8080"`
80
+
81
+ ListenZmq string `short:"z" help:"proto://IP:Port to listen on for ZMQ connections" default:"tcp://*:5556"`
82
+ Topic string `help:"ZMQ Topic" default:"flow"`
83
+ SourceId SourceId `help:"NetFlow SourceId (0-255)" default:"0"`
84
+ Format string `short:"f" help:"Output format [tlv|json|jcompress|proto] for ZMQ." enum:"tlv,json,jcompress,proto" default:"tlv"`
85
+ Workers int `short:"w" help:"Number of NetFlow workers" default:"2"`
86
+
87
+ LogLevel string `short:"l" help:"Log level [error|warn|info|debug|trace]" default:"info" enum:"error,warn,info,debug,trace"`
88
+ LogFormat string `help:"Log format [default|json]" default:"default" enum:"default,json"`
89
+
90
+ Version bool `short:"v" help:"Print version and copyright info"`
92
91
}
93
92
94
93
func LoadMappingYaml () (* protoproducer.ProducerConfig , error ) {
@@ -139,25 +138,33 @@ func main() {
139
138
var msgType localtransport.MsgFormat
140
139
var formatter * format.Format
141
140
142
- if rctx . cli . Protobuf {
143
- msgType = localtransport . PBUF
144
- log . Fatal ( "Protobuf not yet supported with goflow2" )
145
- } else if rctx . cli . TLV {
141
+ compress := false // For now, only compressing JSON.
142
+
143
+ switch rctx . cli . Format {
144
+ case "tlv" :
146
145
msgType = localtransport .TLV
147
146
formatter , err = format .FindFormat ("ntoptlv" )
148
147
log .Info ("Using ntopng TLV format for ZMQ" )
149
- } else {
148
+ case "protobuf" :
149
+ msgType = localtransport .PBUF
150
+ log .Fatal ("Protobuf not yet supported with goflow2" )
151
+ case "jcompress" :
152
+ compress = true
153
+ log .Info ("Using ntopng compressed JSON format for ZMQ" )
154
+ fallthrough
155
+ case "json" :
150
156
msgType = localtransport .JSON
151
157
formatter , err = format .FindFormat ("ntopjson" )
152
158
log .Info ("Using ntopng JSON format for ZMQ" )
159
+ default :
160
+ log .Fatal ("Unknown output format" )
153
161
}
154
162
155
163
if err != nil {
156
- log .Error ("Avail formatters:" , format .GetFormats ())
157
- log .Fatal ("error formatter" , err )
164
+ log .Fatal ("Avail formatters:" , format .GetFormats (), err )
158
165
}
159
166
160
- localtransport .RegisterZmq (rctx .cli .ListenZmq , msgType , int (rctx .cli .SourceId ), rctx . cli . Compress )
167
+ localtransport .RegisterZmq (rctx .cli .ListenZmq , msgType , int (rctx .cli .SourceId ), compress )
161
168
162
169
transporter , err := transport .FindTransport ("zmq" )
163
170
if err != nil {
0 commit comments