Skip to content

Commit a5465bd

Browse files
committed
Handle failure to publish message
1 parent d7f697b commit a5465bd

File tree

1 file changed

+40
-13
lines changed

1 file changed

+40
-13
lines changed

pulsar/client/amqp_exchange.py

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -233,15 +233,20 @@ def publish(self, name, payload):
233233
with pools.producers[connection].acquire() as producer:
234234
log.debug("%sHave producer for publishing to key %s", publish_log_prefix, key)
235235
publish_kwds = self.__prepare_publish_kwds(publish_log_prefix)
236-
producer.publish(
237-
payload,
238-
serializer='json',
239-
exchange=self.__exchange,
240-
declare=[self.__exchange],
241-
routing_key=key,
242-
**publish_kwds
243-
)
244-
log.debug("%sPublished to key %s", publish_log_prefix, key)
236+
try:
237+
producer.publish(
238+
payload,
239+
serializer='json',
240+
exchange=self.__exchange,
241+
declare=[self.__exchange],
242+
routing_key=key,
243+
**publish_kwds
244+
)
245+
log.debug("%sPublished to key %s", publish_log_prefix, key)
246+
return True
247+
except Exception as e:
248+
log.error("%sFailed to publish to key %s: %s", publish_log_prefix, key, str(e))
249+
self.__fail_publish(name, payload, e)
245250

246251
def ack_manager(self):
247252
log.debug('Acknowledgement manager thread alive')
@@ -261,15 +266,37 @@ def ack_manager(self):
261266
'republishing original message on queue %s',
262267
unack_uuid, resubmit_queue)
263268
try:
264-
self.publish(resubmit_queue, payload)
265-
self.publish_uuid_store.set_time(unack_uuid)
269+
if self.publish(resubmit_queue, payload):
270+
self.publish_uuid_store.set_time(unack_uuid)
271+
else:
272+
# If we fail to publish, we need to remove the uuid from the store
273+
# so it doesn't get republished again.
274+
self.__discard_publish_uuid(unack_uuid, failed)
266275
except self.recoverable_exceptions as e:
267276
self.__handle_io_error(e)
268277
continue
269278
except Exception:
270-
log.exception("Problem with acknowledgement manager, leaving ack_manager method in problematic state!")
279+
log.exception("Problem with acknowledgement manager, leaving ack manager in problematic state!")
271280
raise
272-
log.debug('Acknowledgement manager thread exiting')
281+
282+
def __fail_publish(self, name, payload, exception):
283+
# Send just a few safe keys if we have them:
284+
keys_to_send = [
285+
"job_id",
286+
"returncode",
287+
"stdout",
288+
"stderr",
289+
"job_stdout",
290+
"job_stderr",
291+
]
292+
new_payload = {}
293+
for key in keys_to_send:
294+
if key in payload:
295+
new_payload[key] = payload[key]
296+
# Add the original payload to the new payload
297+
new_payload["exception"] = str(exception)
298+
new_payload["status"] = "failed"
299+
self.publish(name, new_payload)
273300

274301
def __get_payload(self, uuid, failed):
275302
"""Retry reading a message from the publish_uuid_store once, delete on the second failure."""

0 commit comments

Comments
 (0)