Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 14 additions & 23 deletions src/packet/forward_tsn_chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,13 @@ use crate::packet::write_u16_be;
use crate::packet::write_u32_be;
use crate::packet::ChunkParseError;
use crate::packet::SerializableTlv;
use crate::types::Mid;
use crate::packet::SkippedStream;
use crate::types::Ssn;
use crate::types::StreamKey;
use crate::types::Tsn;
use anyhow::ensure;
use anyhow::Error;
use std::fmt;

#[derive(Debug, PartialEq)]
pub enum SkippedStream {
ForwardTsn(StreamId, Ssn),
IForwardTsn(StreamKey, Mid),
}

pub(crate) const CHUNK_TYPE: u8 = 192;

/// Forward TSN chunk
Expand Down Expand Up @@ -69,16 +62,15 @@ impl TryFrom<RawChunk<'_>> for ForwardTsnChunk {
ensure!(raw.value.len() >= 4 && (raw.value.len() % 4) == 0, ChunkParseError::InvalidLength);

let new_cumulative_tsn = Tsn(read_u32_be!(&raw.value[0..4]));
let num_skipped = (raw.value.len() - 4) / 4;

let mut skipped_streams = Vec::<SkippedStream>::with_capacity(num_skipped);
let mut offset = 4;
for _ in 0..num_skipped {
let stream_id = StreamId(read_u16_be!(&raw.value[offset..offset + 2]));
let ssn = Ssn(read_u16_be!(&raw.value[offset + 2..offset + 4]));
skipped_streams.push(SkippedStream::ForwardTsn(stream_id, ssn));
offset += 4;
}

let skipped_streams = raw.value[4..]
.chunks_exact(4)
.map(|c| {
let stream_id = StreamId(read_u16_be!(&c[0..2]));
let ssn = Ssn(read_u16_be!(&c[2..4]));
SkippedStream::ForwardTsn(stream_id, ssn)
})
.collect();

Ok(Self { new_cumulative_tsn, skipped_streams })
}
Expand All @@ -89,16 +81,15 @@ impl SerializableTlv for ForwardTsnChunk {
let value = write_chunk_header(CHUNK_TYPE, 0, self.value_size(), output);
write_u32_be!(&mut value[0..4], self.new_cumulative_tsn.0);

let mut offset = 4;
for skipped in &self.skipped_streams {
let mut chunks = value[4..].chunks_exact_mut(4);
for (skipped, chunk) in self.skipped_streams.iter().zip(&mut chunks) {
match skipped {
SkippedStream::ForwardTsn(stream_id, ssn) => {
write_u16_be!(&mut value[offset..offset + 2], stream_id.0);
write_u16_be!(&mut value[offset + 2..offset + 4], ssn.0);
write_u16_be!(&mut chunk[0..2], stream_id.0);
write_u16_be!(&mut chunk[2..4], ssn.0);
}
_ => panic!("Unsupported skipped stream"),
}
offset += 4;
}
}

Expand Down
37 changes: 17 additions & 20 deletions src/packet/iforward_tsn_chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use super::forward_tsn_chunk::SkippedStream;
use crate::api::StreamId;
use crate::packet::chunk::write_chunk_header;
use crate::packet::chunk::RawChunk;
Expand All @@ -22,6 +21,7 @@ use crate::packet::write_u16_be;
use crate::packet::write_u32_be;
use crate::packet::ChunkParseError;
use crate::packet::SerializableTlv;
use crate::packet::SkippedStream;
use crate::types::Mid;
use crate::types::StreamKey;
use crate::types::Tsn;
Expand Down Expand Up @@ -70,18 +70,16 @@ impl TryFrom<RawChunk<'_>> for IForwardTsnChunk {
);

let new_cumulative_tsn = Tsn(read_u32_be!(&raw.value[0..4]));
let num_skipped = (raw.value.len() - 4) / 8;

let mut skipped_streams = Vec::<SkippedStream>::with_capacity(num_skipped);
let mut offset = 4;
for _ in 0..num_skipped {
let stream_id = StreamId(read_u16_be!(&raw.value[offset..offset + 2]));
let is_unordered = (raw.value[offset + 3] & 1) != 0;
let mid = Mid(read_u32_be!(&raw.value[offset + 4..offset + 8]));
skipped_streams
.push(SkippedStream::IForwardTsn(StreamKey::from(is_unordered, stream_id), mid));
offset += 8;
}

