From 33e73ef35515eb52bcb174cb188f49cc5806652b Mon Sep 17 00:00:00 2001 From: Greg Lucas Date: Tue, 6 Jan 2026 20:56:58 -0700 Subject: [PATCH 1/4] Handling codice segmented packets --- imap_processing/codice/codice_l1a_de.py | 710 +++++++++--------- .../codice_packet_definition.xml | 467 ++++-------- 2 files changed, 521 insertions(+), 656 deletions(-) diff --git a/imap_processing/codice/codice_l1a_de.py b/imap_processing/codice/codice_l1a_de.py index 5991fa3749..7f1afca911 100644 --- a/imap_processing/codice/codice_l1a_de.py +++ b/imap_processing/codice/codice_l1a_de.py @@ -9,112 +9,107 @@ from imap_processing.codice.utils import ( CODICEAPID, CoDICECompression, - SegmentedPacketOrder, ViewTabInfo, apply_replacements_to_attrs, get_codice_epoch_time, ) from imap_processing.spice.time import met_to_ttj2000ns +from imap_processing.utils import combine_segmented_packets -def get_de_metadata(packets: xr.Dataset, packet_index: int) -> bytes: - """ - Gather and return packet metadata (From packet_version through byte_count). - - Extract the metadata in the packet_indexed direct event packet, which is then - used to construct the full data of the group of packet_indexs. - - Parameters - ---------- - packets : xarray.Dataset - The packet_indexed direct event packet data. - packet_index : int - The index of the packet_index of interest. - - Returns - ------- - metadata : bytes - The compressed metadata for the packet_indexed packet. - """ - # String together the metadata fields and convert the data to a bytes obj - metadata_str = "" - for field, num_bits in constants.DE_METADATA_FIELDS.items(): - metadata_str += f"{packets[field].data[packet_index]:0{num_bits}b}" - metadata_chunks = [metadata_str[i : i + 8] for i in range(0, len(metadata_str), 8)] - metadata_ints = [int(item, 2) for item in metadata_chunks] - metadata = bytes(metadata_ints) - - return metadata - - -def group_data(packets: xr.Dataset) -> list[bytes]: +def extract_initial_items_from_combined_packets( + packets: xr.Dataset, +) -> xr.Dataset: """ - Organize continuation packets into appropriate groups. - - Some packets are continuation packets, as in, they are packets that are - part of a group of packets. These packets are marked by the `seq_flgs` field - in the CCSDS header of the packet. For CoDICE, the values are defined as - follows: + Extract metadata fields from the beginning of combined event_data packets. - 3 = Packet is not part of a group - 1 = Packet is the first packet of the group - 0 = Packet is in the middle of the group - 2 = Packet is the last packet of the group - - For packets that are part of a group, the byte count associated with the - first packet of the group signifies the byte count for the entire group. + Extracts bit fields from the first 20 bytes of each event_data array + and adds them as new variables to the dataset. Parameters ---------- - packets : xarray.Dataset - Dataset containing the packets to group. + packets : xr.Dataset + Dataset containing combined packets with event_data. Returns ------- - grouped_data : list[bytes] - The packet data, converted to bytes and grouped appropriately. + xr.Dataset + Dataset with extracted metadata fields added. """ - grouped_data = [] # Holds the properly grouped data to be decompressed - current_group = bytearray() # Temporary storage for current group - group_byte_count = None # Temporary storage for current group byte count - - for packet_index in range(len(packets.event_data.data)): - packet_data = packets.event_data.data[packet_index] - group_code = packets.seq_flgs.data[packet_index] - byte_count = packets.byte_count.data[packet_index] - - # If the group code is 3, this means the data is unsegmented - # and can be decompressed as-is - if group_code == SegmentedPacketOrder.UNSEGMENTED: - grouped_data.append(packet_data[:byte_count]) - - # If the group code is 1, this means the data is the first data in a - # group. Also, set the byte count for the group - elif group_code == SegmentedPacketOrder.FIRST_SEGMENT: - group_byte_count = byte_count - current_group += packet_data - - # If the group code is 0, this means the data is part of the middle of - # the group. - elif group_code == SegmentedPacketOrder.CONTINUATION_SEGMENT: - current_group += get_de_metadata(packets, packet_index) - current_group += packet_data - - # If the group code is 2, this means the data is the last data in the - # group - elif group_code == SegmentedPacketOrder.LAST_SEGMENT: - current_group += get_de_metadata(packets, packet_index) - current_group += packet_data - - # The grouped data is now ready to be decompressed - values_to_decompress = current_group[:group_byte_count] - grouped_data.append(values_to_decompress) + # Initialize arrays for extracted fields + n_packets = len(packets.epoch) + + # Preallocate arrays + packet_version = np.zeros(n_packets, dtype=np.uint16) + spin_period = np.zeros(n_packets, dtype=np.uint16) + acq_start_seconds = np.zeros(n_packets, dtype=np.uint32) + acq_start_subseconds = np.zeros(n_packets, dtype=np.uint32) + spare_1 = np.zeros(n_packets, dtype=np.uint8) + st_bias_gain_mode = np.zeros(n_packets, dtype=np.uint8) + sw_bias_gain_mode = np.zeros(n_packets, dtype=np.uint8) + priority = np.zeros(n_packets, dtype=np.uint8) + suspect = np.zeros(n_packets, dtype=np.uint8) + compressed = np.zeros(n_packets, dtype=np.uint8) + num_events = np.zeros(n_packets, dtype=np.uint32) + byte_count = np.zeros(n_packets, dtype=np.uint32) + + # Extract fields from each packet + for pkt_idx in range(n_packets): + event_data = packets.event_data.data[pkt_idx] + + # Byte-aligned fields using int.from_bytes + packet_version[pkt_idx] = int.from_bytes(event_data[0:2], byteorder="big") + spin_period[pkt_idx] = int.from_bytes(event_data[2:4], byteorder="big") + acq_start_seconds[pkt_idx] = int.from_bytes(event_data[4:8], byteorder="big") + + # Non-byte-aligned fields (bytes 8-12 contain mixed bit fields) + # Extract 4 bytes and unpack bit fields + mixed_bytes = int.from_bytes(event_data[8:12], byteorder="big") + + # acq_start_subseconds: 20 bits (MSB) + acq_start_subseconds[pkt_idx] = (mixed_bytes >> 12) & 0xFFFFF + # spare_1: 2 bits + spare_1[pkt_idx] = (mixed_bytes >> 10) & 0x3 + # st_bias_gain_mode: 2 bits + st_bias_gain_mode[pkt_idx] = (mixed_bytes >> 8) & 0x3 + # sw_bias_gain_mode: 2 bits + sw_bias_gain_mode[pkt_idx] = (mixed_bytes >> 6) & 0x3 + # priority: 4 bits + priority[pkt_idx] = (mixed_bytes >> 2) & 0xF + # suspect: 1 bit + suspect[pkt_idx] = (mixed_bytes >> 1) & 0x1 + # compressed: 1 bit (LSB) + compressed[pkt_idx] = mixed_bytes & 0x1 + + # Remaining byte-aligned fields + num_events[pkt_idx] = int.from_bytes(event_data[12:16], byteorder="big") + byte_count[pkt_idx] = int.from_bytes(event_data[16:20], byteorder="big") + + # Remove the first 20 bytes from event_data (header fields from above) + # Then trim to the number of bytes indicated by byte_count + packets.event_data.data[pkt_idx] = event_data[20 : 20 + byte_count[pkt_idx]] + + if compressed[pkt_idx]: + packets.event_data.data[pkt_idx] = decompress( + packets.event_data.data[pkt_idx], + CoDICECompression.LOSSLESS, + ) - # Reset the current group - current_group = bytearray() - group_byte_count = None + # Add extracted fields to dataset + packets["packet_version"] = xr.DataArray(packet_version, dims=["epoch"]) + packets["spin_period"] = xr.DataArray(spin_period, dims=["epoch"]) + packets["acq_start_seconds"] = xr.DataArray(acq_start_seconds, dims=["epoch"]) + packets["acq_start_subseconds"] = xr.DataArray(acq_start_subseconds, dims=["epoch"]) + packets["spare_1"] = xr.DataArray(spare_1, dims=["epoch"]) + packets["st_bias_gain_mode"] = xr.DataArray(st_bias_gain_mode, dims=["epoch"]) + packets["sw_bias_gain_mode"] = xr.DataArray(sw_bias_gain_mode, dims=["epoch"]) + packets["priority"] = xr.DataArray(priority, dims=["epoch"]) + packets["suspect"] = xr.DataArray(suspect, dims=["epoch"]) + packets["compressed"] = xr.DataArray(compressed, dims=["epoch"]) + packets["num_events"] = xr.DataArray(num_events, dims=["epoch"]) + packets["byte_count"] = xr.DataArray(byte_count, dims=["epoch"]) - return grouped_data + return packets def unpack_bits(bit_structure: dict, de_data: np.ndarray) -> dict: @@ -160,318 +155,323 @@ def unpack_bits(bit_structure: dict, de_data: np.ndarray) -> dict: return unpacked -def process_de_data( +def _create_dataset_coords( packets: xr.Dataset, - decompressed_data: list[list[int]], apid: int, + num_priorities: int, cdf_attrs: ImapCdfAttributes, ) -> xr.Dataset: """ - Reshape the decompressed direct event data into CDF-ready arrays. - - Unpacking DE needs below for-loops because of many reasons, including: - - Need of preserve fillval per field of various bit lengths - - inability to use nan for 64-bits unpacking - - num_events being variable length per epoch - - binning priorities into its bins - - unpacking 64-bits into fields and indexing correctly + Create the output dataset with coordinates. Parameters ---------- - packets : xarray.Dataset - Dataset containing the packets, needed to determine priority order - and data quality. - decompressed_data : list[list[int]] - The decompressed data to reshape, in the format [[]]. + packets : xr.Dataset + Combined packets with extracted header fields. apid : int - The sensor type, used primarily to determine if the data are from - CoDICE-Lo or CoDICE-Hi. + APID for sensor type. + num_priorities : int + Number of priorities for this APID. cdf_attrs : ImapCdfAttributes - The CDF attributes to be added to the dataset. + CDF attributes manager. Returns ------- - data : xarray.Dataset - Processed Direct Event data. + xr.Dataset + Dataset with coordinates defined. """ - # xr.Dataset to hold all the (soon to be restructured) direct event data - de_data = xr.Dataset() - - # Extract some useful variables - num_priorities = constants.DE_DATA_PRODUCT_CONFIGURATIONS[apid]["num_priorities"] - bit_structure = constants.DE_DATA_PRODUCT_CONFIGURATIONS[apid]["bit_structure"] - - # Determine the number of epochs to help with data array initialization - # There is one epoch per set of priorities - num_epochs = len(decompressed_data) // num_priorities - - # Initialize data arrays for unpacked 64-bits fields - for field in bit_structure: - if field not in ["Priority", "Spare"]: - # Update attrs based on fillval per field - fillval = bit_structure[field]["fillval"] - dtype = bit_structure[field]["dtype"] - attrs = cdf_attrs.get_variable_attributes("de_3d_attrs") - attrs = apply_replacements_to_attrs( - attrs, {"num_digits": len(str(fillval)), "valid_max": fillval} - ) - de_data[field] = xr.DataArray( - np.full( - (num_epochs, num_priorities, 10000), - fillval, - dtype=dtype, - ), - name=field, - dims=["epoch", "priority", "event_num"], - attrs=attrs, - ) + # Get timing info from the first packet of each epoch + epoch_slice = slice(None, None, num_priorities) - # Get num_events, data quality, and priorities data for beginning of packet_indexs - packet_index_starts = np.where( - (packets.seq_flgs.data == SegmentedPacketOrder.UNSEGMENTED) - | (packets.seq_flgs.data == SegmentedPacketOrder.FIRST_SEGMENT) - )[0] - num_events_arr = packets.num_events.data[packet_index_starts] - data_quality_arr = packets.suspect.data[packet_index_starts] - priorities_arr = packets.priority.data[packet_index_starts] - - # Initialize other fields of l1a that we want to - # carry in L1A CDF file - de_data["num_events"] = xr.DataArray( - np.full((num_epochs, num_priorities), 65535, dtype=np.uint16), - name="num_events", - dims=["epoch", "priority"], - attrs=cdf_attrs.get_variable_attributes("de_2d_attrs"), + view_tab_info = ViewTabInfo( + apid=apid, + sensor=1 if apid == CODICEAPID.COD_HI_PHA else 0, + collapse_table=0, + three_d_collapsed=0, + view_id=0, + ) + epochs, epochs_delta = get_codice_epoch_time( + packets["acq_start_seconds"].isel(epoch=epoch_slice), + packets["acq_start_subseconds"].isel(epoch=epoch_slice), + packets["spin_period"].isel(epoch=epoch_slice), + view_tab_info, ) - de_data["data_quality"] = xr.DataArray( - np.full((num_epochs, num_priorities), 65535, dtype=np.uint16), - name="data_quality", - dims=["epoch", "priority"], - attrs=cdf_attrs.get_variable_attributes("de_2d_attrs"), + # Convert to numpy arrays + epochs_data = np.asarray(epochs) + epochs_delta_data = np.asarray(epochs_delta) + epoch_values = met_to_ttj2000ns(epochs_data) + + dataset = xr.Dataset( + coords={ + "epoch": ( + "epoch", + epoch_values, + cdf_attrs.get_variable_attributes("epoch", check_schema=False), + ), + "epoch_delta_minus": ( + "epoch", + epochs_delta_data, + cdf_attrs.get_variable_attributes( + "epoch_delta_minus", check_schema=False + ), + ), + "epoch_delta_plus": ( + "epoch", + epochs_delta_data, + cdf_attrs.get_variable_attributes( + "epoch_delta_plus", check_schema=False + ), + ), + "event_num": ( + "event_num", + np.arange(constants.MAX_DE_EVENTS_PER_PACKET), + cdf_attrs.get_variable_attributes("event_num", check_schema=False), + ), + "event_num_label": ( + "event_num", + np.arange(constants.MAX_DE_EVENTS_PER_PACKET).astype(str), + cdf_attrs.get_variable_attributes( + "event_num_label", check_schema=False + ), + ), + "priority": ( + "priority", + np.arange(num_priorities), + cdf_attrs.get_variable_attributes("priority", check_schema=False), + ), + "priority_label": ( + "priority", + np.arange(num_priorities).astype(str), + cdf_attrs.get_variable_attributes("priority_label", check_schema=False), + ), + } ) - # As mentioned above, epoch data is of this shape: - # (epoch, (num_events * )). - # num_events is a variable number per priority. - for epoch_index in range(num_epochs): - # current epoch's grouped data are: - # current group's start index * 8 to next group's start indices * 8 - epoch_start = packet_index_starts[epoch_index] * num_priorities - epoch_end = packet_index_starts[epoch_index + 1] * num_priorities - # Extract the decompressed data for current epoch. - # epoch_data should be of shape ((num_priorities * num_events),) - epoch_data = decompressed_data[epoch_start:epoch_end] - - # Extract these other data - unordered_priority = priorities_arr[epoch_start:epoch_end] - unordered_data_quality = data_quality_arr[epoch_start:epoch_end] - unordered_num_events = num_events_arr[epoch_start:epoch_end] - - # If priority array unique size is not same size as - # num_priorities, then throw error. They should match. - if len(np.unique(unordered_priority)) != num_priorities: - raise ValueError( - f"Priority array for epoch {epoch_index} contains " - f"non-unique values: {unordered_priority}" - ) + return dataset - # Until here, we have the out of order priority data. Data could have been - # collected in any priority order. Eg. - # priority - [0, 4, 5, 1, 3, 2, 6, 7] - # Now, we need to put data into their respective priority indexes - # in final arrays for the current epoch. Eg. put data into - # priority - [0, 1, 2, 3, 4, 5, 6, 7] - de_data["num_events"][epoch_index, unordered_priority] = unordered_num_events - de_data["data_quality"][epoch_index, unordered_priority] = ( - unordered_data_quality - ) - # Fill the event data into it's bin in same logic as above. But - # since the epoch has different num_events per priority, - # we need to loop and index accordingly. Otherwise, numpy throws - # 'The detected shape was (n,) + inhomogeneous part' error. - for priority_index in range(len(unordered_priority)): - # Get num_events - priority_num_events = int(unordered_num_events[priority_index]) - # Reshape epoch data into (num_events, 8). That 8 is 8-bytes that - # make up 64-bits. Therefore, combine last 8 dimension into one to - # get 64-bits event data that we need to unpack later. First, - # combine last 8 dimension into one 64-bits value - # we need to make a copy and reverse the byte order - # to match LSB order before we use .view. - events_in_bytes = ( - np.array(epoch_data[priority_index], dtype=np.uint8) - .reshape(priority_num_events, 8)[:, ::-1] - .copy() - ) - combined_64bits = events_in_bytes.view(np.uint64)[:, 0] - # Unpack 64-bits into fields - unpacked_fields = unpack_bits(bit_structure, combined_64bits) - # Put unpacked event data into their respective variable and priority - # number bins - priority_num = int(unordered_priority[priority_index]) - for field_name, field_data in unpacked_fields.items(): - if field_name not in ["Priority", "Spare"]: - de_data[field_name][ - epoch_index, priority_num, :priority_num_events - ] = field_data +def _unpack_and_store_events( + de_data: xr.Dataset, + packets: xr.Dataset, + num_priorities: int, + bit_structure: dict, + event_fields: list[str], +) -> xr.Dataset: + """ + Unpack all event data and store directly into the dataset arrays. + + Parameters + ---------- + de_data : xr.Dataset + Dataset to store unpacked events into (modified in place). + packets : xr.Dataset + Combined packets with extracted header fields. + num_priorities : int + Number of priorities per epoch. + bit_structure : dict + Bit structure defining how to unpack 64-bit event values. + event_fields : list[str] + List of field names to unpack (excludes priority/spare). + + Returns + ------- + xr.Dataset + The dataset with unpacked events stored. + """ + # Extract arrays from packets dataset + num_events_arr = packets.num_events.values + priorities_arr = packets.priority.values + event_data_arr = packets.event_data.values + + total_events = int(np.sum(num_events_arr)) + if total_events == 0: + return de_data + + num_packets = len(num_events_arr) + + # Preallocate arrays for concatenated events and their destination indices + all_event_bytes = np.zeros((total_events, 8), dtype=np.uint8) + event_epoch_idx = np.zeros(total_events, dtype=np.int32) + event_priority_idx = np.zeros(total_events, dtype=np.int32) + event_position_idx = np.zeros(total_events, dtype=np.int32) + + # Build concatenated event array and index mappings + offset = 0 + for pkt_idx in range(num_packets): + n_events = int(num_events_arr[pkt_idx]) + if n_events == 0: + continue + + # Extract and byte-reverse events for LSB unpacking + pkt_bytes = np.asarray(event_data_arr[pkt_idx], dtype=np.uint8) + pkt_bytes = pkt_bytes.reshape(n_events, 8)[:, ::-1] + all_event_bytes[offset : offset + n_events] = pkt_bytes + + # Record destination indices for scattering + event_epoch_idx[offset : offset + n_events] = pkt_idx // num_priorities + event_priority_idx[offset : offset + n_events] = priorities_arr[pkt_idx] + event_position_idx[offset : offset + n_events] = np.arange(n_events) + + offset += n_events + + # Convert bytes to 64-bit values and unpack all fields at once + all_64bits = all_event_bytes.view(np.uint64).ravel() + unpacked = unpack_bits(bit_structure, all_64bits) + + # Scatter unpacked values directly into the dataset arrays + for field in event_fields: + de_data[field].values[ + event_epoch_idx, event_priority_idx, event_position_idx + ] = unpacked[field] return de_data -def l1a_direct_event(unpacked_dataset: xr.Dataset, apid: int) -> xr.Dataset: +def process_de_data( + packets: xr.Dataset, + apid: int, + cdf_attrs: ImapCdfAttributes, +) -> xr.Dataset: """ - Process CoDICE L1A Direct Event data. + Process direct event data into a complete CDF-ready dataset. Parameters ---------- - unpacked_dataset : xarray.Dataset - Input L1A Direct Event dataset. + packets : xarray.Dataset + Dataset containing the combined packets with extracted header fields. apid : int - APID to process. + The APID identifying CoDICE-Lo or CoDICE-Hi. + cdf_attrs : ImapCdfAttributes + The CDF attributes manager. Returns ------- xarray.Dataset - Processed L1A Direct Event dataset. + Complete processed Direct Event dataset with coordinates and attributes. """ - # Group segmented data. - # TODO: this may get replaced with space_packet_parser's functionality - grouped_data = group_data(unpacked_dataset) - - # Decompress data shape is (epoch, priority * num_events) - decompressed_data = [ - decompress( - group, - CoDICECompression.LOSSLESS, + # Get configuration for this APID + config = constants.DE_DATA_PRODUCT_CONFIGURATIONS[apid] + num_priorities = config["num_priorities"] + bit_structure = config["bit_structure"] + + # Truncate to complete priority groups only + num_packets = len(packets["epoch"]) + num_epochs = num_packets // num_priorities + num_complete_packets = num_epochs * num_priorities + if num_complete_packets < num_packets: + packets = packets.isel(epoch=slice(None, num_complete_packets)) + + # Create dataset with coordinates + de_data = _create_dataset_coords(packets, apid, num_priorities, cdf_attrs) + + # Set global attributes based on APID + if apid == CODICEAPID.COD_LO_PHA: + de_data.attrs = cdf_attrs.get_global_attributes( + "imap_codice_l1a_lo-direct-events" + ) + de_data["k_factor"] = xr.DataArray( + np.array([constants.K_FACTOR]), + dims=["k_factor"], + attrs=cdf_attrs.get_variable_attributes("k_factor", check_schema=False), + ) + else: + de_data.attrs = cdf_attrs.get_global_attributes( + "imap_codice_l1a_hi-direct-events" ) - for group in grouped_data - ] - # Gather the CDF attributes - cdf_attrs = ImapCdfAttributes() - cdf_attrs.add_instrument_global_attrs("codice") - cdf_attrs.add_instrument_variable_attrs("codice", "l1a") + # Add per-epoch metadata from first packet of each epoch + epoch_slice = slice(None, None, num_priorities) + for var in ["sw_bias_gain_mode", "st_bias_gain_mode"]: + de_data[var] = xr.DataArray( + packets[var].isel(epoch=epoch_slice).values, + dims=["epoch"], + attrs=cdf_attrs.get_variable_attributes(var), + ) - # Unpack DE packet data into CDF-ready variables - de_dataset = process_de_data(unpacked_dataset, decompressed_data, apid, cdf_attrs) + # Initialize 3D event data arrays with fill values + event_fields = [f for f in bit_structure if f not in ["priority"]] + for field in event_fields: + info = bit_structure[field] + attrs = apply_replacements_to_attrs( + cdf_attrs.get_variable_attributes("de_3d_attrs"), + {"num_digits": len(str(info["fillval"])), "valid_max": info["fillval"]}, + ) + de_data[field] = xr.DataArray( + np.full( + (num_epochs, num_priorities, constants.MAX_DE_EVENTS_PER_PACKET), + info["fillval"], + dtype=info["dtype"], + ), + dims=["epoch", "priority", "event_num"], + attrs=attrs, + ) - # Determine the epochs to use in the dataset, which are the epochs whenever - # there is a start of a segment and the priority is 0 - epoch_indices = np.where( - ( - (unpacked_dataset.seq_flgs.data == SegmentedPacketOrder.UNSEGMENTED) - | (unpacked_dataset.seq_flgs.data == SegmentedPacketOrder.FIRST_SEGMENT) + # Initialize 2D per-priority metadata arrays + for var in ["num_events", "data_quality"]: + de_data[var] = xr.DataArray( + np.full((num_epochs, num_priorities), 65535, dtype=np.uint16), + dims=["epoch", "priority"], + attrs=cdf_attrs.get_variable_attributes("de_2d_attrs"), ) - & (unpacked_dataset.priority.data == 0) - )[0] - acq_start_seconds = unpacked_dataset.acq_start_seconds[epoch_indices] - acq_start_subseconds = unpacked_dataset.acq_start_subseconds[epoch_indices] - spin_periods = unpacked_dataset.spin_period[epoch_indices] - - # Calculate epoch variables using sensor id and apid - # Provide 0 as default input for other inputs but they - # are not used in epoch calculation - view_tab_info = ViewTabInfo( - apid=apid, - sensor=1 if apid == CODICEAPID.COD_HI_PHA else 0, - collapse_table=0, - three_d_collapsed=0, - view_id=0, - ) - epochs, epochs_delta = get_codice_epoch_time( - acq_start_seconds, acq_start_subseconds, spin_periods, view_tab_info - ) - # Define coordinates - epoch = xr.DataArray( - met_to_ttj2000ns(epochs), - name="epoch", - dims=["epoch"], - attrs=cdf_attrs.get_variable_attributes("epoch", check_schema=False), - ) - epoch_delta_minus = xr.DataArray( - epochs_delta, - name="epoch_delta_minus", - dims=["epoch"], - attrs=cdf_attrs.get_variable_attributes( - "epoch_delta_minus", check_schema=False - ), - ) - epoch_delta_plus = xr.DataArray( - epochs_delta, - name="epoch_delta_plus", - dims=["epoch"], - attrs=cdf_attrs.get_variable_attributes("epoch_delta_plus", check_schema=False), - ) - event_num = xr.DataArray( - np.arange(constants.MAX_DE_EVENTS_PER_PACKET), - name="event_num", - dims=["event_num"], - attrs=cdf_attrs.get_variable_attributes("event_num", check_schema=False), - ) - event_num_label = xr.DataArray( - np.arange(constants.MAX_DE_EVENTS_PER_PACKET).astype(str), - name="event_num_label", - dims=["event_num"], - attrs=cdf_attrs.get_variable_attributes("event_num_label", check_schema=False), - ) - priority = xr.DataArray( - np.arange(constants.DE_DATA_PRODUCT_CONFIGURATIONS[apid]["num_priorities"]), - name="priority", - dims=["priority"], - attrs=cdf_attrs.get_variable_attributes("priority", check_schema=False), - ) - priority_label = xr.DataArray( - np.arange( - constants.DE_DATA_PRODUCT_CONFIGURATIONS[apid]["num_priorities"] - ).astype(str), - name="priority_label", - dims=["priority"], - attrs=cdf_attrs.get_variable_attributes("priority_label", check_schema=False), - ) + # Reshape packet arrays for validation and assignment + priorities_2d = packets.priority.values.reshape(num_epochs, num_priorities) + num_events_2d = packets.num_events.values.reshape(num_epochs, num_priorities) + data_quality_2d = packets.suspect.values.reshape(num_epochs, num_priorities) + + # Validate each epoch has all unique priorities + unique_counts = np.array([len(np.unique(row)) for row in priorities_2d]) + if np.any(unique_counts != num_priorities): + bad_epoch = np.argmax(unique_counts != num_priorities) + raise ValueError( + f"Priority array for epoch {bad_epoch} contains " + f"non-unique values: {priorities_2d[bad_epoch]}" + ) - # Logical source id to lookup global attributes - if apid == CODICEAPID.COD_LO_PHA: - attrs = cdf_attrs.get_global_attributes("imap_codice_l1a_lo-direct-events") - elif apid == CODICEAPID.COD_HI_PHA: - attrs = cdf_attrs.get_global_attributes("imap_codice_l1a_hi-direct-events") - - # Add coordinates and global attributes to dataset - de_dataset = de_dataset.assign_coords( - epoch=epoch, - epoch_delta_minus=epoch_delta_minus, - epoch_delta_plus=epoch_delta_plus, - event_num=event_num, - event_num_label=event_num_label, - priority=priority, - priority_label=priority_label, + # Assign num_events and data_quality using priorities as column indices + epoch_idx = np.arange(num_epochs)[:, np.newaxis] + de_data["num_events"].values[epoch_idx, priorities_2d] = num_events_2d + de_data["data_quality"].values[epoch_idx, priorities_2d] = data_quality_2d + + # Unpack all events and store directly into dataset arrays + de_data = _unpack_and_store_events( + de_data, + packets, + num_priorities, + bit_structure, + event_fields, ) - de_dataset.attrs = attrs - # Carry over these variables from unpacked dataset - if apid == CODICEAPID.COD_LO_PHA: - # Add k_factor - de_dataset["k_factor"] = xr.DataArray( - np.array([constants.K_FACTOR]), - name="k_factor", - dims=["k_factor"], - attrs=cdf_attrs.get_variable_attributes("k_factor", check_schema=False), - ) + return de_data - de_dataset["sw_bias_gain_mode"] = xr.DataArray( - unpacked_dataset["sw_bias_gain_mode"].data[epoch_indices], - name="sw_bias_gain_mode", - dims=["epoch"], - attrs=cdf_attrs.get_variable_attributes("sw_bias_gain_mode"), - ) - de_dataset["st_bias_gain_mode"] = xr.DataArray( - unpacked_dataset["st_bias_gain_mode"].data[epoch_indices], - name="st_bias_gain_mode", - dims=["epoch"], - attrs=cdf_attrs.get_variable_attributes("st_bias_gain_mode"), +def l1a_direct_event(unpacked_dataset: xr.Dataset, apid: int) -> xr.Dataset: + """ + Process CoDICE L1A Direct Event data. + + Parameters + ---------- + unpacked_dataset : xarray.Dataset + Input L1A Direct Event dataset. + apid : int + APID to process. + + Returns + ------- + xarray.Dataset + Processed L1A Direct Event dataset. + """ + # Combine segmented packets and extract header fields + packets = combine_segmented_packets( + unpacked_dataset, binary_field_name="event_data" ) + packets = extract_initial_items_from_combined_packets(packets) + + # Gather the CDF attributes + cdf_attrs = ImapCdfAttributes() + cdf_attrs.add_instrument_global_attrs("codice") + cdf_attrs.add_instrument_variable_attrs("codice", "l1a") - return de_dataset + # Process packets into complete CDF-ready dataset + return process_de_data(packets, apid, cdf_attrs) diff --git a/imap_processing/codice/packet_definitions/codice_packet_definition.xml b/imap_processing/codice/packet_definitions/codice_packet_definition.xml index a50eb3d4b6..829ca02f04 100644 --- a/imap_processing/codice/packet_definitions/codice_packet_definition.xml +++ b/imap_processing/codice/packet_definitions/codice_packet_definition.xml @@ -1,6 +1,6 @@ - + @@ -45,7 +45,7 @@ - + @@ -297,8 +297,17 @@ + + + + + + + + + - + @@ -358,7 +367,7 @@ - + @@ -439,7 +448,8 @@ - + + @@ -567,46 +577,28 @@ - - - - - - - + - - - - - - - + - - - - - - - + - - - - - - - + - + + + + + + + @@ -615,7 +607,8 @@ - + + @@ -624,7 +617,8 @@ - + + @@ -633,7 +627,8 @@ - + + @@ -688,7 +683,7 @@ - + @@ -697,8 +692,7 @@ - - + @@ -707,8 +701,7 @@ - - + @@ -717,7 +710,7 @@ - + @@ -726,7 +719,7 @@ - + @@ -796,19 +789,19 @@ - + - + - + - + - + @@ -835,15 +828,55 @@ - + - + + + + + - + - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -1115,7 +1148,7 @@ - + @@ -1145,7 +1178,13 @@ - + + + + + + + @@ -1154,7 +1193,13 @@ - + + + + + + + @@ -1163,7 +1208,13 @@ - + + + + + + + @@ -1172,7 +1223,13 @@ - + + + + + + + @@ -1181,7 +1238,13 @@ - + + + + + + + @@ -1474,74 +1537,12 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + @@ -1737,7 +1738,6 @@ - @@ -2422,74 +2422,12 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + @@ -3104,6 +3042,12 @@ + + INDICATES THE CURRENT OPERATIONAL STATE OF THE LO ESA SWEEP: +- NORMAL - BOTH ESAS ARE TRACKING TOGETHER +- RGFO - REDUCED GAIN FACTOR OPERATION; ESA-A IS REDUCED IN ORDER TO REDUCE THE GAIN FACTOR AND ALLOW FEWER IONS INTO THE DETECTOR +- NSO - NO SCAN OPERATION; BOTH ESAS ARE RETURNED TO A HIGH-ENERGY SETTING AND NO SCANNING IS DONE FOR THE REMAINDER OF THE ESA SWEEP + @@ -3127,17 +3071,27 @@ - + - - - - - - - - - + + ALARM PERSISTENCE = 3 IN OASIS + + + ALARM PERSISTENCE = 3 IN OASIS + + + ALARM PERSISTENCE = 3 IN OASIS + + + ALARM PERSISTENCE = 3 IN OASIS + + + ALARM PERSISTENCE = 3 IN OASIS + + + + + @@ -3159,11 +3113,13 @@ - - - - - + + + + + + EACH BIT INDICATES WHETHER THE CORRESPONDING MACRO IS CURRENTLY RUNNING (E.G. BIT 1 WILL BE SET IF MACRO 1 IS RUNNING) + INDICATES WHETHER ANY CATEGORY 1 LIMITS HAVE TRIGGERED. @@ -3194,7 +3150,7 @@ INDICATES WHETHER THE MOST RECENT TRIGGER WAS A MINIMUM OR MAXIMUM LIMIT - INDICATES THE ID OF THE MOST RECENT FDC TRIGGER + INDICATES THE TABLE INDEX OF THE MOST RECENT FDC TRIGGER INDICATES THE ACTION THAT WAS TAKEN FOR THE MOST RECENT FDC TRIGGER @@ -3208,7 +3164,7 @@ INDICATES WHETHER FSW CONTROL OF THE OPERATIONAL HEATER IS ENABLED - + INDICATES THE CURRENT STATE OF THE PHYSICAL HEATER OUTPUT @@ -3313,40 +3269,6 @@ SECONDARY HEADER - WHOLE-SECONDS PART OF SCLK - - PACKET VERSION - THIS WILL BE INCREMENTED EACH TIME THE FORMAT OF THE PACKET CHANGESCOD_LO_PHA. - - - SPIN PERIOD REPORTED BY THE SPACECRAFT IN THE TIME AND STATUS MESSAGE. REPORTED PERIOD IS THE PERIOD THAT WAS ACTIVE WHEN THE 16-SPIN ACQUISITION CYCLE STARTED. - - - FULL-SECONDS PORTION OF THE TIME AT WHICH THE 16-SPIN CYCLE STARTED - - - SUB-SECONDS PORTION OF THE TIME AT WHICH THE 16-SPIN CYCLE STARTED - - - SPARE FOR ALIGNMENT - - - BIAS GAIN MODE FOR THE SUPRATHERMAL SECTOR - - - BIAS GAIN MODE FOR THE SOLARWIND SECTOR - - - - INDICATES THAT THERE WAS SOME ERROR DETECTED DURING ACQUISITION OR PROCESSING OF THE DATA. ERRORS COULD INCLUDE CORRUPTED ACQUISITION MEMORY (I.E. EDAC ERRORS), TIMING VIOLATIONS, OR OTHER EVENTS THAT INTERRUPTED OR OTHERWISE AFFECTED DATA COLLECTION. - - - WHETHER THE EVENT DATA IS COMPRESSED. IF 1/YES, EVENT_DATA ARRAY IS COMPRESSED USING THE LZMA COMPRESSION ALGORITHM. - - - NUMBER OF EVENTS SELECTED FOR DOWNLINK (I.E. NUMBER OF EVENTS IN THE EVENT_DATA ARRAY) - - - NUMBER OF BYTES IN THE EVENT_DATA ARRAY. IF COMPRESSED, THIS VALUE REPRESENTS THE LENGTH OF THE COMPRESSED DATA. - OPTIONALLY COMPRESSED ARRAY OF EVENT DATA @@ -3999,40 +3921,6 @@ WHEN THIS ARRAY IS TOO LARGE FOR A SINGLE CCSDS PACKET, CODICE WILL UTILIZE THE SECONDARY HEADER - WHOLE-SECONDS PART OF SCLK - - PACKET VERSION - THIS WILL BE INCREMENTED EACH TIME THE FORMAT OF THE PACKET CHANGESCOD_LO_PHA. - - - SPIN PERIOD REPORTED BY THE SPACECRAFT IN THE TIME AND STATUS MESSAGE. REPORTED PERIOD IS THE PERIOD THAT WAS ACTIVE WHEN THE 16-SPIN ACQUISITION CYCLE STARTED. - - - FULL-SECONDS PORTION OF THE TIME AT WHICH THE 16-SPIN CYCLE STARTED - - - SUB-SECONDS PORTION OF THE TIME AT WHICH THE 16-SPIN CYCLE STARTED - - - SPARE FOR ALIGNMENT - - - BIAS GAIN MODE FOR THE SUPRATHERMAL SECTOR - - - BIAS GAIN MODE FOR THE SOLARWIND SECTOR - - - - INDICATES THAT THERE WAS SOME ERROR DETECTED DURING ACQUISITION OR PROCESSING OF THE DATA. ERRORS COULD INCLUDE CORRUPTED ACQUISITION MEMORY (I.E. EDAC ERRORS), TIMING VIOLATIONS, OR OTHER EVENTS THAT INTERRUPTED OR OTHERWISE AFFECTED DATA COLLECTION. - - - WHETHER THE EVENT DATA IS COMPRESSED. IF 1/YES, EVENT_DATA ARRAY IS COMPRESSED USING THE RICE COMPRESSION ALGORITHM. - - - NUMBER OF EVENTS SELECTED FOR DOWNLINK (I.E. NUMBER OF EVENTS IN THE EVENT_DATA ARRAY) - - - NUMBER OF BYTES IN THE EVENT_DATA ARRAY. IF COMPRESSED, THIS VALUE REPRESENTS THE LENGTH OF THE COMPRESSED DATA. - OPTIONALLY COMPRESSED ARRAY OF EVENT DATA @@ -4491,6 +4379,7 @@ WHEN THIS ARRAY IS TOO LARGE FOR A SINGLE CCSDS PACKET, CODICE WILL UTILIZE THE + @@ -4546,11 +4435,11 @@ WHEN THIS ARRAY IS TOO LARGE FOR A SINGLE CCSDS PACKET, CODICE WILL UTILIZE THE - - - - - + + + + + @@ -4565,7 +4454,7 @@ WHEN THIS ARRAY IS TOO LARGE FOR A SINGLE CCSDS PACKET, CODICE WILL UTILIZE THE - + @@ -4632,18 +4521,6 @@ WHEN THIS ARRAY IS TOO LARGE FOR A SINGLE CCSDS PACKET, CODICE WILL UTILIZE THE - - - - - - - - - - - - @@ -4908,18 +4785,6 @@ WHEN THIS ARRAY IS TOO LARGE FOR A SINGLE CCSDS PACKET, CODICE WILL UTILIZE THE - - - - - - - - - - - - From c765c7e6c1d166d9e4740ccb88e3a5288ee5597b Mon Sep 17 00:00:00 2001 From: Greg Lucas Date: Sat, 17 Jan 2026 21:04:14 -0700 Subject: [PATCH 2/4] MNT: Add logging for source sequence counters to packet decom We have had some issues with missing source sequence counters, meaning not all necessary packets were present. This just adds some simple logs that can warn about this case and give some awareness about what is going on. --- imap_processing/tests/test_utils.py | 12 ++++++++ imap_processing/utils.py | 48 +++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+) diff --git a/imap_processing/tests/test_utils.py b/imap_processing/tests/test_utils.py index 1f5c0ca512..9d22e0074f 100644 --- a/imap_processing/tests/test_utils.py +++ b/imap_processing/tests/test_utils.py @@ -306,6 +306,18 @@ def test_combine_segmented_packets(): xr.testing.assert_equal(combined_ds, expected_ds) +def test_check_source_sequence_counter(caplog): + """Test _check_source_sequence_counter function.""" + data_vars = { + "src_seq_ctr": (["epoch"], np.array([0, 1, 3, 4, 6])), + } + ds = xr.Dataset(data_vars=data_vars) + + utils._check_source_sequence_counter(ds, apid=1234) + + assert "Found [2] gap(s) in source sequence counter for APID 1234" in caplog.text + + def test_extract_data_dict(): """Test extract_data_dict function.""" data_vars = { diff --git a/imap_processing/utils.py b/imap_processing/utils.py index cbfd5fcb7a..9710ee30d2 100644 --- a/imap_processing/utils.py +++ b/imap_processing/utils.py @@ -333,6 +333,9 @@ def packet_file_to_datasets( ) ds = ds.isel(epoch=unique_indices) + # Log a warning if there are gaps in the source sequence counter + _check_source_sequence_counter(ds, apid) + # Strip any leading characters before "." from the field names which was due # to the packet_name being a part of the variable name in the XTCE definition ds = ds.rename( @@ -423,6 +426,51 @@ def combine_segmented_packets( return combined_packets +def _check_source_sequence_counter(ds: xr.Dataset, apid: int) -> None: + """ + Check for gaps in the source sequence counter. + + Log a warning if gaps are found, but don't do anything else. + + Parameters + ---------- + ds : xarray.Dataset + Dataset containing the packets to check. + apid : int + APID of the packets. + """ + # Check for sequential source sequence counters + # CCSDS source sequence counter is a 14-bit field (0-16383) + counter_max = 16384 + src_seq_ctr = ds["src_seq_ctr"].data + + if len(src_seq_ctr) <= 1: + return + + # Check if each counter equals (previous + 1) % counter_max + # This handles both normal increments and rollover (16383 -> 0) + expected = (src_seq_ctr[:-1] + 1) % counter_max + actual = src_seq_ctr[1:] + non_sequential = expected != actual + + if np.any(non_sequential): + gap_indices = np.where(non_sequential)[0] + # Calculate total missing packets across all gaps + total_missing = sum( + (src_seq_ctr[idx + 1] - src_seq_ctr[idx] - 1) % counter_max + for idx in gap_indices + ) + # Show the counter values before and after each gap + gap_starts = src_seq_ctr[gap_indices].tolist() + gap_ends = src_seq_ctr[gap_indices + 1].tolist() + gap_pairs = list(zip(gap_starts, gap_ends, strict=True)) + logger.warning( + f"Found [{len(gap_indices)}] gap(s) in source sequence counter " + f"for APID {apid} at {gap_pairs} " + f"({total_missing} total missing packets)" + ) + + def packet_generator( packet_file: str | Path, xtce_packet_definition: str | Path, From dfcc494981aba48dc0b2e22bd902d1e374053b1a Mon Sep 17 00:00:00 2001 From: Greg Lucas Date: Sun, 18 Jan 2026 12:54:04 -0700 Subject: [PATCH 3/4] MNT: CoDICE filter out incomplete priority groups The acquisition start times dictate what priorities are in a group. i.e. all priorities have the same start time. Since we know how many priorities there are per Hi/Lo configuration, we know that we should have that many exact items with a given acquisition start time. This allows us to filter out items that weren't complete. --- imap_processing/codice/codice_l1a_de.py | 35 ++++++++++++++++++++----- imap_processing/utils.py | 6 +++++ 2 files changed, 35 insertions(+), 6 deletions(-) diff --git a/imap_processing/codice/codice_l1a_de.py b/imap_processing/codice/codice_l1a_de.py index 7f1afca911..99d08bebe6 100644 --- a/imap_processing/codice/codice_l1a_de.py +++ b/imap_processing/codice/codice_l1a_de.py @@ -1,5 +1,7 @@ """Processing functions for CoDICE L1A Direct Event data.""" +import logging + import numpy as np import xarray as xr @@ -16,6 +18,8 @@ from imap_processing.spice.time import met_to_ttj2000ns from imap_processing.utils import combine_segmented_packets +logger = logging.getLogger(__name__) + def extract_initial_items_from_combined_packets( packets: xr.Dataset, @@ -355,12 +359,31 @@ def process_de_data( num_priorities = config["num_priorities"] bit_structure = config["bit_structure"] - # Truncate to complete priority groups only - num_packets = len(packets["epoch"]) - num_epochs = num_packets // num_priorities - num_complete_packets = num_epochs * num_priorities - if num_complete_packets < num_packets: - packets = packets.isel(epoch=slice(None, num_complete_packets)) + # Identify complete priority groups by acq_start_seconds + # Each priority group should have exactly num_priorities packets + # with the same acq_start_seconds value + acq_start_seconds = packets["acq_start_seconds"].values + unique_times, counts = np.unique(acq_start_seconds, return_counts=True) + + # Find incomplete groups (not exactly num_priorities packets) + incomplete_mask = counts != num_priorities + if np.any(incomplete_mask): + incomplete_times = unique_times[incomplete_mask] + incomplete_counts = counts[incomplete_mask] + logger.warning( + f"Found {len(incomplete_times)} incomplete priority group(s) " + f"for APID {apid}. Expected {num_priorities} packets per group. " + f"Incomplete groups at acq_start_seconds {incomplete_times.tolist()} " + f"with counts {incomplete_counts.tolist()}. Dropping these packets." + ) + + # Keep only complete groups + complete_times = unique_times[~incomplete_mask] + keep_mask = np.isin(acq_start_seconds, complete_times) + packets = packets.isel(epoch=keep_mask) + + # Calculate number of epochs from complete groups + num_epochs = len(complete_times) # Create dataset with coordinates de_data = _create_dataset_coords(packets, apid, num_priorities, cdf_attrs) diff --git a/imap_processing/utils.py b/imap_processing/utils.py index 9710ee30d2..5aa8589834 100644 --- a/imap_processing/utils.py +++ b/imap_processing/utils.py @@ -389,6 +389,8 @@ def combine_segmented_packets( # Get indices of packets we'll keep (first packet of each group) group_start_indices = np.where(is_group_start)[0] + # Keep track of the groups that don't have the expected sequences + bad_groups = [] # Concatenate binary data in-place for each group for group_id in np.unique(group_ids): @@ -411,15 +413,19 @@ def combine_segmented_packets( and not np.all(seq_flags[1:-1] == SequenceFlags.CONTINUATION) ) ): + bad_groups.append(start_index) logger.warning( f"Incorrect/incomplete sequence flags in group {group_id}. " f"Flags: {seq_flags}, " f"SHCOARSEs: {packets['shcoarse'].data[group_indices]}" ) + packets[binary_field_name].data[start_index] = np.sum( packets[binary_field_name].data[group_indices] ) + # Remove any bad groups from the start indices we are keeping + group_start_indices = np.setdiff1d(group_start_indices, bad_groups) # Select only the first packet of each group (drop the middle/last packets) combined_packets = packets.isel(epoch=group_start_indices) From deada8e782249b5cf6a209a7c306c1f4878b0d93 Mon Sep 17 00:00:00 2001 From: Greg Lucas Date: Sun, 18 Jan 2026 13:06:32 -0700 Subject: [PATCH 4/4] DOC: Update documentation on CoDICE segmented packet handling --- imap_processing/codice/codice_l1a_de.py | 27 +++++++++++++++---------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/imap_processing/codice/codice_l1a_de.py b/imap_processing/codice/codice_l1a_de.py index 99d08bebe6..f373893845 100644 --- a/imap_processing/codice/codice_l1a_de.py +++ b/imap_processing/codice/codice_l1a_de.py @@ -25,19 +25,24 @@ def extract_initial_items_from_combined_packets( packets: xr.Dataset, ) -> xr.Dataset: """ - Extract metadata fields from the beginning of combined event_data packets. + Extract fields from the beginning of combined event_data packets. Extracts bit fields from the first 20 bytes of each event_data array - and adds them as new variables to the dataset. + and add them as new variables to the dataset. + + This was previously done in XTCE, but we can't do that because of + segmented packets that need to be combined. Each segmented packet + has its own (SHCOARSE, EVENTDATA, CHKSUM) fields, so we need to + only combine along the EVENTDATA field and extract data that way. Parameters ---------- - packets : xr.Dataset + packets : xarray.Dataset Dataset containing combined packets with event_data. Returns ------- - xr.Dataset + xarray.Dataset Dataset with extracted metadata fields added. """ # Initialize arrays for extracted fields @@ -170,7 +175,7 @@ def _create_dataset_coords( Parameters ---------- - packets : xr.Dataset + packets : xarray.Dataset Combined packets with extracted header fields. apid : int APID for sensor type. @@ -181,7 +186,7 @@ def _create_dataset_coords( Returns ------- - xr.Dataset + xarray.Dataset Dataset with coordinates defined. """ # Get timing info from the first packet of each epoch @@ -267,9 +272,9 @@ def _unpack_and_store_events( Parameters ---------- - de_data : xr.Dataset + de_data : xarray.Dataset Dataset to store unpacked events into (modified in place). - packets : xr.Dataset + packets : xarray.Dataset Combined packets with extracted header fields. num_priorities : int Number of priorities per epoch. @@ -280,7 +285,7 @@ def _unpack_and_store_events( Returns ------- - xr.Dataset + xarray.Dataset The dataset with unpacked events stored. """ # Extract arrays from packets dataset @@ -312,7 +317,7 @@ def _unpack_and_store_events( pkt_bytes = pkt_bytes.reshape(n_events, 8)[:, ::-1] all_event_bytes[offset : offset + n_events] = pkt_bytes - # Record destination indices for scattering + # Record destination indices for later array-based assignments event_epoch_idx[offset : offset + n_events] = pkt_idx // num_priorities event_priority_idx[offset : offset + n_events] = priorities_arr[pkt_idx] event_position_idx[offset : offset + n_events] = np.arange(n_events) @@ -323,7 +328,7 @@ def _unpack_and_store_events( all_64bits = all_event_bytes.view(np.uint64).ravel() unpacked = unpack_bits(bit_structure, all_64bits) - # Scatter unpacked values directly into the dataset arrays + # Place unpacked values directly into the dataset arrays for field in event_fields: de_data[field].values[ event_epoch_idx, event_priority_idx, event_position_idx