From 6c201becea9b23c4074134c09d1add44a5cacb55 Mon Sep 17 00:00:00 2001 From: INADA Naoki Date: Thu, 12 Jun 2014 03:34:10 +0900 Subject: [PATCH 1/6] Fix race between flush and Emit. --- plugins/out_forward.go | 70 ++++++++++++++++++++++-------------------- 1 file changed, 37 insertions(+), 33 deletions(-) diff --git a/plugins/out_forward.go b/plugins/out_forward.go index 606ba56..252ad40 100644 --- a/plugins/out_forward.go +++ b/plugins/out_forward.go @@ -1,28 +1,31 @@ package plugins import ( - "github.com/moriyoshi/ik" "bytes" - "github.com/ugorji/go/codec" "log" "net" "reflect" "strconv" "time" + + "github.com/moriyoshi/ik" + "github.com/ugorji/go/codec" ) type ForwardOutput struct { - factory *ForwardOutputFactory - logger *log.Logger - codec *codec.MsgpackHandle - bind string - enc *codec.Encoder - conn net.Conn - buffer bytes.Buffer + factory *ForwardOutputFactory + logger *log.Logger + codec *codec.MsgpackHandle + bind string + enc *codec.Encoder + conn net.Conn + buffer bytes.Buffer + emitCh chan []ik.FluentRecordSet + flush_interval int } func (output *ForwardOutput) encodeEntry(tag string, record ik.TinyFluentRecord) error { - v := []interface{} { tag, record.Timestamp, record.Data } + v := []interface{}{tag, record.Timestamp, record.Data} if output.enc == nil { output.enc = codec.NewEncoder(&output.buffer, output.codec) } @@ -34,7 +37,7 @@ func (output *ForwardOutput) encodeEntry(tag string, record ik.TinyFluentRecord) } func (output *ForwardOutput) encodeRecordSet(recordSet ik.FluentRecordSet) error { - v := []interface{} { recordSet.Tag, recordSet.Records } + v := []interface{}{recordSet.Tag, recordSet.Records} if output.enc == nil { output.enc = codec.NewEncoder(&output.buffer, output.codec) } @@ -69,19 +72,12 @@ func (output *ForwardOutput) flush() error { return nil } -func (output *ForwardOutput) run_flush(flush_interval int) { - ticker := time.NewTicker(time.Duration(flush_interval) * time.Second) - go func() { - for { - select { - case <-ticker.C: - output.flush() - } - } - }() +func (output *ForwardOutput) Emit(recordSet []ik.FluentRecordSet) error { + output.emitCh <- recordSet + return nil } -func (output *ForwardOutput) Emit(recordSet []ik.FluentRecordSet) error { +func (output *ForwardOutput) emit(recordSet []ik.FluentRecordSet) error { for _, recordSet := range recordSet { err := output.encodeRecordSet(recordSet) if err != nil { @@ -97,8 +93,17 @@ func (output *ForwardOutput) Factory() ik.Plugin { } func (output *ForwardOutput) Run() error { - time.Sleep(1000000000) - return ik.Continue + ticker := time.NewTicker(time.Duration(output.flush_interval) * time.Second) + for { + select { + case rs := <-output.emitCh: + if err := output.emit(rs); err != nil { + output.logger.Printf("%#v", err) + } + case <-ticker.C: + output.flush() + } + } } func (output *ForwardOutput) Shutdown() error { @@ -108,17 +113,18 @@ func (output *ForwardOutput) Shutdown() error { type ForwardOutputFactory struct { } -func newForwardOutput(factory *ForwardOutputFactory, logger *log.Logger, bind string) (*ForwardOutput, error) { +func newForwardOutput(factory *ForwardOutputFactory, logger *log.Logger, bind string, flush_interval int) *ForwardOutput { _codec := codec.MsgpackHandle{} _codec.MapType = reflect.TypeOf(map[string]interface{}(nil)) _codec.RawToString = false _codec.StructToArray = true return &ForwardOutput{ - factory: factory, - logger: logger, - codec: &_codec, - bind: bind, - }, nil + factory: factory, + logger: logger, + codec: &_codec, + bind: bind, + flush_interval: flush_interval, + } } func (factory *ForwardOutputFactory) Name() string { @@ -144,9 +150,7 @@ func (factory *ForwardOutputFactory) New(engine ik.Engine, config *ik.ConfigElem return nil, err } bind := host + ":" + netPort - output, err := newForwardOutput(factory, engine.Logger(), bind) - output.run_flush(flush_interval) - return output, err + return newForwardOutput(factory, engine.Logger(), bind, flush_interval), nil } func (factory *ForwardOutputFactory) BindScorekeeper(scorekeeper *ik.Scorekeeper) { From 6f193a7917ad696601dd6bfb31915c82cfd80aab Mon Sep 17 00:00:00 2001 From: INADA Naoki Date: Thu, 12 Jun 2014 12:27:10 +0900 Subject: [PATCH 2/6] Async flushing and safe shutdown. --- plugins/out_forward.go | 82 +++++++++++++++++++++++------------------- 1 file changed, 46 insertions(+), 36 deletions(-) diff --git a/plugins/out_forward.go b/plugins/out_forward.go index 252ad40..e6dc4cf 100644 --- a/plugins/out_forward.go +++ b/plugins/out_forward.go @@ -6,6 +6,7 @@ import ( "net" "reflect" "strconv" + "sync" "time" "github.com/moriyoshi/ik" @@ -13,15 +14,16 @@ import ( ) type ForwardOutput struct { - factory *ForwardOutputFactory - logger *log.Logger - codec *codec.MsgpackHandle - bind string - enc *codec.Encoder - conn net.Conn - buffer bytes.Buffer - emitCh chan []ik.FluentRecordSet - flush_interval int + factory *ForwardOutputFactory + logger *log.Logger + codec *codec.MsgpackHandle + bind string + enc *codec.Encoder + buffer bytes.Buffer + emitCh chan []ik.FluentRecordSet + shutdown chan (chan error) + flushInterval int + flushWg sync.WaitGroup } func (output *ForwardOutput) encodeEntry(tag string, record ik.TinyFluentRecord) error { @@ -49,26 +51,23 @@ func (output *ForwardOutput) encodeRecordSet(recordSet ik.FluentRecordSet) error } func (output *ForwardOutput) flush() error { - if output.conn == nil { + buffer := output.buffer + output.buffer = bytes.Buffer{} + + output.flushWg.Add(1) + go func() { + defer output.flushWg.Done() conn, err := net.Dial("tcp", output.bind) if err != nil { - output.logger.Printf("%#v", err.Error()) - return err - } else { - output.conn = conn + output.logger.Printf("%#v", err) + return } - } - n, err := output.buffer.WriteTo(output.conn) - if err != nil { - output.logger.Printf("Write failed. size: %d, buf size: %d, error: %#v", n, output.buffer.Len(), err.Error()) - output.conn = nil - return err - } - if n > 0 { - output.logger.Printf("Forwarded: %d bytes (left: %d bytes)\n", n, output.buffer.Len()) - } - output.conn.Close() - output.conn = nil + defer conn.Close() + + if n, err := buffer.WriteTo(conn); err != nil { + output.logger.Printf("Write failed. size: %d, buf size: %d, error: %#v", n, output.buffer.Len(), err.Error()) + } + }() return nil } @@ -93,7 +92,7 @@ func (output *ForwardOutput) Factory() ik.Plugin { } func (output *ForwardOutput) Run() error { - ticker := time.NewTicker(time.Duration(output.flush_interval) * time.Second) + ticker := time.NewTicker(time.Duration(output.flushInterval) * time.Second) for { select { case rs := <-output.emitCh: @@ -102,28 +101,39 @@ func (output *ForwardOutput) Run() error { } case <-ticker.C: output.flush() + case finish := <-output.shutdown: + close(output.emitCh) + output.flush() + output.flushWg.Wait() + finish <- nil + return nil } } } func (output *ForwardOutput) Shutdown() error { - return nil + finish := make(chan error) + output.shutdown <- finish + return <-finish } type ForwardOutputFactory struct { } -func newForwardOutput(factory *ForwardOutputFactory, logger *log.Logger, bind string, flush_interval int) *ForwardOutput { +func newForwardOutput(factory *ForwardOutputFactory, logger *log.Logger, bind string, flushInterval int) *ForwardOutput { _codec := codec.MsgpackHandle{} _codec.MapType = reflect.TypeOf(map[string]interface{}(nil)) _codec.RawToString = false _codec.StructToArray = true return &ForwardOutput{ - factory: factory, - logger: logger, - codec: &_codec, - bind: bind, - flush_interval: flush_interval, + factory: factory, + logger: logger, + codec: &_codec, + bind: bind, + emitCh: make(chan []ik.FluentRecordSet), + shutdown: make(chan chan error), + flushInterval: flushInterval, + flushWg: sync.WaitGroup{}, } } @@ -144,13 +154,13 @@ func (factory *ForwardOutputFactory) New(engine ik.Engine, config *ik.ConfigElem if !ok { flush_interval_str = "60" } - flush_interval, err := strconv.Atoi(flush_interval_str) + flushInterval, err := strconv.Atoi(flush_interval_str) if err != nil { engine.Logger().Print(err.Error()) return nil, err } bind := host + ":" + netPort - return newForwardOutput(factory, engine.Logger(), bind, flush_interval), nil + return newForwardOutput(factory, engine.Logger(), bind, flushInterval), nil } func (factory *ForwardOutputFactory) BindScorekeeper(scorekeeper *ik.Scorekeeper) { From a08486b4ea91640051234a627a2c81bfe4c13b92 Mon Sep 17 00:00:00 2001 From: INADA Naoki Date: Thu, 12 Jun 2014 22:11:09 +0900 Subject: [PATCH 3/6] Run() should not be an infinite loop. --- plugins/out_forward.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/plugins/out_forward.go b/plugins/out_forward.go index e6dc4cf..12dbb4e 100644 --- a/plugins/out_forward.go +++ b/plugins/out_forward.go @@ -55,7 +55,7 @@ func (output *ForwardOutput) flush() error { output.buffer = bytes.Buffer{} output.flushWg.Add(1) - go func() { + go func() { // TODO: static goroutine for flushing. defer output.flushWg.Done() conn, err := net.Dial("tcp", output.bind) if err != nil { @@ -92,6 +92,11 @@ func (output *ForwardOutput) Factory() ik.Plugin { } func (output *ForwardOutput) Run() error { + time.Sleep(time.Second) + return ik.Continue +} + +func (output *ForwardOutput) mainLoop() { ticker := time.NewTicker(time.Duration(output.flushInterval) * time.Second) for { select { @@ -106,7 +111,7 @@ func (output *ForwardOutput) Run() error { output.flush() output.flushWg.Wait() finish <- nil - return nil + return } } } @@ -160,7 +165,9 @@ func (factory *ForwardOutputFactory) New(engine ik.Engine, config *ik.ConfigElem return nil, err } bind := host + ":" + netPort - return newForwardOutput(factory, engine.Logger(), bind, flushInterval), nil + output := newForwardOutput(factory, engine.Logger(), bind, flushInterval) + go output.mainLoop() + return output, nil } func (factory *ForwardOutputFactory) BindScorekeeper(scorekeeper *ik.Scorekeeper) { From dad4afb8dde6b12460e4f8cc01070df9bc950122 Mon Sep 17 00:00:00 2001 From: INADA Naoki Date: Thu, 12 Jun 2014 22:50:42 +0900 Subject: [PATCH 4/6] Fix bug --- plugins/out_forward.go | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/plugins/out_forward.go b/plugins/out_forward.go index 12dbb4e..30a60bc 100644 --- a/plugins/out_forward.go +++ b/plugins/out_forward.go @@ -2,6 +2,7 @@ package plugins import ( "bytes" + "fmt" "log" "net" "reflect" @@ -19,7 +20,7 @@ type ForwardOutput struct { codec *codec.MsgpackHandle bind string enc *codec.Encoder - buffer bytes.Buffer + buffer *bytes.Buffer emitCh chan []ik.FluentRecordSet shutdown chan (chan error) flushInterval int @@ -29,30 +30,23 @@ type ForwardOutput struct { func (output *ForwardOutput) encodeEntry(tag string, record ik.TinyFluentRecord) error { v := []interface{}{tag, record.Timestamp, record.Data} if output.enc == nil { - output.enc = codec.NewEncoder(&output.buffer, output.codec) + output.enc = codec.NewEncoder(output.buffer, output.codec) } - err := output.enc.Encode(v) - if err != nil { - return err - } - return err + return output.enc.Encode(v) } func (output *ForwardOutput) encodeRecordSet(recordSet ik.FluentRecordSet) error { v := []interface{}{recordSet.Tag, recordSet.Records} if output.enc == nil { - output.enc = codec.NewEncoder(&output.buffer, output.codec) - } - err := output.enc.Encode(v) - if err != nil { - return err + output.enc = codec.NewEncoder(output.buffer, output.codec) } - return err + return output.enc.Encode(v) } func (output *ForwardOutput) flush() error { buffer := output.buffer - output.buffer = bytes.Buffer{} + output.buffer = &bytes.Buffer{} + output.enc = nil output.flushWg.Add(1) go func() { // TODO: static goroutine for flushing. @@ -65,7 +59,7 @@ func (output *ForwardOutput) flush() error { defer conn.Close() if n, err := buffer.WriteTo(conn); err != nil { - output.logger.Printf("Write failed. size: %d, buf size: %d, error: %#v", n, output.buffer.Len(), err.Error()) + output.logger.Printf("Write failed. size: %d, buf size: %d, error: %#v", n, buffer.Len(), err) } }() return nil @@ -93,6 +87,7 @@ func (output *ForwardOutput) Factory() ik.Plugin { func (output *ForwardOutput) Run() error { time.Sleep(time.Second) + // TODO: Should return something when finished? return ik.Continue } @@ -101,6 +96,7 @@ func (output *ForwardOutput) mainLoop() { for { select { case rs := <-output.emitCh: + fmt.Println("output: ", rs) if err := output.emit(rs); err != nil { output.logger.Printf("%#v", err) } @@ -135,6 +131,7 @@ func newForwardOutput(factory *ForwardOutputFactory, logger *log.Logger, bind st logger: logger, codec: &_codec, bind: bind, + buffer: &bytes.Buffer{}, emitCh: make(chan []ik.FluentRecordSet), shutdown: make(chan chan error), flushInterval: flushInterval, From c2d1615738771902096ff6128505974ecba00d31 Mon Sep 17 00:00:00 2001 From: INADA Naoki Date: Thu, 12 Jun 2014 22:54:17 +0900 Subject: [PATCH 5/6] Fix naming convension --- plugins/out_forward.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/plugins/out_forward.go b/plugins/out_forward.go index 30a60bc..98dabe3 100644 --- a/plugins/out_forward.go +++ b/plugins/out_forward.go @@ -152,11 +152,11 @@ func (factory *ForwardOutputFactory) New(engine ik.Engine, config *ik.ConfigElem if !ok { netPort = "24224" } - flush_interval_str, ok := config.Attrs["flush_interval"] + flushIntervalStr, ok := config.Attrs["flush_interval"] if !ok { - flush_interval_str = "60" + flushIntervalStr = "60" } - flushInterval, err := strconv.Atoi(flush_interval_str) + flushInterval, err := strconv.Atoi(flushIntervalStr) if err != nil { engine.Logger().Print(err.Error()) return nil, err From d2c05b19bb49e387bf74c516bf6080131a151c13 Mon Sep 17 00:00:00 2001 From: INADA Naoki Date: Fri, 13 Jun 2014 08:03:51 +0900 Subject: [PATCH 6/6] Skip flush() when buffer is empty. --- plugins/out_forward.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/plugins/out_forward.go b/plugins/out_forward.go index 98dabe3..849cb10 100644 --- a/plugins/out_forward.go +++ b/plugins/out_forward.go @@ -44,6 +44,9 @@ func (output *ForwardOutput) encodeRecordSet(recordSet ik.FluentRecordSet) error } func (output *ForwardOutput) flush() error { + if output.buffer.Len() == 0 { + return nil + } buffer := output.buffer output.buffer = &bytes.Buffer{} output.enc = nil