package mq
import (
"context"
"fmt"
"github.com/furdarius/rabbitroutine"
amqp "github.com/rabbitmq/amqp091-go"
"testing"
"time"
)
func TestProductDelayMessage(t *testing.T) {
ctx := context.Background()
url := "amqp://admin:admin@127.0.0.1:5672/test"
routingKey := "test.delay.routingKey"
exchange := "test.delay.exchange"
queue := "test.delay.queue"
conn := rabbitroutine.NewConnector(rabbitroutine.Config{
// Max reconnect attempts
//ReconnectAttempts: 20,
// How long wait between reconnect
Wait: 1 * time.Second,
})
pool := rabbitroutine.NewPool(conn)
ensurePub := rabbitroutine.NewEnsurePublisher(pool)
pub := rabbitroutine.NewRetryPublisher(
ensurePub,
rabbitroutine.PublishMaxAttemptsSetup(3),
rabbitroutine.PublishDelaySetup(rabbitroutine.LinearDelay(10*time.Millisecond)),
)
go func() {
err := conn.Dial(ctx, url)
if err != nil {
panic(fmt.Sprintf("failed to establish RabbitMQ connection: %v", err))
}
}()
ch, err := conn.Channel(ctx)
if err != nil {
panic(fmt.Sprintf("failed to create channel: %v", err))
}
err = ch.ExchangeDeclare(exchange,
"x-delayed-message",
true,
false,
false,
false,
amqp.Table{
"x-delayed-type": "direct",
})
if err != nil {
panic(fmt.Sprintf("failed to declare exchange: %v", err))
}
_, err = ch.QueueDeclare(queue,
true,
false,
false,
false,
nil)
if err != nil {
panic(fmt.Sprintf("failed to declare queue: %v", err))
}
err = ch.QueueBind(queue,
routingKey,
exchange,
false, nil)
if err != nil {
panic(fmt.Sprintf("failed to bind queue: %v", err))
}
msg := amqp.Publishing{
Body: []byte("test"),
DeliveryMode: 2,
Headers: amqp.Table{
"x-delay": 5000, // 设置消息的延时时间
},
}
err = pub.Publish(ctx, exchange, routingKey, msg)
fmt.Println("result:", err)
}
when i use plugin: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange, publish return error :queue not bound
here is my code:
Reproduction steps
Expected behavior
the delay message publish success, but Publish func return error