
Commit 4f604bb

Log an error on XGBoost failures and reduce the info-level logs of ReliableMessage (#2659)
Co-authored-by: Yuan-Ting Hsieh (謝沅廷) <[email protected]>
Parent: 83a968e

2 files changed, 26 insertions(+), 20 deletions(-)

2 files changed

+26
-20
lines changed

nvflare/apis/utils/reliable_message.py

Lines changed: 18 additions & 18 deletions
@@ -111,7 +111,7 @@ def process(self, request: Shareable, fl_ctx: FLContext) -> Shareable:
                 self.tx_timeout = request.get_header(HEADER_TX_TIMEOUT)

                 # start processing
-                ReliableMessage.info(fl_ctx, f"started processing request of topic {self.topic}")
+                ReliableMessage.debug(fl_ctx, f"started processing request of topic {self.topic}")
                 self.executor.submit(self._do_request, request, fl_ctx)
                 return _status_reply(STATUS_IN_PROCESS)  # ack
             elif self.result:
@@ -143,14 +143,14 @@ def process(self, request: Shareable, fl_ctx: FLContext) -> Shareable:
                 ReliableMessage.error(fl_ctx, f"aborting processing since exceeded max tx time {self.tx_timeout}")
                 return _status_reply(STATUS_ABORTED)
             else:
-                ReliableMessage.info(fl_ctx, "got query: request is in-process")
+                ReliableMessage.debug(fl_ctx, "got query: request is in-process")
                 return _status_reply(STATUS_IN_PROCESS)

     def _try_reply(self, fl_ctx: FLContext):
         engine = fl_ctx.get_engine()
         self.replying = True
         start_time = time.time()
-        ReliableMessage.info(fl_ctx, f"try to send reply back to {self.source}: {self.per_msg_timeout=}")
+        ReliableMessage.debug(fl_ctx, f"try to send reply back to {self.source}: {self.per_msg_timeout=}")
         ack = engine.send_aux_request(
             targets=[self.source],
             topic=TOPIC_RELIABLE_REPLY,
@@ -164,15 +164,15 @@ def _try_reply(self, fl_ctx: FLContext):
         if rc == ReturnCode.OK:
             # reply sent successfully!
             self.reply_time = time.time()
-            ReliableMessage.info(fl_ctx, f"sent reply successfully in {time_spent} secs")
+            ReliableMessage.debug(fl_ctx, f"sent reply successfully in {time_spent} secs")
         else:
             ReliableMessage.error(
                 fl_ctx, f"failed to send reply in {time_spent} secs: {rc=}; will wait for requester to query"
             )

     def _do_request(self, request: Shareable, fl_ctx: FLContext):
         start_time = time.time()
-        ReliableMessage.info(fl_ctx, "invoking request handler")
+        ReliableMessage.debug(fl_ctx, "invoking request handler")
         try:
             result = self.request_handler_f(self.topic, request, fl_ctx)
         except Exception as e:
@@ -184,7 +184,7 @@ def _do_request(self, request: Shareable, fl_ctx: FLContext):
         result.set_header(HEADER_OP, OP_REPLY)
         result.set_header(HEADER_TOPIC, self.topic)
         self.result = result
-        ReliableMessage.info(fl_ctx, f"finished request handler in {time.time()-start_time} secs")
+        ReliableMessage.debug(fl_ctx, f"finished request handler in {time.time()-start_time} secs")
         self._try_reply(fl_ctx)


@@ -274,7 +274,7 @@ def _receive_request(cls, topic: str, request: Shareable, fl_ctx: FLContext):
                 cls.error(fl_ctx, f"no handler registered for request {rm_topic=}")
                 return make_reply(ReturnCode.TOPIC_UNKNOWN)
             receiver = cls._get_or_create_receiver(rm_topic, request, handler_f)
-            cls.info(fl_ctx, f"received request {rm_topic=}")
+            cls.debug(fl_ctx, f"received request {rm_topic=}")
             return receiver.process(request, fl_ctx)
         elif op == OP_QUERY:
             receiver = cls._req_receivers.get(tx_id)
@@ -297,7 +297,7 @@ def _receive_reply(cls, topic: str, request: Shareable, fl_ctx: FLContext):
             cls.error(fl_ctx, "received reply but we are no longer waiting for it")
         else:
             assert isinstance(receiver, _ReplyReceiver)
-            cls.info(fl_ctx, f"received reply in {time.time()-receiver.tx_start_time} secs - set waiter")
+            cls.debug(fl_ctx, f"received reply in {time.time()-receiver.tx_start_time} secs - set waiter")
             receiver.process(request)
         return make_reply(ReturnCode.OK)

@@ -492,7 +492,7 @@ def _send_request(
                 return make_reply(ReturnCode.COMMUNICATION_ERROR)

             if num_tries > 0:
-                cls.info(fl_ctx, f"retry #{num_tries} sending request: {per_msg_timeout=}")
+                cls.debug(fl_ctx, f"retry #{num_tries} sending request: {per_msg_timeout=}")

             ack = engine.send_aux_request(
                 targets=[target],
@@ -509,23 +509,23 @@ def _send_request(
                 # the reply is already the result - we are done!
                 # this could happen when we didn't get positive ack for our first request, and the result was
                 # already produced when we did the 2nd request (this request).
-                cls.info(fl_ctx, f"C1: received result in {time.time()-receiver.tx_start_time} seconds; {rc=}")
+                cls.debug(fl_ctx, f"C1: received result in {time.time()-receiver.tx_start_time} seconds; {rc=}")
                 return ack

             # the ack is a status report - check status
             status = ack.get_header(HEADER_STATUS)
             if status and status != STATUS_NOT_RECEIVED:
                 # status should never be STATUS_NOT_RECEIVED, unless there is a bug in the receiving logic
                 # STATUS_NOT_RECEIVED is only possible during "query" phase.
-                cls.info(fl_ctx, f"received status ack: {rc=} {status=}")
+                cls.debug(fl_ctx, f"received status ack: {rc=} {status=}")
                 break

             if time.time() + cls._query_interval - receiver.tx_start_time >= tx_timeout:
                 cls.error(fl_ctx, f"aborting send_request since it will exceed {tx_timeout=}")
                 return make_reply(ReturnCode.COMMUNICATION_ERROR)

             # we didn't get a positive ack - wait a short time and re-send the request.
-            cls.info(fl_ctx, f"unsure the request was received ({rc=}): will retry in {cls._query_interval} secs")
+            cls.debug(fl_ctx, f"unsure the request was received ({rc=}): will retry in {cls._query_interval} secs")
             num_tries += 1
             start = time.time()
             while time.time() - start < cls._query_interval:
@@ -534,7 +534,7 @@ def _send_request(
                     return make_reply(ReturnCode.TASK_ABORTED)
                 time.sleep(0.1)

-        cls.info(fl_ctx, "request was received by the peer - will query for result")
+        cls.debug(fl_ctx, "request was received by the peer - will query for result")
         return cls._query_result(target, abort_signal, fl_ctx, receiver)

     @classmethod
@@ -566,7 +566,7 @@ def _query_result(
                 # we already received result sent by the target.
                 # Note that we don't wait forever here - we only wait for _query_interval, so we could
                 # check other condition and/or send query to ask for result.
-                cls.info(fl_ctx, f"C2: received result in {time.time()-receiver.tx_start_time} seconds")
+                cls.debug(fl_ctx, f"C2: received result in {time.time()-receiver.tx_start_time} seconds")
                 return receiver.result

             if abort_signal and abort_signal.triggered:
@@ -580,7 +580,7 @@ def _query_result(
             # send a query. The ack of the query could be the result itself, or a status report.
             # Note: the ack could be the result because we failed to receive the result sent by the target earlier.
             num_tries += 1
-            cls.info(fl_ctx, f"query #{num_tries}: try to get result from {target}: {per_msg_timeout=}")
+            cls.debug(fl_ctx, f"query #{num_tries}: try to get result from {target}: {per_msg_timeout=}")
             ack = engine.send_aux_request(
                 targets=[target],
                 topic=TOPIC_RELIABLE_REQUEST,
@@ -594,7 +594,7 @@ def _query_result(
                 op = ack.get_header(HEADER_OP)
                 if op == OP_REPLY:
                     # the ack is result itself!
-                    cls.info(fl_ctx, f"C3: received result in {time.time()-receiver.tx_start_time} seconds")
+                    cls.debug(fl_ctx, f"C3: received result in {time.time()-receiver.tx_start_time} seconds")
                     return ack

                 status = ack.get_header(HEADER_STATUS)
@@ -606,6 +606,6 @@ def _query_result(
                     cls.error(fl_ctx, f"peer {target} aborted processing!")
                     return _error_reply(ReturnCode.EXECUTION_EXCEPTION, "Aborted")

-                cls.info(fl_ctx, f"will retry query in {cls._query_interval} secs: {rc=} {status=} {op=}")
+                cls.debug(fl_ctx, f"will retry query in {cls._query_interval} secs: {rc=} {status=} {op=}")
             else:
-                cls.info(fl_ctx, f"will retry query in {cls._query_interval} secs: {rc=}")
+                cls.debug(fl_ctx, f"will retry query in {cls._query_interval} secs: {rc=}")

nvflare/app_opt/xgboost/histogram_based_v2/adaptors/adaptor.py

Lines changed: 8 additions & 2 deletions
@@ -11,14 +11,16 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+import logging
 import multiprocessing
 import os
 import sys
 import threading
 import time
 from abc import ABC, abstractmethod

+from xgboost.core import XGBoostError
+
 from nvflare.apis.fl_component import FLComponent
 from nvflare.apis.fl_context import FLContext
 from nvflare.apis.signal import Signal
@@ -46,6 +48,7 @@ def __init__(self, app_name: str, runner, in_process: bool, workspace: Workspace
         self.started = True
         self.stopped = False
         self.exit_code = 0
+        self.logger = logging.getLogger(self.__class__.__name__)

     def start(self, ctx: dict):
         """Start the runner and wait for it to finish.
@@ -67,8 +70,11 @@ def start(self, ctx: dict):
             self.runner.run(ctx)
             self.stopped = True
         except Exception as e:
-            secure_log_traceback()
             self.error = f"Exception starting {self.app_name} runner: {secure_format_exception(e)}"
+            self.logger.error(self.error)
+            # XGBoost already prints a traceback
+            if not isinstance(e, XGBoostError):
+                secure_log_traceback()
             self.started = False
             self.exit_code = Constant.EXIT_CODE_CANT_START
             self.stopped = True
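The adaptor.py change above records the failure once through the new per-instance logger and calls secure_log_traceback() only when the exception is not an XGBoostError, since XGBoost already prints its own traceback. The sketch below restates that pattern outside NVFlare using only the standard library plus xgboost; the helper name run_guarded and the logger name are invented for illustration, and logging.exception stands in for secure_log_traceback().

import logging

from xgboost.core import XGBoostError


def run_guarded(runner_fn, *args, **kwargs):
    """Hypothetical helper illustrating the error-handling pattern in the diff."""
    logger = logging.getLogger("XGBRunner")  # illustrative name, not an NVFlare API
    try:
        return runner_fn(*args, **kwargs)
    except Exception as e:
        # Record a one-line error message in every case.
        logger.error(f"runner failed: {e}")
        # XGBoost already prints its own traceback for XGBoostError,
        # so only emit ours for other exception types.
        if not isinstance(e, XGBoostError):
            logger.exception("unexpected runner failure")
        raise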
