-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathPeer.py
More file actions
1439 lines (1280 loc) · 67.7 KB
/
Peer.py
File metadata and controls
1439 lines (1280 loc) · 67.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import ast
import csv
import json
import logging
import os
import queue
import random
import shutil
import socket
import struct
import threading
import time
import uuid
from collections import OrderedDict
from datetime import datetime, timezone
from email import message
from pathlib import Path
from pydoc import text
from typing import Any, Dict, List, Optional, Tuple
from Peer_utils import get_broadcast_address
# Networking constants shared by every peer in the process.
BROADCAST_PORT = 9999  # Dedicated port for UDP broadcasts
BROADCAST_IP = '127.0.0.1'  # Default broadcast target (loopback for local demos)
MCAST_GRP = '224.1.1.1'  # IPv4 multicast group used for ordered message delivery
MCAST_PORT = 5007  # UDP port the multicast group listens on
class Peer():
"""Distributed sensor network peer.
Each `Peer` participates in discovery, membership management, bully leader
election, heartbeats, and totally ordered multicast for sensor data.
Attributes:
peer_id (str): Auto-generated UUID for this peer.
address (str): IP address where the peer listens for TCP connections.
port (int): TCP port for incoming connections.
isGroupLeader (bool): Whether this peer is the current leader.
groupView (dict): Known peers and metadata.
orderedPeerList (list[str]): Ordered list of peer IDs.
leader_id (str | None): UUID of current leader, if known.
"""
def __init__(self, address: str, port: int) -> None:
    """Initialize a peer instance.

    Only in-memory state is set up here; no sockets are opened and no
    threads are started until `start()` is called.

    Args:
        address (str): Local IP address for this peer.
        port (int): TCP port for this peer.
    """
    # Auto-generate a unique ID per peer
    self.peer_id = str(uuid.uuid4())
    self.logger = self._setup_logger()
    self.address = address
    self.partOfNetwork = False  # True once the leader acknowledges this peer
    self.port = port
    self.connection_dict = {}  # peer_id -> open TCP socket
    self.isGroupLeader = False
    self.tcp_thread = None
    self.udp_thread = None
    self.groupView = {}  # peer_id -> {'address', 'port', 'role'}
    self.orderedPeerList = []  # sorted peer IDs shared across the group
    self.successor = None  # Changed to dict: {'peer_id':, 'address':, 'port':}
    self.election_active = False
    self.leader_id = None
    self.last_leader_heartbeat = time.time()
    self.heartbeat_interval = 5  # seconds
    self.heartbeat_timeout = 20  # seconds
    self.heartbeat_thread = None
    self.leader_check_thread = None
    self.pending_acks = {}
    self.sequencer_peer_id = None  # To be set when known
    self.sequencer_sequence_number = None
    self.local_sequence_number = 0
    self.lock = threading.Lock()  # guards sequence counter and election flags
    self.multicast_socket = None
    self.incoming_queue = queue.PriorityQueue()  # hold-back buffer keyed by seq
    self.expected_seq = 1  # next sequence number eligible for delivery
    self.delivered_messages = []
    self.delivered_seq_set = set()
    self.seen_seq = set()  # seqs already queued/delivered (duplicate suppression)
    self.sent_history = OrderedDict()  # seq -> payload, kept for retransmission
    self.history_max = 1000  # cap on sent_history entries
    self.missing_seq_last_request = {}  # seq -> last retransmit-request time
    self.missing_seq_retry_interval = 2.0  # seconds between re-requests
    self.multicast_thread_active = False
    self.view_id = 0  # To track changes in group view
    self.peer_last_heartbeat = {}  # Map peer_id -> last_ack_time
    self.election_timeout = 5
    self.received_ok = False
    self.seq_sync_active = False
    self.seq_sync_responses = {}
    self.view_sync_inflight = False
    self.failed_peers = set()
    # Sensor data config
    self.sensor_interval_seconds = 15
    self.sensor_thread = None
    # Each peer writes into its own data directory for replication
    self.data_dir = Path.cwd() / "data" / self.peer_id
    # Per-connection TCP receive buffers for newline-framed messages
    self.conn_buffers = {}
    # Broadcast IP derived from local address (fallback to localhost)
    self.broadcast_ip = (
        "127.0.0.1" if self.address.startswith("127.") else get_broadcast_address(self.address)
    )
def _setup_logger(self) -> logging.LoggerAdapter:
    """Build a per-peer logger: DEBUG to file, configurable level to console.

    The console level comes from the `PEER_LOG_LEVEL` environment
    variable (default INFO). Handlers are attached only once per logger
    name so repeated construction does not duplicate output.

    Returns:
        logging.LoggerAdapter: Adapter injecting `peer_id` into records.
    """
    console_level = os.getenv("PEER_LOG_LEVEL", "INFO").upper()
    core = logging.getLogger(f"Peer.{self.peer_id[:8]}")
    core.setLevel(logging.DEBUG)
    if not core.handlers:
        log_dir = Path("logs")
        log_dir.mkdir(exist_ok=True)
        formatter = logging.Formatter(
            "[%(asctime)s][%(levelname)s][peer_id=%(peer_id)s] %(message)s",
            "%H:%M:%S",
        )
        to_file = logging.FileHandler(log_dir / f"{self.peer_id}.log", encoding="utf-8")
        to_file.setLevel(logging.DEBUG)
        to_file.setFormatter(formatter)
        to_console = logging.StreamHandler()
        to_console.setLevel(getattr(logging, console_level, logging.INFO))
        to_console.setFormatter(formatter)
        core.addHandler(to_file)
        core.addHandler(to_console)
    core.propagate = False
    return logging.LoggerAdapter(core, {"peer_id": self.peer_id})