let skipped_streams = raw.value[4..]
.chunks_exact(8)
.map(|c| {
let stream_id = StreamId(read_u16_be!(&c[0..2]));
let is_unordered = (c[3] & 1) != 0;
let mid = Mid(read_u32_be!(&c[4..8]));
SkippedStream::IForwardTsn(StreamKey::from(is_unordered, stream_id), mid)
})
.collect();

Ok(Self { new_cumulative_tsn, skipped_streams })
}
Expand All @@ -92,18 +90,17 @@ impl SerializableTlv for IForwardTsnChunk {
let value = write_chunk_header(CHUNK_TYPE, 0, self.value_size(), output);
write_u32_be!(&mut value[0..4], self.new_cumulative_tsn.0);

let mut offset = 4;
for skipped in &self.skipped_streams {
let mut chunks = value[4..].chunks_exact_mut(8);
for (skipped, chunk) in self.skipped_streams.iter().zip(&mut chunks) {
match skipped {
SkippedStream::IForwardTsn(stream_key, mid) => {
write_u16_be!(&mut value[offset..offset + 2], stream_key.id().0);
value[offset + 2] = 0;
value[offset + 3] = if stream_key.is_unordered() { 1 } else { 0 };
write_u32_be!(&mut value[offset + 4..offset + 8], mid.0);
write_u16_be!(&mut chunk[0..2], stream_key.id().0);
chunk[2] = 0;
chunk[3] = if stream_key.is_unordered() { 1 } else { 0 };
write_u32_be!(&mut chunk[4..8], mid.0);
}
_ => panic!("Unsupported skipped stream"),
}
offset += 8;
}
}

Expand Down
9 changes: 4 additions & 5 deletions src/packet/incoming_ssn_reset_request_parameter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,7 @@ impl TryFrom<RawParameter<'_>> for IncomingSsnResetRequestParameter {
ensure!(raw.value.len() >= 4 && (raw.value.len() % 2) == 0, ChunkParseError::InvalidLength);

let request_seq_nbr = read_u32_be!(&raw.value[0..4]);
let streams = (0..(raw.value.len() - 4) / 2)
.map(|i| StreamId(read_u16_be!(&raw.value[4 + i * 2..4 + i * 2 + 2])))
.collect();
let streams = raw.value[4..].chunks_exact(2).map(|c| StreamId(read_u16_be!(c))).collect();

Ok(Self { request_seq_nbr, streams })
}
Expand All @@ -70,8 +68,9 @@ impl SerializableTlv for IncomingSsnResetRequestParameter {
fn serialize_to(&self, output: &mut [u8]) {
let value = write_parameter_header(PARAMETER_TYPE, self.value_size(), output);
write_u32_be!(&mut value[0..4], self.request_seq_nbr);
for (idx, stream_id) in self.streams.iter().enumerate() {
write_u16_be!(&mut value[4 + idx * 2..4 + idx * 2 + 2], stream_id.0);
let mut chunks = value[4..].chunks_exact_mut(2);
for (stream_id, chunk) in self.streams.iter().zip(&mut chunks) {
write_u16_be!(chunk, stream_id.0);
}
}

Expand Down
10 changes: 10 additions & 0 deletions src/packet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,23 @@ macro_rules! write_u64_be {
};
}

use crate::api::StreamId;
use crate::types::Mid;
use crate::types::Ssn;
use crate::types::StreamKey;
pub(crate) use read_u16_be;
pub(crate) use read_u32_be;
pub(crate) use read_u64_be;
pub(crate) use write_u16_be;
pub(crate) use write_u32_be;
pub(crate) use write_u64_be;

#[derive(Debug, PartialEq)]
pub(crate) enum SkippedStream {
ForwardTsn(StreamId, Ssn),
IForwardTsn(StreamKey, Mid),
}

/// Trait for serialization/deserialization methods on TLV data types (chunks, parameters, error
/// causes) that have the same framing, but handle metadata (type, flags etc) differently.
pub(crate) trait SerializableTlv {
Expand Down
9 changes: 4 additions & 5 deletions src/packet/outgoing_ssn_reset_request_parameter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,7 @@ impl TryFrom<RawParameter<'_>> for OutgoingSsnResetRequestParameter {
ChunkParseError::InvalidLength
);

let streams = (0..(raw.value.len() - 12) / 2)
.map(|i| StreamId(read_u16_be!(&raw.value[12 + i * 2..12 + i * 2 + 2])))
.collect();
let streams = raw.value[12..].chunks_exact(2).map(|c| StreamId(read_u16_be!(c))).collect();

Ok(Self {
request_seq_nbr: read_u32_be!(&raw.value[0..4]),
Expand All @@ -86,8 +84,9 @@ impl SerializableTlv for OutgoingSsnResetRequestParameter {
write_u32_be!(&mut value[0..4], self.request_seq_nbr);
write_u32_be!(&mut value[4..8], self.response_seq_nbr);
write_u32_be!(&mut value[8..12], self.sender_last_assigned_tsn.0);
for (idx, stream_id) in self.streams.iter().enumerate() {
write_u16_be!(&mut value[12 + idx * 2..12 + idx * 2 + 2], stream_id.0);
let mut chunks = value[12..].chunks_exact_mut(2);
for (stream_id, chunk) in self.streams.iter().zip(&mut chunks) {
write_u16_be!(chunk, stream_id.0);
}
}

Expand Down
46 changes: 24 additions & 22 deletions src/packet/sack_chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,20 +95,21 @@ impl TryFrom<RawChunk<'_>> for SackChunk {
ChunkParseError::InvalidLength
);

let mut gap_ack_blocks = Vec::<GapAckBlock>::with_capacity(nbr_of_gap_blocks);
let mut offset = 12;
for _ in 0..nbr_of_gap_blocks {
let start = read_u16_be!(&raw.value[offset..offset + 2]);
let end = read_u16_be!(&raw.value[offset + 2..offset + 4]);
gap_ack_blocks.push(GapAckBlock { start, end });
offset += 4;
}

let mut duplicate_tsns = Vec::<Tsn>::with_capacity(nbr_of_dup_tsns);
for _ in 0..nbr_of_dup_tsns {
duplicate_tsns.push(Tsn(read_u32_be!(&raw.value[offset..offset + 4])));
offset += 4;
}
let gap_blocks_end = 12 + nbr_of_gap_blocks * 4;
let gap_ack_blocks_data = &raw.value[12..gap_blocks_end];
let duplicate_tsns_data = &raw.value[gap_blocks_end..];

let gap_ack_blocks = gap_ack_blocks_data
.chunks_exact(4)
.map(|c| {
let start = read_u16_be!(&c[0..2]);
let end = read_u16_be!(&c[2..4]);
GapAckBlock { start, end }
})
.collect();

let duplicate_tsns =
duplicate_tsns_data.chunks_exact(4).map(|c| Tsn(read_u32_be!(c))).collect();

Ok(Self { cumulative_tsn_ack, a_rwnd, gap_ack_blocks, duplicate_tsns })
}
Expand All @@ -122,16 +123,17 @@ impl SerializableTlv for SackChunk {
write_u16_be!(&mut value[8..10], self.gap_ack_blocks.len() as u16);
write_u16_be!(&mut value[10..12], self.duplicate_tsns.len() as u16);

let mut offset = 12;
for block in &self.gap_ack_blocks {
write_u16_be!(&mut value[offset..offset + 2], block.start);
write_u16_be!(&mut value[offset + 2..offset + 4], block.end);
offset += 4;
let gap_blocks_end = 12 + self.gap_ack_blocks.len() * 4;

let mut chunks = value[12..gap_blocks_end].chunks_exact_mut(4);
for (block, chunk) in self.gap_ack_blocks.iter().zip(&mut chunks) {
write_u16_be!(&mut chunk[0..2], block.start);
write_u16_be!(&mut chunk[2..4], block.end);
}

for dup_tsn in &self.duplicate_tsns {
write_u32_be!(&mut value[offset..offset + 4], dup_tsn.0);
offset += 4;
let mut chunks = value[gap_blocks_end..].chunks_exact_mut(4);
for (dup_tsn, chunk) in self.duplicate_tsns.iter().zip(&mut chunks) {
write_u32_be!(chunk, dup_tsn.0);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/rx/interleaved_reassembly_streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::api::handover::SocketHandoverState;
use crate::api::Message;
use crate::api::StreamId;
use crate::packet::data::Data;
use crate::packet::forward_tsn_chunk::SkippedStream;
use crate::packet::SkippedStream;
use crate::rx::reassembly_streams::ReassemblyStreams;
use crate::types::Fsn;
use crate::types::Mid;
Expand Down
2 changes: 1 addition & 1 deletion src/rx/reassembly_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::api::handover::SocketHandoverState;
use crate::api::Message;
use crate::api::StreamId;
use crate::packet::data::Data;
use crate::packet::forward_tsn_chunk::SkippedStream;
use crate::packet::SkippedStream;
use crate::rx::interleaved_reassembly_streams::InterleavedReassemblyStreams;
use crate::rx::reassembly_streams::ReassemblyStreams;
use crate::rx::traditional_reassembly_streams::TraditionalReassemblyStreams;
Expand Down
2 changes: 1 addition & 1 deletion src/rx/reassembly_streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::api::handover::SocketHandoverState;
use crate::api::Message;
use crate::api::StreamId;
use crate::packet::data::Data;
use crate::packet::forward_tsn_chunk::SkippedStream;
use crate::packet::SkippedStream;
use crate::types::Tsn;

/// Implementations of this interface will be called when data is received, when data should be
Expand Down
2 changes: 1 addition & 1 deletion src/rx/traditional_reassembly_streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::api::handover::SocketHandoverState;
use crate::api::Message;
use crate::api::StreamId;
use crate::packet::data::Data;
use crate::packet::forward_tsn_chunk::SkippedStream;
use crate::packet::SkippedStream;
use crate::rx::reassembly_streams::ReassemblyStreams;
use crate::types::Ssn;
use crate::types::StreamKey;
Expand Down
2 changes: 1 addition & 1 deletion src/socket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ use crate::packet::error_causes::ErrorCause;
use crate::packet::error_chunk::ErrorChunk;
use crate::packet::forward_tsn_chunk;
use crate::packet::forward_tsn_chunk::ForwardTsnChunk;
use crate::packet::forward_tsn_chunk::SkippedStream;
use crate::packet::forward_tsn_supported_parameter::ForwardTsnSupportedParameter;
use crate::packet::heartbeat_ack_chunk::HeartbeatAckChunk;
use crate::packet::heartbeat_info_parameter::HeartbeatInfoParameter;
Expand Down Expand Up @@ -81,6 +80,7 @@ use crate::packet::user_initiated_abort_error_cause::UserInitiatedAbortErrorCaus
use crate::packet::write_u32_be;
use crate::packet::zero_checksum_acceptable_parameter::ZeroChecksumAcceptableParameter;
use crate::packet::SerializableTlv;
use crate::packet::SkippedStream;
use crate::socket::capabilities::Capabilities;
use crate::socket::state_cookie::StateCookie;
use crate::socket::transmission_control_block::CurrentResetRequest;
Expand Down
2 changes: 1 addition & 1 deletion src/tx/outstanding_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ use crate::api::StreamId;
use crate::math::round_up_to_4;
use crate::packet::data::Data;
use crate::packet::forward_tsn_chunk::ForwardTsnChunk;
use crate::packet::forward_tsn_chunk::SkippedStream;
use crate::packet::iforward_tsn_chunk::IForwardTsnChunk;
use crate::packet::sack_chunk::GapAckBlock;
use crate::packet::SkippedStream;
use crate::types::Mid;
use crate::types::OutgoingMessageId;
use crate::types::Ssn;
Expand Down
2 changes: 1 addition & 1 deletion src/tx/retransmission_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -735,8 +735,8 @@ mod tests {
use crate::api::PpId;
use crate::api::SendOptions;
use crate::events::Events;
use crate::packet::forward_tsn_chunk::SkippedStream;
use crate::packet::sack_chunk::GapAckBlock;
use crate::packet::SkippedStream;
use crate::tx::send_queue::SendQueue;
use crate::types::Mid;
use crate::types::Ssn;
Expand Down