@@ -127,9 +127,16 @@ func (s *server) internallyForwardMessage(ctx context.Context, req *messagingpb.
127
127
}
128
128
129
129
rendezvousRecord , err := s .data .GetRendezvous (ctx , streamKey )
130
- if err == nil {
130
+ switch err {
131
+ case nil :
131
132
log := log .WithField ("receiver_address" , rendezvousRecord .Address )
132
133
134
+ // Expired rendezvous record that likely wasn't cleaned up. Avoid forwarding,
135
+ // since we expect a broken state.
136
+ if time .Since (rendezvousRecord .ExpiresAt ) >= 0 {
137
+ return nil
138
+ }
139
+
133
140
// We got lucky and the receiver's stream is on the same RPC server as
134
141
// where the message is created. No forwarding between servers is required.
135
142
// Note that we always use the rendezvous record as the source of truth
@@ -150,12 +157,6 @@ func (s *server) internallyForwardMessage(ctx context.Context, req *messagingpb.
150
157
return nil
151
158
}
152
159
153
- // Expired rendezvous record that likely wasn't cleaned up. Avoid forwarding,
154
- // since we expect a broken state.
155
- if time .Since (rendezvousRecord .ExpiresAt ) >= 0 {
156
- return nil
157
- }
158
-
159
160
client , err := getInternalMessagingClient (rendezvousRecord .Address )
160
161
if err != nil {
161
162
log .WithError (err ).Warn ("failure creating internal grpc messaging client" )
@@ -188,7 +189,11 @@ func (s *server) internallyForwardMessage(ctx context.Context, req *messagingpb.
188
189
log .WithField ("result" , resp .Result ).Warn ("non-OK result sending redirected request" )
189
190
return err
190
191
}
191
- } else if err != rendezvous .ErrNotFound {
192
+
193
+ case rendezvous .ErrNotFound :
194
+ log .Debug ("dropping message without rendezvous record" )
195
+
196
+ default :
192
197
log .WithError (err ).Warn ("failure getting rendezvous record" )
193
198
return err
194
199
}
0 commit comments