def _is_important_log(self, msg: str) -> bool:
    """Decide whether a message is important enough to surface at INFO.

    Args:
        msg (str): Candidate log text.

    Returns:
        bool: True when `msg` begins with one of the key event prefixes.
    """
    return msg.startswith((
        "Peer ID:",
        "No response received. Assuming role of Group Leader.",
        "Waiting to join the network",
        "Peer ",
        "Leader failure detected",
        "Leader heartbeat timeout",
        "Bully election",
        "No OK received.",
        "Received COORDINATOR",
        "TCP and UDP listeners started.",
        "Sensor thread started",
        "Sequencer sync complete.",
    ))
def _log(self, msg: str, level: str = "DEBUG") -> None:
    """Route a message to the logger with heuristic level adjustment.

    Error-looking text is escalated to ERROR, warning-looking text to
    WARNING, and "important" DEBUG messages are promoted to INFO so the
    console stays readable during demos.

    Args:
        msg (str): Message to log (stringified defensively).
        level (str): Suggested level; may be overridden by heuristics.
    """
    try:
        text = str(msg)
    except Exception:
        text = "<unprintable message>"
    lvl = (level or "DEBUG").upper()
    if "ERROR" in text or text.startswith("[Error]"):
        lvl = "ERROR"
    elif ("Failed" in text or "Exception" in text
          or "WARNING" in text or text.startswith("Warning")):
        lvl = "WARNING"
    elif lvl == "DEBUG" and self._is_important_log(text):
        lvl = "INFO"
    # Dispatch to the matching logger method; unknown levels -> error.
    emit = {
        "DEBUG": self.logger.debug,
        "INFO": self.logger.info,
        "WARNING": self.logger.warning,
    }.get(lvl, self.logger.error)
    emit(text)
def __str__(self) -> str:
    """Summarize this peer as ``Peer ID: <id>, Address: <addr>, Port: <port>``.

    Returns:
        str: Human-readable one-line summary.
    """
    return (
        f"Peer ID: {self.peer_id}"
        f", Address: {self.address}"
        f", Port: {self.port}"
    )
def start(self) -> None:
    """Start the peer lifecycle.

    Resets local data directory, starts listening threads, attempts to
    join the network via broadcast, and initializes heartbeat/election
    monitoring. Blocks at the end by joining the listener threads, so
    this call does not return during normal operation.
    """
    self._log(f"Peer ID: {self.peer_id}")
    # Reset this peer's data directory at startup for clean runs
    try:
        self.reset_data_dir()
    except Exception as e:
        self._log(f"Failed to reset data dir on start: {e}")
    self.start_listening_threads()
    self.broadcast_new_peer_request(self.broadcast_ip, 9999)
    time_after_broadcast = time.time()
    # how does a peer know it has been added to the group?
    # maybe once it receives successor information from leader?
    # if no reply is received within 5 seconds, assume leader
    time.sleep(5)  # wait for potential responses
    while not self.partOfNetwork:
        if time.time() - time_after_broadcast > 5:
            self._log("No response received. Assuming role of Group Leader.")
            self.isGroupLeader = True
            self.leader_id = self.peer_id
            # self.sequencer_peer_id = self.peer_id
            # self.sequencer_sequence_number = 0
            # NOTE(review): partOfNetwork is left False here, so the loop
            # below keeps broadcasting — presumably so a lone leader can
            # discover later-starting peers. Confirm this is intentional.
            self.partOfNetwork = False
            self.groupView[self.peer_id] = {
                'address': self.address,
                'port': self.port,
                'role': 'leader'
            }
            self.orderedPeerList.append(self.peer_id)
            break
    # CONTINUE BROADCASTING UNTIL PART OF NETWORK OR DISCOVER ANOTHER PEER
    while not self.partOfNetwork:
        # self._log("Waiting to join the network...")
        self.broadcast_new_peer_request(self.broadcast_ip, 9999)
        time.sleep(5)
    time.sleep(2)  # WAIT A BIT TO ENSURE STABILITY
    # Start fault tolerance threads
    self.start_leader_check_thread()
    if self.isGroupLeader:
        self.start_heartbeat_thread()
    # time.sleep(5) # WAIT FOR THE OTHER PEERS TO SETTLE
    # If this peer self-assumed leadership (no election yet), set it as sequencer now
    if self.isGroupLeader and not self.sequencer_peer_id:
        self._log(f"Peer {self.peer_id} self-assumed leadership; announcing as coordinator/sequencer.")
        self.announce_coordinator()
    # if(self.peer_id == "peer1"):
    #     self.send_message_to_sequencer("Hello from peer1")
    # if(self.peer_id == "peer3"):
    #     self.send_message_to_sequencer("Hello from peer3")
    # if(self.peer_id == "peer2"):
    #     self.send_message_to_sequencer("Hello from peer2")
    # time.sleep(5) # WAIT FOR MESSAGES TO BE PROCESSED
    self.print_status()
    # Start mocked sensor thread once part of network and multicast ready
    try:
        if self.sensor_thread is None:
            self.start_sensor_thread()
    except Exception as e:
        self._log(f"Failed to start sensor thread: {type(e).__name__}: {e}")
    # Keep the peer running by joining the threads
    if self.tcp_thread:
        self.tcp_thread.join()
    if self.udp_thread:
        self.udp_thread.join()
