Skip to content
This repository was archived by the owner on Feb 21, 2025. It is now read-only.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
test.conf
test.php
rabbitmq-cli-consumer
8 changes: 6 additions & 2 deletions command/command_executer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,22 @@ func New(errLogger, infLogger *log.Logger) *CommandExecuter {
}
}

func (me CommandExecuter) Execute(cmd *exec.Cmd) int {
func (me CommandExecuter) Execute(cmd *exec.Cmd, body []byte) int {
me.infLogger.Println("Processing message...")
me.infLogger.Printf("Cmd: %s params: %s \n", cmd.Path, cmd.Args)
out, err := cmd.CombinedOutput()

//log output php script to info
me.infLogger.Printf("Output php: %s\n", string(out[:]))

if err != nil {
me.infLogger.Println("Failed. Check error log for details.")
me.errLogger.Printf("Failed: %s\n", string(out[:]))
me.errLogger.Printf("Error: %s\n", err)

if exiterr, ok := err.(*exec.ExitError); ok {
if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
return status.ExitStatus();
return status.ExitStatus()
}
}

Expand Down
14 changes: 13 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package config

import (
"path/filepath"

"gopkg.in/gcfg.v1"
)

Expand Down Expand Up @@ -33,6 +32,19 @@ type Config struct {
Type string
Durable bool
}
Queue struct {
Key string
Name string
}
Deadexchange struct {
Name string
AutoDelete bool
Type string
Durable bool
Queue string
Retry int
RoutingKey string
}
Logs struct {
Error string
Info string
Expand Down
151 changes: 134 additions & 17 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ import (
"net/url"
"os"
"time"
"strconv"

"github.com/ricbra/rabbitmq-cli-consumer/command"
"github.com/ricbra/rabbitmq-cli-consumer/config"
"github.com/andrefigueira/rabbitmq-cli-consumer/command"
"github.com/andrefigueira/rabbitmq-cli-consumer/config"
"github.com/streadway/amqp"
)

Expand All @@ -38,6 +39,8 @@ type Consumer struct {
IncludeMetadata bool
StrictExitCode bool
OnFailure int
DeadLetter bool
Retry int
}

type Properties struct {
Expand Down Expand Up @@ -80,6 +83,17 @@ func (c *Consumer) Consume() {

c.InfLogger.Println("Succeeded registering consumer.")

var sendCh *amqp.Channel

if c.DeadLetter {
var err error
sendCh, err = c.Connection.Channel()
if err != nil {
c.ErrLogger.Println("Could not open channel to republish failed jobs %s", err)
}
defer sendCh.Close()
}

defer c.Connection.Close()
defer c.Channel.Close()

Expand All @@ -95,12 +109,12 @@ func (c *Consumer) Consume() {
input := d.Body

if c.IncludeMetadata {
c.InfLogger.Println("reading deliveries")
input, err = json.Marshal(&struct {
Properties `json:"properties"`
DeliveryInfo `json:"delivery_info"`
Body string `json:"body"`
}{

} {
Properties: Properties{
Headers: d.Headers,
ContentType: d.ContentType,
Expand Down Expand Up @@ -148,15 +162,68 @@ func (c *Consumer) Consume() {
input = b.Bytes()
}

cmd := c.Factory.Create(base64.StdEncoding.EncodeToString(input))
if c.DeadLetter {
var retryCount int

if d.Headers == nil {
d.Headers = make(map[string]interface{}, 0)
}

retry, ok := d.Headers["retry_count"]

if !ok {
retry = "1"
}

c.InfLogger.Println(fmt.Sprintf("retry %s", retry))

retryCount, err = strconv.Atoi(retry.(string))

if err != nil {
c.ErrLogger.Fatal("could not parse retry header")
}

c.InfLogger.Println(fmt.Sprintf("retryCount : %d max retries: %d", retryCount, c.Retry))

cmd := c.Factory.Create(base64.StdEncoding.EncodeToString(input))

if c.Executer.Execute(cmd, d.Body[:]) == 0 {
d.Ack(true)
} else if retryCount >= c.Retry {
d.Nack(true, false)
} else {
retryCount++
d.Headers["retry_count"] = strconv.Itoa(retryCount)

republish := amqp.Publishing{
ContentType: d.ContentType,
ContentEncoding: d.ContentEncoding,
Timestamp: time.Now(),
Body: d.Body,
Headers: d.Headers,
}

c.InfLogger.Println("")

err = sendCh.Publish(d.Exchange, d.RoutingKey, false, false, republish)

if err != nil {
c.ErrLogger.Println("error republish %s", err)
}

d.Ack(true)
}
} else {
cmd := c.Factory.Create(base64.StdEncoding.EncodeToString(input))

exitCode := c.Executer.Execute(cmd)
exitCode := c.Executer.Execute(cmd, d.Body[:])

err := c.ack(d, exitCode)
err := c.ack(d, exitCode)

if err != nil {
c.ErrLogger.Fatalf("Message acknowledgement error: %v", err)
os.Exit(11)
if err != nil {
c.ErrLogger.Fatalf("Message acknowledgement error: %v", err)
os.Exit(11)
}
}
}
}()
Expand Down Expand Up @@ -251,6 +318,44 @@ func New(cfg *config.Config, factory *command.CommandFactory, errLogger, infLogg
cfg.Exchange.Type = "direct"
}

var table map[string]interface{}
deadLetter := false

if "" != cfg.Deadexchange.Name {
infLogger.Printf("Declaring deadletter exchange \"%s\"...", cfg.Deadexchange.Name)
err = ch.ExchangeDeclare(
cfg.Deadexchange.Name,
cfg.Deadexchange.Type,
cfg.Deadexchange.Durable,
cfg.Deadexchange.AutoDelete,
false,
false,
amqp.Table{})

if nil != err {
return nil, errors.New(fmt.Sprintf("Failed to declare exchange: %s", err.Error()))
}

table = make(map[string]interface{}, 0)
table["x-dead-letter-exchange"] = cfg.Deadexchange.Name

if cfg.Deadexchange.RoutingKey != "" {
table["x-dead-letter-routing-key"] = cfg.Deadexchange.RoutingKey
}

infLogger.Printf("Declaring error queue \"%s\"...", cfg.Deadexchange.Queue)
_, err = ch.QueueDeclare(cfg.Deadexchange.Queue, true, false, false, false, amqp.Table{})

// Bind queue
infLogger.Printf("Binding error queue \"%s\" to dead letter exchange \"%s\"...", cfg.Deadexchange.Queue, cfg.Deadexchange.Name)
err = ch.QueueBind(cfg.Deadexchange.Queue, cfg.Deadexchange.RoutingKey, cfg.Deadexchange.Name, false, amqp.Table{})

if nil != err {
return nil, errors.New(fmt.Sprintf("Failed to bind queue to dead-letter exchange: %s", err.Error()))
}
deadLetter = true
}

// Empty Exchange name means default, no need to declare
if "" != cfg.Exchange.Name {
infLogger.Printf("Declaring exchange \"%s\"...", cfg.Exchange.Name)
Expand All @@ -260,12 +365,23 @@ func New(cfg *config.Config, factory *command.CommandFactory, errLogger, infLogg
return nil, errors.New(fmt.Sprintf("Failed to declare exchange: %s", err.Error()))
}

if cfg.QueueSettings.MessageTTL > 0 {
table["x-message-ttl"] = int32(cfg.QueueSettings.MessageTTL)
}

// Bind queue
infLogger.Printf("Binding queue \"%s\" to exchange \"%s\"...", cfg.RabbitMq.Queue, cfg.Exchange.Name)
infLogger.Printf("Declaring queue \"%s\"...with args: %+v", cfg.Queue.Name, table)
_, err = ch.QueueDeclare(cfg.RabbitMq.Queue, true, false, false, false, table)

if nil != err {
return nil, errors.New(fmt.Sprintf("Failed to declare queue: %s", err.Error()))
}


err = ch.QueueBind(cfg.RabbitMq.Queue, transformToStringValue(cfg.QueueSettings.Routingkey), transformToStringValue(cfg.Exchange.Name), false, nil)

if nil != err {
return nil, errors.New(fmt.Sprintf("Failed to bind queue to exchange: %s", err.Error()))
return nil, errors.New(fmt.Sprintf("Failed to bind queue exchange: %s", err.Error()))
}
}

Expand All @@ -279,22 +395,23 @@ func New(cfg *config.Config, factory *command.CommandFactory, errLogger, infLogg
Executer: command.New(errLogger, infLogger),
Compression: cfg.RabbitMq.Compression,
OnFailure: cfg.RabbitMq.Onfailure,
DeadLetter: deadLetter,
Retry: cfg.Deadexchange.Retry,
}, nil
}

func sanitizeQueueArgs(cfg *config.Config) amqp.Table {

args := make(amqp.Table)

if cfg.QueueSettings.MessageTTL > 0 {
args["x-message-ttl"] = int32(cfg.QueueSettings.MessageTTL)
}

if cfg.QueueSettings.DeadLetterExchange != "" {
args["x-dead-letter-exchange"] = transformToStringValue(cfg.QueueSettings.DeadLetterExchange)
if cfg.Deadexchange.Name != "" {
args["x-dead-letter-exchange"] = transformToStringValue(cfg.Deadexchange.Name)

if cfg.QueueSettings.DeadLetterRoutingKey != "" {
args["x-dead-letter-routing-key"] = transformToStringValue(cfg.QueueSettings.DeadLetterRoutingKey)
if cfg.Deadexchange.RoutingKey != "" {
args["x-dead-letter-routing-key"] = transformToStringValue(cfg.Deadexchange.RoutingKey)
}
}

Expand Down
10 changes: 9 additions & 1 deletion example.conf
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ autodelete=Off
type=direct
durable=On

[deadexchange]
name = name
autodelete=Off
type=fanout
durable=true
queue=taskerrors
retry=3

[queuesettings]
routingkey=somekey
messagettl=30000
Expand All @@ -25,4 +33,4 @@ deadLetterroutingkey=someroutingkey

[logs]
error = /tmp/error.log
info = /tmp/info.log
info = /tmp/info.log
8 changes: 4 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
"os"

"github.com/codegangsta/cli"
"github.com/ricbra/rabbitmq-cli-consumer/command"
"github.com/ricbra/rabbitmq-cli-consumer/config"
"github.com/ricbra/rabbitmq-cli-consumer/consumer"
"github.com/andrefigueira/rabbitmq-cli-consumer/command"
"github.com/andrefigueira/rabbitmq-cli-consumer/config"
"github.com/andrefigueira/rabbitmq-cli-consumer/consumer"
)

func main() {
Expand Down Expand Up @@ -73,7 +73,7 @@ func main() {
cfg.RabbitMq.Queue = c.String("queue-name")
}

factory := command.Factory(c.String("executable"))
factory := command.Factory(c.String("executable"))

client, err := consumer.New(cfg, factory, errLogger, infLogger)
if err != nil {
Expand Down