@@ -122,6 +122,9 @@ class Pubsub(Service, IPubsub):
122122 strict_signing : bool
123123 sign_key : PrivateKey
124124
125+ # Set of blacklisted peer IDs
126+ blacklisted_peers : set [ID ]
127+
125128 event_handle_peer_queue_started : trio .Event
126129 event_handle_dead_peer_queue_started : trio .Event
127130
@@ -201,6 +204,9 @@ def __init__(
201204
202205 self .counter = int (time .time ())
203206
207+ # Set of blacklisted peer IDs
208+ self .blacklisted_peers = set ()
209+
204210 self .event_handle_peer_queue_started = trio .Event ()
205211 self .event_handle_dead_peer_queue_started = trio .Event ()
206212
@@ -320,6 +326,82 @@ def get_msg_validators(self, msg: rpc_pb2.Message) -> tuple[TopicValidator, ...]
320326 if topic in self .topic_validators
321327 )
322328
329+ def add_to_blacklist (self , peer_id : ID ) -> None :
330+ """
331+ Add a peer to the blacklist.
332+ When a peer is blacklisted:
333+ - Any existing connection to that peer is immediately closed and removed
334+ - The peer is removed from all topic subscription mappings
335+ - Future connection attempts from this peer will be rejected
336+ - Messages forwarded by or originating from this peer will be dropped
337+ - The peer will not be able to participate in pubsub communication
338+
339+ :param peer_id: the peer ID to blacklist
340+ """
341+ self .blacklisted_peers .add (peer_id )
342+ logger .debug ("Added peer %s to blacklist" , peer_id )
343+ self .manager .run_task (self ._teardown_if_connected , peer_id )
344+
345+ async def _teardown_if_connected (self , peer_id : ID ) -> None :
346+ """Close their stream and remove them if connected"""
347+ stream = self .peers .get (peer_id )
348+ if stream is not None :
349+ try :
350+ await stream .reset ()
351+ except Exception :
352+ pass
353+ del self .peers [peer_id ]
354+ # Also remove from any subscription maps:
355+ for _topic , peerset in self .peer_topics .items ():
356+ if peer_id in peerset :
357+ peerset .discard (peer_id )
358+
359+ def remove_from_blacklist (self , peer_id : ID ) -> None :
360+ """
361+ Remove a peer from the blacklist.
362+ Once removed from the blacklist:
363+ - The peer can establish new connections to this node
364+ - Messages from this peer will be processed normally
365+ - The peer can participate in topic subscriptions and message forwarding
366+
367+ :param peer_id: the peer ID to remove from blacklist
368+ """
369+ self .blacklisted_peers .discard (peer_id )
370+ logger .debug ("Removed peer %s from blacklist" , peer_id )
371+
372+ def is_peer_blacklisted (self , peer_id : ID ) -> bool :
373+ """
374+ Check if a peer is blacklisted.
375+
376+ :param peer_id: the peer ID to check
377+ :return: True if peer is blacklisted, False otherwise
378+ """
379+ return peer_id in self .blacklisted_peers
380+
381+ def clear_blacklist (self ) -> None :
382+ """
383+ Clear all peers from the blacklist.
384+ This removes all blacklist restrictions, allowing previously blacklisted
385+ peers to:
386+ - Establish new connections
387+ - Send and forward messages
388+ - Participate in topic subscriptions
389+
390+ """
391+ self .blacklisted_peers .clear ()
392+ logger .debug ("Cleared all peers from blacklist" )
393+
394+ def get_blacklisted_peers (self ) -> set [ID ]:
395+ """
396+ Get a copy of the current blacklisted peers.
397+ Returns a snapshot of all currently blacklisted peer IDs. These peers
398+ are completely isolated from pubsub communication - their connections
399+ are rejected and their messages are dropped.
400+
401+ :return: a set containing all blacklisted peer IDs
402+ """
403+ return self .blacklisted_peers .copy ()
404+
323405 async def stream_handler (self , stream : INetStream ) -> None :
324406 """
325407 Stream handler for pubsub. Gets invoked whenever a new stream is
@@ -346,6 +428,10 @@ async def wait_until_ready(self) -> None:
346428 await self .event_handle_dead_peer_queue_started .wait ()
347429
348430 async def _handle_new_peer (self , peer_id : ID ) -> None :
431+ if self .is_peer_blacklisted (peer_id ):
432+ logger .debug ("Rejecting blacklisted peer %s" , peer_id )
433+ return
434+
349435 try :
350436 stream : INetStream = await self .host .new_stream (peer_id , self .protocols )
351437 except SwarmException as error :
@@ -359,7 +445,6 @@ async def _handle_new_peer(self, peer_id: ID) -> None:
359445 except StreamClosed :
360446 logger .debug ("Fail to add new peer %s: stream closed" , peer_id )
361447 return
362- # TODO: Check if the peer in black list.
363448 try :
364449 self .router .add_peer (peer_id , stream .get_protocol ())
365450 except Exception as error :
@@ -609,9 +694,20 @@ async def push_msg(self, msg_forwarder: ID, msg: rpc_pb2.Message) -> None:
609694 """
610695 logger .debug ("attempting to publish message %s" , msg )
611696
612- # TODO: Check if the `source` is in the blacklist. If yes, reject.
697+ # Check if the message forwarder (source) is in the blacklist. If yes, reject.
698+ if self .is_peer_blacklisted (msg_forwarder ):
699+ logger .debug (
700+ "Rejecting message from blacklisted source peer %s" , msg_forwarder
701+ )
702+ return
613703
614- # TODO: Check if the `from` is in the blacklist. If yes, reject.
704+ # Check if the message originator (from) is in the blacklist. If yes, reject.
705+ msg_from_peer = ID (msg .from_id )
706+ if self .is_peer_blacklisted (msg_from_peer ):
707+ logger .debug (
708+ "Rejecting message from blacklisted originator peer %s" , msg_from_peer
709+ )
710+ return
615711
616712 # If the message is processed before, return(i.e., don't further process the message) # noqa: E501
617713 if self ._is_msg_seen (msg ):
0 commit comments