12
12
from collections import deque
13
13
from typing import Any , Optional , Sequence
14
14
15
- from spacepackets .ccsds .spacepacket import parse_space_packets , PacketId
15
+ from spacepackets .ccsds .spacepacket import (
16
+ PacketId ,
17
+ parse_space_packets_from_deque ,
18
+ )
16
19
17
20
from tmtccmd .com import ComInterface , SendError
18
21
from tmtccmd .com .tcpip_utils import EthAddr
@@ -115,7 +118,7 @@ def __connect_socket(self):
115
118
finally :
116
119
self .__tcp_socket .settimeout (None )
117
120
118
- def close (self , args : any = None ) -> None :
121
+ def close (self , args : Any = None ) -> None :
119
122
if not self .is_open ():
120
123
return
121
124
self .__thread_kill_signal .set ()
@@ -128,7 +131,7 @@ def close(self, args: any = None) -> None:
128
131
def send (self , data : bytes | bytearray ):
129
132
self .__tc_queue .put (data )
130
133
131
- def receive (self , poll_timeout : float = 0 ) -> list [bytes ]:
134
+ def receive (self , parameters : float = 0 ) -> list [bytes ]:
132
135
self .__tm_queue_to_packet_list ()
133
136
tm_packet_list = self .tm_packet_list
134
137
self .tm_packet_list = []
@@ -139,13 +142,23 @@ def __tm_queue_to_packet_list(self):
139
142
self .__analysis_queue .append (self .__tm_queue .get ())
140
143
# TCP is stream based, so there might be broken packets or multiple packets in one recv
141
144
# call. We parse the space packets contained in the stream here
142
- if self .com_type == TcpCommunicationType .SPACE_PACKETS :
143
- self .tm_packet_list .extend (
144
- parse_space_packets (
145
- analysis_queue = self .__analysis_queue ,
146
- packet_ids = self .space_packet_ids ,
147
- )
145
+ if self .com_type == TcpCommunicationType .SPACE_PACKETS and self .__analysis_queue :
146
+ result = parse_space_packets_from_deque (
147
+ analysis_queue = self .__analysis_queue ,
148
+ packet_ids = self .space_packet_ids ,
148
149
)
150
+ flattened = bytearray ()
151
+ for packet in result .tm_list :
152
+ self .tm_packet_list .append (packet )
153
+ while self .__analysis_queue :
154
+ flattened .extend (self .__analysis_queue .popleft ())
155
+ # Might be spammy, but I consider this a configuration error, and the user
156
+ # should be notified about it.
157
+ for skipped_range in result .skipped_ranges :
158
+ _LOGGER .warning ("skipped bytes in received TCP datastream:" )
159
+ print (flattened [skipped_range .start : skipped_range .stop ])
160
+ _LOGGER .warning ("list of valid packet IDs might be incomplete" )
161
+ self .__analysis_queue .append (flattened [result .scanned_bytes :])
149
162
else :
150
163
while self .__analysis_queue :
151
164
self .tm_packet_list .append (self .__analysis_queue .popleft ())
0 commit comments