def send_message_to_sequencer(self, message: str) -> None:
    """Send a message to the current sequencer for total ordering.

    If this peer *is* the sequencer, the next sequence number is assigned
    under the lock and the ordered message is multicast directly.
    Otherwise the payload is forwarded to the sequencer over TCP as a
    ``SEQUENCER_REQUEST``.

    Args:
        message (str): Payload to be ordered (e.g., sensor data string).
    """
    if self.sequencer_peer_id == self.peer_id:
        # We are the sequencer: stamp and multicast without a network hop.
        # Multicast stays inside the lock so stamped messages leave in order.
        with self.lock:
            self.sequencer_sequence_number += 1
            ordered_msg = (self.sequencer_sequence_number, message)
            self.multicast_message_to_group(ordered_msg, from_peer_id=self.peer_id)
        return
    if not self.sequencer_peer_id:
        self._log("Sequencer peer ID unknown, cannot send message.")
        return
    peer_info = self.groupView.get(self.sequencer_peer_id)
    if peer_info is None:
        # Bug fix: previously a missing view entry caused a TypeError that
        # was swallowed and logged with an unrelated message.
        self._log(f"Sequencer {self.sequencer_peer_id} not in group view, cannot send message.")
        return
    request = f"SEQUENCER_REQUEST:{message}"
    try:
        if self.sequencer_peer_id not in self.connection_dict:
            self.send_connection_request(peer_info['address'], peer_info['port'], self.sequencer_peer_id)
        self.send_message(self.sequencer_peer_id, request)
    except Exception as e:
        # Fixed log text: this used to misreport a successor-info failure.
        self._log(f"Failed to send sequencer request to {self.sequencer_peer_id}: {e}")
def process_buffer(self) -> None:
    """Drain the hold-back queue, delivering messages in sequence order.

    Messages are delivered while the head of the priority queue matches
    `expected_seq`; on a gap, the missing sequence is requested from the
    sequencer and processing stops until it arrives. Each delivered
    message is passed to `handle_ordered_payload()`.
    """
    while not self.incoming_queue.empty():
        head_seq, head_msg = self.incoming_queue.queue[0]  # peek, no pop
        if head_seq != self.expected_seq:
            # Gap detected: ask for a retransmit and wait for it.
            self.request_missing_seq(self.expected_seq)
            return
        # In-order head: pop and deliver.
        self.incoming_queue.get()
        self.delivered_messages.append(head_msg)
        self.delivered_seq_set.add(head_seq)
        self._log(f" ==> [Node {self.peer_id}] Delivered #{head_seq}: {head_msg}")
        try:
            self.handle_ordered_payload(head_msg, head_seq)
        except Exception as e:
            self._log(f"Error handling ordered payload (seq_id={head_seq}, msg={head_msg!r}): {type(e).__name__}: {e}")
        self.expected_seq += 1
def connect(self) -> None:
    """Placeholder for an explicit connection attempt.

    Note:
        Currently a no-op; real connections are established via the
        broadcast/handshake path, not through this method.
    """
def create_listening_socket(self) -> socket.socket:
    """Open a TCP server socket on ``0.0.0.0:<self.port>``.

    Returns:
        socket.socket: A bound, listening TCP socket.
    """
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(('0.0.0.0', self.port))
    server.listen()
    return server
def accept_connection(self, listening_socket: socket.socket) -> None:
    """Accept incoming TCP connections and register peers.

    Performs a simple handshake to parse the remote peer's `peer_id`,
    IP, and port, updates `groupView` and `orderedPeerList`, warns on
    endpoint collisions for the same `peer_id`, and spawns a
    `receive_message()` thread per connection. Runs forever; intended
    to be executed on a daemon thread.

    Args:
        listening_socket (socket.socket): The TCP listening socket.
    """
    while True:
        conn, addr = listening_socket.accept()
        msg = conn.recv(1024)  # Optional handshake message
        decoded_msg = msg.decode()
        # Parse: "HELLO from 127.0.0.1:5001 and peer id:peer2"
        if " and peer id:" in decoded_msg:
            parts = decoded_msg.split(" and peer id:")
            address_port = parts[0].split(" from ")[1]  # "127.0.0.1:5001"
            peer_id = parts[1]
            peer_ip, peer_port_str = address_port.split(":")
            peer_port = int(peer_port_str)
        else:
            # Fallback, but should not happen
            # NOTE(review): peer_ip is NOT assigned on this path; the
            # groupView/collision code below references it and would raise
            # NameError for an "unknown" peer that is already in the view.
            peer_port = 0
            peer_id = "unknown"
        self._log(f"Accepted connection from {addr} with port {peer_port} and peer id:{peer_id}")
        # self.connection_dict[f"peer_{peer_port}"] = conn
        self.connection_dict[peer_id] = conn
        # Update local group membership knowledge
        if peer_id not in self.groupView:
            self.groupView[peer_id] = {
                'address': peer_ip,
                'port': peer_port,
                'role': 'member'
            }
            if peer_id not in self.orderedPeerList:
                self.orderedPeerList.append(peer_id)
            self.orderedPeerList = sorted(self.orderedPeerList)
            self._log(f"Updated group view on accept: {self.groupView}")
            # If we're the leader, propagate updated view
            if self.isGroupLeader:
                self.view_id += 1
                self.multicast_groupviewandorderedlist_update()
        else:
            # Collision guard: same peer_id seen with a different endpoint
            existing = self.groupView.get(peer_id, {})
            if existing.get('address') != peer_ip or existing.get('port') != peer_port:
                self._log(f"WARNING: Detected peer_id collision for {peer_id}. Existing=({existing.get('address')}:{existing.get('port')}), Incoming=({peer_ip}:{peer_port}).")
        # Hand the socket to a dedicated reader thread.
        threading.Thread(target=self.receive_message,
                         args=(peer_id,),
                         daemon=True).start()
