diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c6d10b7 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +test.conf +test.php +rabbitmq-cli-consumer \ No newline at end of file diff --git a/command/command_executer.go b/command/command_executer.go index 13b1707..6e8549a 100644 --- a/command/command_executer.go +++ b/command/command_executer.go @@ -18,10 +18,14 @@ func New(errLogger, infLogger *log.Logger) *CommandExecuter { } } -func (me CommandExecuter) Execute(cmd *exec.Cmd) int { +func (me CommandExecuter) Execute(cmd *exec.Cmd, body []byte) int { me.infLogger.Println("Processing message...") + me.infLogger.Printf("Cmd: %s params: %s \n", cmd.Path, cmd.Args) out, err := cmd.CombinedOutput() + //log output php script to info + me.infLogger.Printf("Output php: %s\n", string(out[:])) + if err != nil { me.infLogger.Println("Failed. Check error log for details.") me.errLogger.Printf("Failed: %s\n", string(out[:])) @@ -29,7 +33,7 @@ func (me CommandExecuter) Execute(cmd *exec.Cmd) int { if exiterr, ok := err.(*exec.ExitError); ok { if status, ok := exiterr.Sys().(syscall.WaitStatus); ok { - return status.ExitStatus(); + return status.ExitStatus() } } diff --git a/config/config.go b/config/config.go index 798b7c6..b86b757 100644 --- a/config/config.go +++ b/config/config.go @@ -2,7 +2,6 @@ package config import ( "path/filepath" - "gopkg.in/gcfg.v1" ) @@ -33,6 +32,19 @@ type Config struct { Type string Durable bool } + Queue struct { + Key string + Name string + } + Deadexchange struct { + Name string + AutoDelete bool + Type string + Durable bool + Queue string + Retry int + RoutingKey string + } Logs struct { Error string Info string diff --git a/consumer/consumer.go b/consumer/consumer.go index addfe35..6108e11 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -11,9 +11,10 @@ import ( "net/url" "os" "time" + "strconv" - "github.com/ricbra/rabbitmq-cli-consumer/command" - "github.com/ricbra/rabbitmq-cli-consumer/config" + "github.com/andrefigueira/rabbitmq-cli-consumer/command" + "github.com/andrefigueira/rabbitmq-cli-consumer/config" "github.com/streadway/amqp" ) @@ -38,6 +39,8 @@ type Consumer struct { IncludeMetadata bool StrictExitCode bool OnFailure int + DeadLetter bool + Retry int } type Properties struct { @@ -80,6 +83,17 @@ func (c *Consumer) Consume() { c.InfLogger.Println("Succeeded registering consumer.") + var sendCh *amqp.Channel + + if c.DeadLetter { + var err error + sendCh, err = c.Connection.Channel() + if err != nil { + c.ErrLogger.Println("Could not open channel to republish failed jobs %s", err) + } + defer sendCh.Close() + } + defer c.Connection.Close() defer c.Channel.Close() @@ -95,12 +109,12 @@ func (c *Consumer) Consume() { input := d.Body if c.IncludeMetadata { + c.InfLogger.Println("reading deliveries") input, err = json.Marshal(&struct { Properties `json:"properties"` DeliveryInfo `json:"delivery_info"` Body string `json:"body"` - }{ - + } { Properties: Properties{ Headers: d.Headers, ContentType: d.ContentType, @@ -148,15 +162,68 @@ func (c *Consumer) Consume() { input = b.Bytes() } - cmd := c.Factory.Create(base64.StdEncoding.EncodeToString(input)) + if c.DeadLetter { + var retryCount int + + if d.Headers == nil { + d.Headers = make(map[string]interface{}, 0) + } + + retry, ok := d.Headers["retry_count"] + + if !ok { + retry = "1" + } + + c.InfLogger.Println(fmt.Sprintf("retry %s", retry)) + + retryCount, err = strconv.Atoi(retry.(string)) + + if err != nil { + c.ErrLogger.Fatal("could not parse retry header") + } + + c.InfLogger.Println(fmt.Sprintf("retryCount : %d max retries: %d", retryCount, c.Retry)) + + cmd := c.Factory.Create(base64.StdEncoding.EncodeToString(input)) + + if c.Executer.Execute(cmd, d.Body[:]) == 0 { + d.Ack(true) + } else if retryCount >= c.Retry { + d.Nack(true, false) + } else { + retryCount++ + d.Headers["retry_count"] = strconv.Itoa(retryCount) + + republish := amqp.Publishing{ + ContentType: d.ContentType, + ContentEncoding: d.ContentEncoding, + Timestamp: time.Now(), + Body: d.Body, + Headers: d.Headers, + } + + c.InfLogger.Println("") + + err = sendCh.Publish(d.Exchange, d.RoutingKey, false, false, republish) + + if err != nil { + c.ErrLogger.Println("error republish %s", err) + } + + d.Ack(true) + } + } else { + cmd := c.Factory.Create(base64.StdEncoding.EncodeToString(input)) - exitCode := c.Executer.Execute(cmd) + exitCode := c.Executer.Execute(cmd, d.Body[:]) - err := c.ack(d, exitCode) + err := c.ack(d, exitCode) - if err != nil { - c.ErrLogger.Fatalf("Message acknowledgement error: %v", err) - os.Exit(11) + if err != nil { + c.ErrLogger.Fatalf("Message acknowledgement error: %v", err) + os.Exit(11) + } } } }() @@ -251,6 +318,44 @@ func New(cfg *config.Config, factory *command.CommandFactory, errLogger, infLogg cfg.Exchange.Type = "direct" } + var table map[string]interface{} + deadLetter := false + + if "" != cfg.Deadexchange.Name { + infLogger.Printf("Declaring deadletter exchange \"%s\"...", cfg.Deadexchange.Name) + err = ch.ExchangeDeclare( + cfg.Deadexchange.Name, + cfg.Deadexchange.Type, + cfg.Deadexchange.Durable, + cfg.Deadexchange.AutoDelete, + false, + false, + amqp.Table{}) + + if nil != err { + return nil, errors.New(fmt.Sprintf("Failed to declare exchange: %s", err.Error())) + } + + table = make(map[string]interface{}, 0) + table["x-dead-letter-exchange"] = cfg.Deadexchange.Name + + if cfg.Deadexchange.RoutingKey != "" { + table["x-dead-letter-routing-key"] = cfg.Deadexchange.RoutingKey + } + + infLogger.Printf("Declaring error queue \"%s\"...", cfg.Deadexchange.Queue) + _, err = ch.QueueDeclare(cfg.Deadexchange.Queue, true, false, false, false, amqp.Table{}) + + // Bind queue + infLogger.Printf("Binding error queue \"%s\" to dead letter exchange \"%s\"...", cfg.Deadexchange.Queue, cfg.Deadexchange.Name) + err = ch.QueueBind(cfg.Deadexchange.Queue, cfg.Deadexchange.RoutingKey, cfg.Deadexchange.Name, false, amqp.Table{}) + + if nil != err { + return nil, errors.New(fmt.Sprintf("Failed to bind queue to dead-letter exchange: %s", err.Error())) + } + deadLetter = true + } + // Empty Exchange name means default, no need to declare if "" != cfg.Exchange.Name { infLogger.Printf("Declaring exchange \"%s\"...", cfg.Exchange.Name) @@ -260,12 +365,23 @@ func New(cfg *config.Config, factory *command.CommandFactory, errLogger, infLogg return nil, errors.New(fmt.Sprintf("Failed to declare exchange: %s", err.Error())) } + if cfg.QueueSettings.MessageTTL > 0 { + table["x-message-ttl"] = int32(cfg.QueueSettings.MessageTTL) + } + // Bind queue - infLogger.Printf("Binding queue \"%s\" to exchange \"%s\"...", cfg.RabbitMq.Queue, cfg.Exchange.Name) + infLogger.Printf("Declaring queue \"%s\"...with args: %+v", cfg.Queue.Name, table) + _, err = ch.QueueDeclare(cfg.RabbitMq.Queue, true, false, false, false, table) + + if nil != err { + return nil, errors.New(fmt.Sprintf("Failed to declare queue: %s", err.Error())) + } + + err = ch.QueueBind(cfg.RabbitMq.Queue, transformToStringValue(cfg.QueueSettings.Routingkey), transformToStringValue(cfg.Exchange.Name), false, nil) if nil != err { - return nil, errors.New(fmt.Sprintf("Failed to bind queue to exchange: %s", err.Error())) + return nil, errors.New(fmt.Sprintf("Failed to bind queue exchange: %s", err.Error())) } } @@ -279,22 +395,23 @@ func New(cfg *config.Config, factory *command.CommandFactory, errLogger, infLogg Executer: command.New(errLogger, infLogger), Compression: cfg.RabbitMq.Compression, OnFailure: cfg.RabbitMq.Onfailure, + DeadLetter: deadLetter, + Retry: cfg.Deadexchange.Retry, }, nil } func sanitizeQueueArgs(cfg *config.Config) amqp.Table { - args := make(amqp.Table) if cfg.QueueSettings.MessageTTL > 0 { args["x-message-ttl"] = int32(cfg.QueueSettings.MessageTTL) } - if cfg.QueueSettings.DeadLetterExchange != "" { - args["x-dead-letter-exchange"] = transformToStringValue(cfg.QueueSettings.DeadLetterExchange) + if cfg.Deadexchange.Name != "" { + args["x-dead-letter-exchange"] = transformToStringValue(cfg.Deadexchange.Name) - if cfg.QueueSettings.DeadLetterRoutingKey != "" { - args["x-dead-letter-routing-key"] = transformToStringValue(cfg.QueueSettings.DeadLetterRoutingKey) + if cfg.Deadexchange.RoutingKey != "" { + args["x-dead-letter-routing-key"] = transformToStringValue(cfg.Deadexchange.RoutingKey) } } diff --git a/example.conf b/example.conf index ffafb61..2b6472b 100644 --- a/example.conf +++ b/example.conf @@ -17,6 +17,14 @@ autodelete=Off type=direct durable=On +[deadexchange] +name = name +autodelete=Off +type=fanout +durable=true +queue=taskerrors +retry=3 + [queuesettings] routingkey=somekey messagettl=30000 @@ -25,4 +33,4 @@ deadLetterroutingkey=someroutingkey [logs] error = /tmp/error.log -info = /tmp/info.log +info = /tmp/info.log \ No newline at end of file diff --git a/main.go b/main.go index 90ef42a..448de95 100644 --- a/main.go +++ b/main.go @@ -6,9 +6,9 @@ import ( "os" "github.com/codegangsta/cli" - "github.com/ricbra/rabbitmq-cli-consumer/command" - "github.com/ricbra/rabbitmq-cli-consumer/config" - "github.com/ricbra/rabbitmq-cli-consumer/consumer" + "github.com/andrefigueira/rabbitmq-cli-consumer/command" + "github.com/andrefigueira/rabbitmq-cli-consumer/config" + "github.com/andrefigueira/rabbitmq-cli-consumer/consumer" ) func main() { @@ -73,7 +73,7 @@ func main() { cfg.RabbitMq.Queue = c.String("queue-name") } - factory := command.Factory(c.String("executable")) + factory := command.Factory(c.String("executable")) client, err := consumer.New(cfg, factory, errLogger, infLogger) if err != nil {