Skip to content
This repository was archived by the owner on Nov 9, 2024. It is now read-only.

Commit 7e128c8

Browse files
authored
Merge 7238389 into 998e374
2 parents 998e374 + 7238389 commit 7e128c8

File tree

2 files changed

+188
-13
lines changed

2 files changed

+188
-13
lines changed

pkg/notify/amqp.go

Lines changed: 187 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,34 +2,139 @@ package notify
22

33
import (
44
"encoding/json"
5-
"fmt"
5+
"errors"
66
"github.com/analogj/lodestone-publisher/pkg/model"
77
"github.com/streadway/amqp"
8+
"log"
9+
"os"
10+
"time"
811
)
912

13+
//Based on https://github.com/streadway/amqp/blob/master/example_client_test.go
14+
1015
type AmqpNotify struct {
16+
logger *log.Logger
1117
client *amqp.Connection
1218
channel *amqp.Channel
1319
exchange string
1420
queue string
21+
22+
done chan bool
23+
notifyConnClose chan *amqp.Error
24+
notifyChanClose chan *amqp.Error
25+
notifyConfirm chan amqp.Confirmation
26+
isReady bool
1527
}
1628

29+
const (
30+
// When reconnecting to the server after connection failure
31+
reconnectDelay = 5 * time.Second
32+
33+
// When setting up the channel after a channel exception
34+
reInitDelay = 2 * time.Second
35+
36+
// When resending messages the server didn't confirm
37+
resendDelay = 5 * time.Second
38+
)
39+
40+
var (
41+
errNotConnected = errors.New("not connected to a server")
42+
errAlreadyClosed = errors.New("already closed: not connected to the server")
43+
errShutdown = errors.New("session is shutting down")
44+
)
45+
1746
func (n *AmqpNotify) Init(config map[string]string) error {
1847
n.exchange = config["exchange"]
1948
n.queue = config["queue"]
49+
n.logger = log.New(os.Stdout, "", log.LstdFlags)
50+
51+
go n.handleReconnect(config["amqp-url"])
52+
return nil
53+
}
54+
55+
// handleReconnect will wait for a connection error on
56+
// notifyConnClose, and then continuously attempt to reconnect.
57+
func (n *AmqpNotify) handleReconnect(addr string) {
58+
for {
59+
n.isReady = false
60+
log.Println("Attempting to connect")
61+
62+
conn, err := n.connect(addr)
63+
64+
if err != nil {
65+
log.Println("Failed to connect. Retrying...")
66+
67+
select {
68+
case <-n.done:
69+
return
70+
case <-time.After(reconnectDelay):
71+
}
72+
continue
73+
}
74+
75+
if done := n.handleReInit(conn); done {
76+
break
77+
}
78+
}
79+
}
80+
81+
// connect will create a new AMQP connection
82+
func (n *AmqpNotify) connect(addr string) (*amqp.Connection, error) {
83+
conn, err := amqp.Dial(addr)
84+
85+
if err != nil {
86+
return nil, err
87+
}
88+
89+
n.changeClient(conn)
90+
log.Println("Connected!")
91+
return conn, nil
92+
}
93+
94+
// handleReconnect will wait for a channel error
95+
// and then continuously attempt to re-initialize both channels
96+
func (n *AmqpNotify) handleReInit(conn *amqp.Connection) bool {
97+
for {
98+
n.isReady = false
99+
100+
err := n.init(conn)
101+
102+
if err != nil {
103+
log.Println("Failed to initialize channel. Retrying...")
104+
105+
select {
106+
case <-n.done:
107+
return true
108+
case <-time.After(reInitDelay):
109+
}
110+
continue
111+
}
112+
113+
select {
114+
case <-n.done:
115+
return true
116+
case <-n.notifyConnClose:
117+
log.Println("Connection closed. Reconnecting...")
118+
return false
119+
case <-n.notifyChanClose:
120+
log.Println("Channel closed. Re-running init...")
121+
}
122+
}
123+
}
124+
125+
// init will initialize channel & declare queue
126+
func (n *AmqpNotify) init(conn *amqp.Connection) error {
127+
ch, err := conn.Channel()
20128

21-
client, err := amqp.Dial(config["amqp-url"])
22129
if err != nil {
23130
return err
24131
}
25-
n.client = client
26132

27-
//test connection
28-
ch, err := client.Channel()
133+
err = ch.Confirm(false)
134+
29135
if err != nil {
30136
return err
31137
}
32-
n.channel = ch
33138

34139
err = ch.ExchangeDeclare(
35140
n.exchange,
@@ -56,35 +161,104 @@ func (n *AmqpNotify) Init(config map[string]string) error {
56161
return err
57162
}
58163

59-
return err
164+
n.changeChannel(ch)
165+
n.isReady = true
166+
log.Println("Setup!")
167+
168+
return nil
60169
}
61170

171+
// changeClient takes a new connection to the queue,
172+
// and updates the close listener to reflect this.
173+
func (n *AmqpNotify) changeClient(client *amqp.Connection) {
174+
n.client = client
175+
n.notifyConnClose = make(chan *amqp.Error)
176+
n.client.NotifyClose(n.notifyConnClose)
177+
}
178+
179+
// changeChannel takes a new channel to the queue,
180+
// and updates the channel listeners to reflect this.
181+
func (n *AmqpNotify) changeChannel(channel *amqp.Channel) {
182+
n.channel = channel
183+
n.notifyChanClose = make(chan *amqp.Error)
184+
n.notifyConfirm = make(chan amqp.Confirmation, 1)
185+
n.channel.NotifyClose(n.notifyChanClose)
186+
n.channel.NotifyPublish(n.notifyConfirm)
187+
}
188+
189+
// Publish will push data onto the queue, and wait for a confirm.
190+
// If no confirms are received until within the resendTimeout,
191+
// it continuously re-sends messages until a confirm is received.
192+
// This will block until the server sends a confirm. Errors are
193+
// only returned if the push action itself fails, see UnsafePush.
62194
func (n *AmqpNotify) Publish(event model.S3Event) error {
63-
fmt.Println("Publishing event..")
195+
if !n.isReady {
196+
return errors.New("failed to publish event: not connected")
197+
}
198+
199+
n.logger.Println("Publishing event..")
64200

65201
b, err := json.Marshal(event)
66202
if err != nil {
67203
return err
68204
}
69205

206+
for {
207+
err := n.unsafePublish(b)
208+
if err != nil {
209+
n.logger.Println("Publish failed. Retrying...")
210+
select {
211+
case <-n.done:
212+
return errShutdown
213+
case <-time.After(resendDelay):
214+
}
215+
continue
216+
}
217+
select {
218+
case confirm := <-n.notifyConfirm:
219+
if confirm.Ack {
220+
n.logger.Println("Publish confirmed!")
221+
return nil
222+
}
223+
case <-time.After(resendDelay):
224+
}
225+
n.logger.Println("Publish didn't confirm. Retrying...")
226+
}
227+
}
228+
229+
// UnsafePush will push to the queue without checking for
230+
// confirmation. It returns an error if it fails to connect.
231+
// No guarantees are provided for whether the server will
232+
// recieve the message.
233+
func (n *AmqpNotify) unsafePublish(data []byte) error {
234+
if !n.isReady {
235+
return errNotConnected
236+
}
70237
return n.channel.Publish(
71238
n.exchange, // exchange
72239
n.queue, // routing key
73-
false, // mandatory
74-
false, // immediate
240+
false, // Mandatory
241+
false, // Immediate
75242
amqp.Publishing{
76243
ContentType: "application/json",
77-
Body: b,
244+
Body: data,
78245
},
79246
)
80247
}
81248

82249
func (n *AmqpNotify) Close() error {
83-
if err := n.client.Close(); err != nil {
250+
if !n.isReady {
251+
return errAlreadyClosed
252+
}
253+
err := n.channel.Close()
254+
if err != nil {
84255
return err
85256
}
86-
if err := n.channel.Close(); err != nil {
257+
err = n.client.Close()
258+
if err != nil {
87259
return err
88260
}
261+
close(n.done)
262+
n.isReady = false
89263
return nil
90264
}

pkg/watch/fs.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ func (fs *FsWatcher) Start(notifyClient notify.Interface, config map[string]stri
2828
fmt.Println("ERROR", err)
2929
}
3030

31+
fmt.Println("Start watching for filesystem events")
3132
done := make(chan bool)
3233

3334
go func() {

0 commit comments

Comments
 (0)