def send_message(self, peer_key: str, message: str) -> None:
    """Send a newline-framed TCP message to a connected peer.

    On a send failure the connection is treated as lost and peer-failure
    cleanup is triggered.

    Args:
        peer_key (str): Target peer ID.
        message (str): Message payload to send.
    """
    try:
        conn = self.connection_dict.get(peer_key)
        if conn is None:
            self._log(f"No connection found for {peer_key} while sending message only {self.connection_dict.keys()} available")
            # self.handle_peer_failure(peer_key) # Clean up the group view
            return
        # Ensure newline framing for TCP messages
        framed = message if message.endswith("\n") else message + "\n"
        conn.sendall(framed.encode())
    except Exception:
        self._log(f"[Error] Connection lost to {peer_key}. Cleaning up...")
        self.handle_peer_failure(peer_key)  # Trigger cleanup logic
def request_missing_seq(self, seq_id: int) -> None:
    """Ask the sequencer to retransmit `seq_id`, rate-limited per sequence.

    Args:
        seq_id (int): Sequence number we are missing.
    """
    sequencer = self.sequencer_peer_id
    if not sequencer or sequencer == self.peer_id:
        return  # No remote sequencer to ask.
    if seq_id in self.delivered_seq_set or seq_id in self.seen_seq:
        return  # Already delivered or queued.
    now = time.time()
    last_asked = self.missing_seq_last_request.get(seq_id, 0.0)
    if now - last_asked < self.missing_seq_retry_interval:
        return  # Still inside the backoff window.
    self.missing_seq_last_request[seq_id] = now
    try:
        self.ensure_connection(sequencer)
        self.send_message(sequencer, f"MISSING_SEQ_REQUEST:{seq_id}")
    except Exception as e:
        self._log(f"Failed to request missing seq {seq_id} from sequencer: {e}")
def receive_message(self, peer_key: str, buffer_size: int = 1024) -> None:
    """Continuously read and handle messages from a TCP peer.

    Implements newline-delimited framing per connection and dispatches
    protocol messages: successor info, heartbeats, view changes,
    sequencer traffic, sequence sync, retransmission, election, and
    coordinator announcements. Runs until the connection closes or
    errors; intended for a daemon thread.

    Args:
        peer_key (str): The remote peer's ID.
        buffer_size (int, optional): Read buffer size. Defaults to 1024.
    """
    # self._log(f"Receive thread started for {peer_key}")
    if peer_key in self.connection_dict:
        conn = self.connection_dict[peer_key]
        # Retrieve or initialize buffer for this connection
        buffer = self.conn_buffers.get(peer_key, "")
        while True:
            try:
                data = conn.recv(buffer_size)
                if not data:
                    self._log(f"Connection closed by {peer_key}")
                    break
            except Exception as e:
                self._log(f"Connection error with {peer_key}: {e}")
                if isinstance(e, ConnectionResetError):
                    self._log(f"[Network] Connection reset by {peer_key}. Initiating recovery.")
                    self.handle_peer_failure(peer_key)
                    break  # Exit the thread gracefully
                break
            buffer += data.decode()
            # Process complete newline-delimited messages
            while "\n" in buffer:
                line, buffer = buffer.split("\n", 1)
                message = line.strip()
                if not message:
                    continue
                # self._log(f"Received from {peer_key}: {message}")
                if message.startswith("SUCCESSOR_INFORMATION:"):
                    # Leader sent our successor plus the full group view.
                    parts = message.split("SUCCESSOR_INFORMATION:")[1].split(":", 1)
                    successor_list_str = parts[0]
                    group_view_str = parts[1]
                    try:
                        # literal_eval avoids eval() on network input
                        successor_list = ast.literal_eval(successor_list_str)
                        group_view = ast.literal_eval(group_view_str)
                    except Exception as e:
                        self._log(f"Failed to parse SUCCESSOR_INFORMATION safely: {e}")
                        continue
                    self.handle_successor_information(successor_list, group_view)
                    self.partOfNetwork = True  # ONLY AFTER RECEIVING SUCCESSOR INFO DO WE CONSIDER OURSELVES PART OF THE NETWORK
                    self._log(f"Peer {self.peer_id} is now part of the network.")
                    # NOTE(review): the second argument lands in _log's
                    # `level` parameter; it looks like it was meant to be
                    # part of the message text — confirm.
                    self._log("successor information processed. ", self.successor)
                elif message == "HEARTBEAT":
                    # Only accept heartbeats from the known leader
                    if peer_key == self.leader_id:
                        self.last_leader_heartbeat = time.time()
                        # self._log(f"Received HEARTBEAT from leader {peer_key} at {self.last_leader_heartbeat}")
                        self.send_message(peer_key, "HEARTBEAT_ACK")
                        # Ensure leader heartbeat monitoring is active
                        leader_thread = getattr(self, "leader_check_thread", None)
                        if (not self.isGroupLeader and
                                (leader_thread is None or not leader_thread.is_alive())):
                            self.start_leader_check_thread()
                    else:
                        self._log(f"Ignoring heartbeat from non-leader {peer_key}")
                        # If I'm a leader and another peer also claims
                        # leadership (split brain), resolve via election.
                        if self.isGroupLeader:
                            self._log(f"CONFLICT: I'm leader but got HEARTBEAT from {peer_key}. Starting election.")
                            self.start_bully_election()
                elif message == "HEARTBEAT_ACK":
                    # Leader received ack, update timestamp
                    # self._log(f"Received HEARTBEAT_ACK from {peer_key}")
                    self.peer_last_heartbeat[peer_key] = time.time()
                elif message.startswith("VIEW_CHANGE_JSON:"):
                    # Preferred JSON-encoded view update.
                    try:
                        payload_str = message.split("VIEW_CHANGE_JSON:", 1)[1]
                        payload = json.loads(payload_str)
                        new_ordered_list = payload.get("ordered", [])
                        new_group_view = payload.get("group", {})
                        new_view_id = int(payload.get("view_id", 0))
                        # self._log(f"Parsed VIEW_CHANGE_JSON - Ordered: {new_ordered_list}, ViewID: {new_view_id}")
                        self.handle_view_change(new_ordered_list, new_group_view, new_view_id)
                    except Exception as e:
                        self._log(f"Failed to parse VIEW_CHANGE_JSON: {e}")
                        continue
                elif message.startswith("VIEW_CHANGE:"):
                    # Legacy "-"-separated view update format.
                    self._log(f"Hurrah Received VIEW_CHANGE message: {message} my peer id is {self.peer_id}")
                    parts = message.split("VIEW_CHANGE:")[1].split("-", 2)
                    new_ordered_list_str = parts[0]
                    new_group_view_str = parts[1]
                    new_view_id_str = parts[2]
                    try:
                        new_ordered_list = ast.literal_eval(new_ordered_list_str)
                        new_group_view = ast.literal_eval(new_group_view_str)
                    except Exception as e:
                        self._log(f"Failed to parse VIEW_CHANGE safely: {e}")
                        continue
                    new_view_id = int(new_view_id_str)
                    self._log(f"Parsed VIEW_CHANGE - Ordered List: {new_ordered_list}, Group View: {new_group_view}, View ID: {new_view_id}")
                    self.handle_view_change(new_ordered_list, new_group_view, new_view_id)
                elif message.startswith("ADDED TO NETWORK"):
                    # Leader acknowledged our join; adopt its sequence state.
                    self._log(f"Peer {self.peer_id} received acknowledgement of being added to the network.")
                    self.partOfNetwork = True
                    starting_seq = message.split("ADDED TO NETWORK:")[1]
                    self.expected_seq = int(starting_seq)  # Jump to the current sequence!
                    self._log(f"Synchronized! Now expecting message #{self.expected_seq}")
                    # Ensure this peer is marked as a member, not leader
                    self.isGroupLeader = False
                    self.sequencer_peer_id = peer_key
                    self.leader_id = peer_key
                    if(self.multicast_thread_active==False):
                        self.create_multicast_threads()
                    # Start leader heartbeat monitoring as a member
                    leader_thread = getattr(self, "leader_check_thread", None)
                    if (not self.isGroupLeader and
                            (leader_thread is None or not leader_thread.is_alive())):
                        self.start_leader_check_thread()
                elif message.startswith("SEQUENCER_REQUEST:"):
                    if self.sequencer_peer_id != self.peer_id:
                        self._log(f"Error: Peer {self.peer_id} received sequencer request but is not the sequencer.")
                        continue
                    else:
                        # Handle sequencer request: stamp and multicast.
                        original_msg = message.split("SEQUENCER_REQUEST:")[1]
                        # self._log(f"Sequencer request received: {original_msg}")
                        with self.lock:
                            self.sequencer_sequence_number += 1
                            ordered_msg = (self.sequencer_sequence_number, original_msg)
                            self.multicast_message_to_group(ordered_msg, from_peer_id=peer_key)
                elif message.startswith("ELECTION:"):
                    # Bully election probe from another peer.
                    sender_id = message.split(":")[1]
                    self.handle_election_message(sender_id)
                elif message.startswith("OK"):
                    # Received OK from higher node
                    with self.lock:
                        self.received_ok = True
                elif message == "SEQ_SYNC_REQUEST":
                    # Reply with last delivered sequence number
                    last_seq = self._local_last_delivered_seq()
                    try:
                        self.send_message(peer_key, f"SEQ_SYNC_RESPONSE:{last_seq}")
                    except Exception as e:
                        self._log(f"Failed to send SEQ_SYNC_RESPONSE to {peer_key}: {e}")
                elif message.startswith("SEQ_SYNC_RESPONSE:"):
                    # Only the leader collects responses, and only while a
                    # sync round is active.
                    if self.isGroupLeader and self.seq_sync_active:
                        try:
                            resp_seq = int(message.split("SEQ_SYNC_RESPONSE:", 1)[1])
                            self.seq_sync_responses[peer_key] = resp_seq
                        except Exception as e:
                            self._log(f"Failed to parse SEQ_SYNC_RESPONSE from {peer_key}: {e}")
                elif message.startswith("SEQ_SYNC_FINAL:"):
                    try:
                        final_seq = int(message.split("SEQ_SYNC_FINAL:", 1)[1])
                        # Ensure we don't move backwards
                        self.expected_seq = max(self.expected_seq, final_seq + 1)
                    except Exception as e:
                        self._log(f"Failed to parse SEQ_SYNC_FINAL: {e}")
                elif message.startswith("MISSING_SEQ_REQUEST:"):
                    # Retransmit a past message from history (sequencer only).
                    if self.sequencer_peer_id == self.peer_id:
                        try:
                            req_seq = int(message.split("MISSING_SEQ_REQUEST:", 1)[1])
                            payload = self.sent_history.get(req_seq)
                            if payload is not None:
                                self.send_message(peer_key, f"MISSING_SEQ_RESPONSE:{req_seq}:{payload}")
                            else:
                                self.send_message(peer_key, f"MISSING_SEQ_NOT_FOUND:{req_seq}")
                        except Exception as e:
                            self._log(f"Failed to handle MISSING_SEQ_REQUEST from {peer_key}: {e}")
                elif message.startswith("MISSING_SEQ_RESPONSE:"):
                    try:
                        rest = message.split("MISSING_SEQ_RESPONSE:", 1)[1]
                        seq_str, payload = rest.split(":", 1)
                        seq_id = int(seq_str)
                        if seq_id not in self.delivered_seq_set and seq_id not in self.seen_seq:
                            self.seen_seq.add(seq_id)
                            self.incoming_queue.put((seq_id, payload))
                            self.process_buffer()
                    except Exception as e:
                        self._log(f"Failed to parse MISSING_SEQ_RESPONSE: {e}")
                elif message.startswith("MISSING_SEQ_NOT_FOUND:"):
                    # Sequencer doesn't have it (history evicted)
                    pass
                elif message == "VIEW_SYNC_REQUEST":
                    # Leader replies with the latest view
                    if self.isGroupLeader:
                        payload = {
                            "ordered": self.orderedPeerList,
                            "group": self.groupView,
                            "view_id": self.view_id,
                        }
                        try:
                            self.send_message(peer_key, f"VIEW_CHANGE_JSON:{json.dumps(payload)}")
                        except Exception as e:
                            self._log(f"Failed to send VIEW_SYNC response to {peer_key}: {e}")
                elif message.startswith("COORDINATOR:"):
                    # Election result: adopt the announced leader/sequencer.
                    new_leader = message.split(":")[1]
                    self._log(f"Received COORDINATOR announcement: {new_leader}")
                    self.leader_id = new_leader
                    self.sequencer_peer_id = new_leader
                    self.isGroupLeader = (new_leader == self.peer_id)
                    if self.isGroupLeader:
                        self._log(f"Peer {self.peer_id} became the Group Leader via election.")
                        self.start_heartbeat_thread()
                    else:
                        self._log(f"Peer {self.peer_id} acknowledges new leader {new_leader}.")
                        # Ensure leader heartbeat monitoring is active
                        leader_thread = getattr(self, "leader_check_thread", None)
                        if leader_thread is None or not leader_thread.is_alive():
                            self.start_leader_check_thread()
            # Store any partial data back into buffer
            self.conn_buffers[peer_key] = buffer
    else:
        self._log(f"No connection found for {peer_key} while receiving message")
def create_multicast_socket(self) -> None:
    """Prepare a UDP socket for sending multicast and store it on `self`.

    Sets a multicast TTL of 2 and, when possible, pins the outgoing
    interface to `self.address` so traffic leaves via the right NIC.
    """
    sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    sender.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
    try:
        # Pin the egress interface; best-effort, logged on failure.
        sender.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, socket.inet_aton(self.address))
    except Exception as e:
        self._log(f"Warning: Could not set IP_MULTICAST_IF to {self.address}: {e}")
    self.multicast_socket = sender
def multicast_message_to_group(self, ordered_msg: Tuple[int, Any], from_peer_id: str) -> None:
    """Multicast an ordered ``(sequence, payload)`` message with metadata.

    The payload is also remembered in `sent_history` (bounded by
    `history_max`) so it can be retransmitted if a peer reports a gap.

    Args:
        ordered_msg (tuple[int, Any]): ``(sequence, payload)`` tuple.
        from_peer_id (str): ID of the sender peer.
    """
    if not self.multicast_socket:
        self.create_multicast_socket()
    try:
        # Record for retransmission when acting as sequencer.
        seq_id, payload = ordered_msg
        self.sent_history[int(seq_id)] = payload
        if len(self.sent_history) > self.history_max:
            self.sent_history.popitem(last=False)  # evict oldest
    except Exception:
        pass
    # Wrap the ordered message with routing metadata.
    envelope = {
        "type": "MULTICAST_MESSAGE_ORDER",
        "message": ordered_msg,
        "from": from_peer_id,
    }
    try:
        self.multicast_socket.sendto(json.dumps(envelope).encode(), (MCAST_GRP, MCAST_PORT))
        # self._log(f"Multicast message sent: {envelope}")
    except Exception as e:
        self._log(f"Failed to send multicast message: {e}")
def handle_view_change(self, new_ordered_list: List[str], new_group_view: Dict[str, Any], new_view_id: int) -> None:
    """Adopt a newer group view; ignore stale or duplicate ones.

    Args:
        new_ordered_list (list[str]): Updated ordered peer IDs.
        new_group_view (dict): Updated group view mapping.
        new_view_id (int): Monotonic view identifier.
    """
    if new_view_id <= self.view_id:
        # Stale or duplicate announcement; nothing to do.
        return
    if new_view_id > self.view_id + 1:
        # We skipped at least one view; ask the leader for a full sync.
        self._log(f"View gap detected: current={self.view_id}, received={new_view_id}. Requesting sync.")
        self.request_view_sync()
    self._log(f"Received view change: {new_ordered_list}, {new_group_view}")
    self.orderedPeerList = new_ordered_list
    self.groupView = new_group_view
    self._log(f"Updated ordered peer list after VIEW_CHANGE: {self.orderedPeerList} and group view: {self.groupView}")
    self.view_id = new_view_id
    self.view_sync_inflight = False
def create_multicast_listener(self) -> socket.socket:
    """Bind a UDP socket to `MCAST_PORT` and join `MCAST_GRP`.

    Address reuse is enabled so several peers on one host can share the
    port; the group is joined on the interface matching `self.address`.

    Returns:
        socket.socket: Socket ready to receive group multicast traffic.
    """
    listener = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    if hasattr(socket, 'SO_REUSEPORT'):
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    else:
        self._log("Warning: SO_REUSEPORT not supported on this platform.")
    listener.bind(('', MCAST_PORT))
    # Join the multicast group on our specific local interface.
    membership = struct.pack(
        "4s4s",
        socket.inet_aton(MCAST_GRP),
        socket.inet_aton(self.address),
    )
    listener.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, membership)
    return listener
def listen_for_multicast_messages(self) -> None:
    """Continuously receive and process multicast messages.

    Parses JSON datagrams, de-duplicates by sequence number, pushes new
    messages into the hold-back buffer, and triggers `process_buffer()`
    to deliver them in order. Runs forever; intended for a daemon thread.
    """
    sock = self.create_multicast_listener()
    while True:
        data, addr = sock.recvfrom(4096)
        # NOTE(review): json.loads raises on a malformed datagram and
        # would kill this thread — confirm senders are always trusted.
        message = json.loads(data.decode())
        # self._log(f"Received multicast message at {self.peer_id} from {addr}: {message}")
        # Envelope shape: {"type": ..., "message": (seq, payload), "from": id}
        message_content = message["message"]
        from_peer = message["from"]
        try:
            seq_id, payload = message_content
            seq_id = int(seq_id)
        except Exception:
            self._log(f"Malformed multicast message content: {message_content}")
            continue
        if seq_id in self.delivered_seq_set or seq_id in self.seen_seq:
            continue  # duplicate: already queued or delivered
        self.seen_seq.add(seq_id)
        self.incoming_queue.put((seq_id, payload))
        self.process_buffer()
# -------------------- Sensor Data (Mocked) --------------------
def start_sensor_thread(self) -> None:
    """Launch the mocked sensor loop once the peer is ready.

    If the peer is not yet part of the network or no sequencer is known
    (e.g., election still pending), this method re-arms itself via a
    one-second `threading.Timer` instead of starting the loop.
    """
    if not self.partOfNetwork or not self.sequencer_peer_id:
        self._log(f"Sensor thread waiting: partOfNetwork={self.partOfNetwork}, sequencer_peer_id={self.sequencer_peer_id}")
        # Retry later; the election may still be in progress.
        threading.Timer(1.0, self.start_sensor_thread).start()
        return
    if self.sensor_thread and self.sensor_thread.is_alive():
        return  # Already running; never spawn a second loop.
    self.data_dir.mkdir(parents=True, exist_ok=True)
    worker = threading.Thread(target=self._sensor_loop, daemon=True)
    self.sensor_thread = worker
    worker.start()
    self._log(f"Sensor thread started for {self.peer_id} with interval {self.sensor_interval_seconds}s")
def _sensor_loop(self) -> None:
"""Generate and send sensor measurements at a fixed interval."""
# Small random initial delay to de-sync sends across peers
time.sleep(random.uniform(0, 2))
while True:
try:
measurement = self.generate_mock_measurement()
payload = self.encode_measurement(measurement)
self.send_message_to_sequencer(payload)
except Exception as e:
self._log(f"Sensor loop error: {e}")
time.sleep(self.sensor_interval_seconds)
def generate_mock_measurement(self) -> Dict[str, Any]:
"""Create a mock sensor measurement.
Returns:
dict: Measurement fields including `sensor_id`, `timestamp`,
`temperature`, `humidity`, and `pressure`.
"""
# Simple random walk around typical indoor values
temp = round(random.uniform(19.0, 24.0), 2) # Celsius
humidity = round(random.uniform(30.0, 60.0), 2) # %
pressure = round(random.uniform(990.0, 1030.0), 2) # hPa
# Use timezone-aware UTC ISO 8601 with 'Z' suffix
ts = datetime.now(timezone.utc).isoformat(timespec='seconds').replace('+00:00', 'Z')
return {
"sensor_id": self.peer_id,
"timestamp": ts,
"temperature": temp,
"humidity": humidity,
"pressure": pressure,
}
def encode_measurement(self, m: Dict[str, Any]) -> str:
"""Encode a measurement into the totally ordered payload format.
Args:
m (dict): Measurement dictionary.
Returns:
str: Encoded string `DATA|sensor_id|timestamp|temperature|humidity|pressure`.
"""
# Totally ordered payload format
# DATA|sensor_id|timestamp|temperature|humidity|pressure
return f"DATA|{m['sensor_id']}|{m['timestamp']}|{m['temperature']}|{m['humidity']}|{m['pressure']}"
def handle_ordered_payload(self, payload: str, seq_id: int) -> None:
"""Handle an ordered data payload delivered from the buffer.
Args:
payload (str): The delivered payload string.
seq_id (int): Assigned sequence number.
"""
# Handle only our data messages; ignore other demo strings
if isinstance(payload, str) and payload.startswith("DATA|"):
parts = payload.split("|")
if len(parts) != 6:
self._log(f"Malformed DATA payload: {payload}")
return
_, sensor_id, ts, temp, hum, pres = parts
self.write_measurement_csv(sensor_id, ts, temp, hum, pres, seq_id)
def write_measurement_csv(self, sensor_id: str, ts: str, temp: str, hum: str, pres: str, seq_id: int) -> None:
"""Append a measurement row to the per-sensor CSV file.
Args:
sensor_id (str): Sensor UUID.
ts (str): ISO 8601 UTC timestamp.
temp (str): Temperature in Celsius.
hum (str): Relative humidity percentage.
pres (str): Pressure in hPa.
seq_id (int): Ordered sequence number.
"""
# One CSV per sensor in this peer's directory (replicated across peers)
try:
self.data_dir.mkdir(parents=True, exist_ok=True)
# Append to a text file with ASCII box formatting
file_path = self.data_dir / f"{sensor_id}.txt"
file_exists = file_path.exists()
with open(file_path, mode='a', encoding='utf-8') as f:
# Define column widths
w_seq = 10
w_ts = 25
w_id = 38
w_temp = 10
w_hum = 10
w_pres = 12
# Separator line
sep = f"+{'-'*w_seq}+{'-'*w_ts}+{'-'*w_id}+{'-'*w_temp}+{'-'*w_hum}+{'-'*w_pres}+\n"
if not file_exists:
# Write header
f.write(sep)
header = f"|{'SEQUENCE'.center(w_seq)}|{'TIMESTAMP'.center(w_ts)}|{'SENSOR ID'.center(w_id)}|{'TEMP(C)'.center(w_temp)}|{'HUM(%)'.center(w_hum)}|{'PRES(hPa)'.center(w_pres)}|\n"
f.write(header)
f.write(sep)
# Write data row
row = f"|{str(seq_id).center(w_seq)}|{ts.center(w_ts)}|{sensor_id.center(w_id)}|{str(temp).center(w_temp)}|{str(hum).center(w_hum)}|{str(pres).center(w_pres)}|\n"
f.write(row)
f.write(sep)
except Exception as e:
self._log(f"Failed to write CSV for {sensor_id}: {e}")
def reset_data_dir(self) -> None:
"""Reset this peer's data directory for a clean run.
Removes existing directory and recreates it, preventing data from
previous runs from accumulating.
"""
# Remove and recreate this peer's data directory to avoid appending across runs
if self.data_dir.is_dir():
try:
shutil.rmtree(self.data_dir)
self._log(f"Cleared data directory: {self.data_dir}")
except Exception as e:
self._log(f"Failed to clear data directory {self.data_dir}: {e}")
try:
self.data_dir.mkdir(parents=True, exist_ok=True)
except Exception as e:
self._log(f"Failed to create data directory {self.data_dir}: {e}")
def create_multicast_threads(self):
"""Start multicast listener and processing threads.
Launches a background thread to receive and process multicast
messages. The listener socket is created inside the thread.
"""
multicast_thread = threading.Thread(target=self.listen_for_multicast_messages, daemon=True)
multicast_thread.start()
self.multicast_thread_active = True
def send_connection_request(self, peer_ip, peer_port, peer_id):
"""Establish a TCP connection and start a receiver thread.
Performs a simple handshake, stores the connection under `peer_id`,
and starts `receive_message()` in a new thread.
Args:
peer_ip (str): Target peer IP address.
peer_port (int): Target peer TCP port.
peer_id (str): Target peer UUID.
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.connect((peer_ip, peer_port)) # TCP handshake
# Optional handshake message
sock.send(b"HELLO from " + f"{self.address}:{self.port} and peer id:{self.peer_id}".encode())
# Store the connection
if peer_id not in self.connection_dict:
self.connection_dict[peer_id] = sock
self._log(f"Connected to id : {peer_id} and {peer_ip}:{peer_port}")
# Start receive thread for this connection
threading.Thread(target=self.receive_message, args=(peer_id,), daemon=True).start()
except socket.error as e:
self._log("Connection failed:", e)
raise
    # Broadcast a join request over UDP so existing peers can admit this
    # new node into the network (uses a dedicated UDP socket).
def broadcast_new_peer_request(self, broadcast_ip, broadcast_port, message="NEW_PEER_REQUEST"):
"""Send a UDP broadcast request to join the network.
Args:
broadcast_ip (str): Broadcast target IP (e.g., 127.0.0.1).
broadcast_port (int): Broadcast port (e.g., 9999).
message (str, optional): Request type. Defaults to "NEW_PEER_REQUEST".
"""
udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
message = f"{message}:{self.peer_id}:{self.port}"
try:
udp_sock.sendto(message.encode(), (broadcast_ip, broadcast_port))
# self._log(f"Broadcasted new peer request to {broadcast_ip}:{broadcast_port}")
except Exception as e:
self._log(f"Failed to broadcast to {broadcast_ip}:{broadcast_port}: {e}")
udp_sock.close()
def listen_for_broadcasts(self, buffer_size=1024):
"""Listen for UDP broadcast messages and process join requests.
Adds new peers to `groupView` when acting as leader and multicasts
updated view information.
Args:
buffer_size (int, optional): Max UDP packet size. Defaults to 1024.
"""
udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
if hasattr(socket, 'SO_REUSEADDR'):
udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
else:
self._log("Warning: SO_REUSEADDR not supported on this platform.")
try: