Skip to content

⚡️ Speed up function sub_tracks by 69% #61

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from

Conversation

codeflash-ai[bot]
Copy link

@codeflash-ai codeflash-ai bot commented Feb 3, 2025

📄 69% (0.69x) speedup for sub_tracks in supervision/tracker/byte_tracker/core.py

⏱️ Runtime : 340 microseconds 201 microseconds (best of 147 runs)

📝 Explanation and details

Certainly! Here is a rewritten and optimized version of this program.

Explanation.

  1. Set Comprehension for track_ids_b: Using a set comprehension to create track_ids_b is efficient.
  2. List Comprehension for Filtering: Using a list comprehension to filter out the tracks from track_list_a that have their internal_track_id in track_ids_b is both faster and more memory efficient.

This solution avoids the intermediate dictionary creation, which reduces both the time complexity and memory footprint.

Correctness verification report:

Test Status
⚙️ Existing Unit Tests 🔘 None Found
🌀 Generated Regression Tests 27 Passed
⏪ Replay Tests 🔘 None Found
🔎 Concolic Coverage Tests 🔘 None Found
📊 Tests Coverage 100.0%
🌀 Generated Regression Tests Details
from typing import List
from unittest.mock import MagicMock

import numpy as np
# imports
import pytest  # used for our unit tests
from supervision.tracker.byte_tracker.core import sub_tracks


# function to test
class STrack:
    def __init__(self, internal_track_id):
        self.internal_track_id = internal_track_id
from supervision.tracker.byte_tracker.core import sub_tracks


# unit tests
def test_no_overlapping_tracks():
    # No overlapping tracks
    track_list_a = [STrack(1), STrack(2)]
    track_list_b = [STrack(3), STrack(4)]
    codeflash_output = sub_tracks(track_list_a, track_list_b)

def test_some_overlapping_tracks():
    # Some overlapping tracks
    track_list_a = [STrack(1), STrack(2)]
    track_list_b = [STrack(2), STrack(3)]
    codeflash_output = sub_tracks(track_list_a, track_list_b)

def test_all_overlapping_tracks():
    # All overlapping tracks
    track_list_a = [STrack(1), STrack(2)]
    track_list_b = [STrack(1), STrack(2)]
    codeflash_output = sub_tracks(track_list_a, track_list_b)

def test_empty_track_list_a():
    # Empty track_list_a
    track_list_a = []
    track_list_b = [STrack(1), STrack(2)]
    codeflash_output = sub_tracks(track_list_a, track_list_b)

def test_empty_track_list_b():
    # Empty track_list_b
    track_list_a = [STrack(1), STrack(2)]
    track_list_b = []
    codeflash_output = sub_tracks(track_list_a, track_list_b)

def test_both_lists_empty():
    # Both lists empty
    track_list_a = []
    track_list_b = []
    codeflash_output = sub_tracks(track_list_a, track_list_b)

def test_multiple_overlapping_tracks():
    # Multiple overlapping tracks
    track_list_a = [STrack(1), STrack(2), STrack(3)]
    track_list_b = [STrack(2), STrack(3), STrack(4)]
    codeflash_output = sub_tracks(track_list_a, track_list_b)

def test_non_sequential_ids():
    # Non-sequential IDs
    track_list_a = [STrack(10), STrack(20)]
    track_list_b = [STrack(15), STrack(20)]
    codeflash_output = sub_tracks(track_list_a, track_list_b)

def test_large_track_list_a():
    # Large track_list_a
    track_list_a = [STrack(i) for i in range(1000)]
    track_list_b = [STrack(i) for i in range(500, 1500)]
    codeflash_output = sub_tracks(track_list_a, track_list_b)

def test_large_track_list_b():
    # Large track_list_b
    track_list_a = [STrack(i) for i in range(500)]
    track_list_b = [STrack(i) for i in range(1000)]
    codeflash_output = sub_tracks(track_list_a, track_list_b)

def test_both_lists_large():
    # Both lists large
    track_list_a = [STrack(i) for i in range(1000)]
    track_list_b = [STrack(i) for i in range(1000, 2000)]
    codeflash_output = sub_tracks(track_list_a, track_list_b)

def test_duplicate_ids_in_track_list_a():
    # Duplicate IDs in track_list_a
    track_list_a = [STrack(1), STrack(1), STrack(2)]
    track_list_b = [STrack(2)]
    codeflash_output = sub_tracks(track_list_a, track_list_b)

def test_duplicate_ids_in_track_list_b():
    # Duplicate IDs in track_list_b
    track_list_a = [STrack(1), STrack(2)]
    track_list_b = [STrack(2), STrack(2)]
    codeflash_output = sub_tracks(track_list_a, track_list_b)

def test_no_overlapping_ids_with_different_data_types():
    # No overlapping IDs with different data types
    track_list_a = [STrack('1'), STrack('2')]
    track_list_b = [STrack(1), STrack(2)]
    codeflash_output = sub_tracks(track_list_a, track_list_b)
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.

from typing import List

import numpy as np
# imports
import pytest  # used for our unit tests
from supervision.tracker.byte_tracker.core import sub_tracks
# function to test
from supervision.tracker.byte_tracker.kalman_filter import KalmanFilter
from supervision.tracker.byte_tracker.single_object_track import TrackState
from supervision.tracker.byte_tracker.utils import IdCounter


class STrack:
    def __init__(
        self,
        tlwh: np.ndarray,
        score: np.ndarray,
        minimum_consecutive_frames: int,
        shared_kalman: KalmanFilter,
        internal_id_counter: IdCounter,
        external_id_counter: IdCounter,
    ):
        self.state = TrackState.New
        self.is_activated = False
        self.start_frame = 0
        self.frame_id = 0

        self._tlwh = np.asarray(tlwh, dtype=np.float32)
        self.kalman_filter = None
        self.shared_kalman = shared_kalman
        self.mean, self.covariance = None, None
        self.is_activated = False

        self.score = score
        self.tracklet_len = 0

        self.minimum_consecutive_frames = minimum_consecutive_frames

        self.internal_id_counter = internal_id_counter
        self.external_id_counter = external_id_counter
        self.internal_track_id = self.internal_id_counter.NO_ID
        self.external_track_id = self.external_id_counter.NO_ID

    def predict(self) -> None:
        mean_state = self.mean.copy()
        if self.state != TrackState.Tracked:
            mean_state[7] = 0
        self.mean, self.covariance = self.kalman_filter.predict(
            mean_state, self.covariance
        )

    @staticmethod
    def multi_predict(stracks: List[STrack], shared_kalman: KalmanFilter) -> None:
        if len(stracks) > 0:
            multi_mean = []
            multi_covariance = []
            for i, st in enumerate(stracks):
                multi_mean.append(st.mean.copy())
                multi_covariance.append(st.covariance)
                if st.state != TrackState.Tracked:
                    multi_mean[i][7] = 0

            multi_mean, multi_covariance = shared_kalman.multi_predict(
                np.asarray(multi_mean), np.asarray(multi_covariance)
            )
            for i, (mean, cov) in enumerate(zip(multi_mean, multi_covariance)):
                stracks[i].mean = mean
                stracks[i].covariance = cov

    def activate(self, kalman_filter: KalmanFilter, frame_id: int) -> None:
        """Start a new tracklet"""
        self.kalman_filter = kalman_filter
        self.internal_track_id = self.internal_id_counter.new_id()
        self.mean, self.covariance = self.kalman_filter.initiate(
            self.tlwh_to_xyah(self._tlwh)
        )

        self.tracklet_len = 0
        self.state = TrackState.Tracked
        if frame_id == 1:
            self.is_activated = True

        if self.minimum_consecutive_frames == 1:
            self.external_track_id = self.external_id_counter.new_id()

        self.frame_id = frame_id
        self.start_frame = frame_id

    def re_activate(self, new_track: STrack, frame_id: int) -> None:
        self.mean, self.covariance = self.kalman_filter.update(
            self.mean, self.covariance, self.tlwh_to_xyah(new_track.tlwh)
        )
        self.tracklet_len = 0
        self.state = TrackState.Tracked

        self.frame_id = frame_id
        self.score = new_track.score

    def update(self, new_track: STrack, frame_id: int) -> None:
        """
        Update a matched track
        :type new_track: STrack
        :type frame_id: int
        :type update_feature: bool
        :return:
        """
        self.frame_id = frame_id
        self.tracklet_len += 1

        new_tlwh = new_track.tlwh
        self.mean, self.covariance = self.kalman_filter.update(
            self.mean, self.covariance, self.tlwh_to_xyah(new_tlwh)
        )
        self.state = TrackState.Tracked
        if self.tracklet_len == self.minimum_consecutive_frames:
            self.is_activated = True
            if self.external_track_id == self.external_id_counter.NO_ID:
                self.external_track_id = self.external_id_counter.new_id()

        self.score = new_track.score

    @property
    def tlwh(self) -> np.ndarray:
        """Get current position in bounding box format `(top left x, top left y,
        width, height)`.
        """
        if self.mean is None:
            return self._tlwh.copy()
        ret = self.mean[:4].copy()
        ret[2] *= ret[3]
        ret[:2] -= ret[2:] / 2
        return ret

    @property
    def tlbr(self) -> np.ndarray:
        """Convert bounding box to format `(min x, min y, max x, max y)`, i.e.,
        `(top left, bottom right)`.
        """
        ret = self.tlwh.copy()
        ret[2:] += ret[:2]
        return ret

    @staticmethod
    def tlwh_to_xyah(tlwh) -> np.ndarray:
        """Convert bounding box to format `(center x, center y, aspect ratio,
        height)`, where the aspect ratio is `width / height`.
        """
        ret = np.asarray(tlwh).copy()
        ret[:2] += ret[2:] / 2
        ret[2] /= ret[3]
        return ret

    def to_xyah(self) -> np.ndarray:
        return self.tlwh_to_xyah(self.tlwh)

    @staticmethod
    def tlbr_to_tlwh(tlbr) -> np.ndarray:
        ret = np.asarray(tlbr).copy()
        ret[2:] -= ret[:2]
        return ret

    @staticmethod
    def tlwh_to_tlbr(tlwh) -> np.ndarray:
        ret = np.asarray(tlwh).copy()
        ret[2:] += ret[:2]
        return ret

    def __repr__(self) -> str:
        return "OT_{}_({}-{})".format(
            self.internal_track_id, self.start_frame, self.frame_id
        )
from supervision.tracker.byte_tracker.core import sub_tracks


# unit tests
def create_track(internal_track_id):
    # Helper function to create a dummy STrack with a given internal_track_id
    class DummyCounter:
        def __init__(self, id):
            self.NO_ID = -1
            self.id = id

        def new_id(self):
            return self.id

    return STrack(
        tlwh=np.array([0, 0, 1, 1], dtype=np.float32),
        score=np.array([1.0], dtype=np.float32),
        minimum_consecutive_frames=1,
        shared_kalman=None,
        internal_id_counter=DummyCounter(internal_track_id),
        external_id_counter=DummyCounter(internal_track_id)
    )

def test_non_overlapping_ids():
    # Test case where there are no overlapping IDs
    track_list_a = [create_track(1), create_track(2), create_track(3)]
    track_list_b = [create_track(4), create_track(5)]
    codeflash_output = sub_tracks(track_list_a, track_list_b)

def test_some_overlapping_ids():
    # Test case where some IDs overlap
    track_list_a = [create_track(1), create_track(2), create_track(3)]
    track_list_b = [create_track(2), create_track(4)]
    codeflash_output = sub_tracks(track_list_a, track_list_b)

def test_all_overlapping_ids():
    # Test case where all IDs overlap
    track_list_a = [create_track(1), create_track(2), create_track(3)]
    track_list_b = [create_track(1), create_track(2), create_track(3)]
    codeflash_output = sub_tracks(track_list_a, track_list_b)

def test_empty_track_list_a():
    # Test case where track_list_a is empty
    track_list_a = []
    track_list_b = [create_track(1), create_track(2), create_track(3)]
    codeflash_output = sub_tracks(track_list_a, track_list_b)

def test_empty_track_list_b():
    # Test case where track_list_b is empty
    track_list_a = [create_track(1), create_track(2), create_track(3)]
    track_list_b = []
    codeflash_output = sub_tracks(track_list_a, track_list_b)

def test_both_lists_empty():
    # Test case where both lists are empty
    track_list_a = []
    track_list_b = []
    codeflash_output = sub_tracks(track_list_a, track_list_b)

def test_duplicate_ids_in_track_list_b():
    # Test case where track_list_b has duplicate IDs
    track_list_a = [create_track(1), create_track(2), create_track(3)]
    track_list_b = [create_track(2), create_track(2), create_track(3)]
    codeflash_output = sub_tracks(track_list_a, track_list_b)

def test_large_track_list_a_small_track_list_b():
    # Test case with large track_list_a and small track_list_b
    track_list_a = [create_track(i) for i in range(1, 1001)]
    track_list_b = [create_track(i) for i in range(1, 11)]
    codeflash_output = sub_tracks(track_list_a, track_list_b)

def test_small_track_list_a_large_track_list_b():
    # Test case with small track_list_a and large track_list_b
    track_list_a = [create_track(i) for i in range(1, 11)]
    track_list_b = [create_track(i) for i in range(1, 1001)]
    codeflash_output = sub_tracks(track_list_a, track_list_b)

def test_both_lists_large():
    # Test case with both lists large
    track_list_a = [create_track(i) for i in range(1, 1001)]
    track_list_b = [create_track(i) for i in range(1, 1001)]
    codeflash_output = sub_tracks(track_list_a, track_list_b)

def test_non_integer_ids():
    # Test case with non-integer IDs
    track_list_a = [create_track('a'), create_track('b'), create_track('c')]
    track_list_b = [create_track('b')]
    codeflash_output = sub_tracks(track_list_a, track_list_b)

def test_identical_attributes_except_ids():
    # Test case with identical attributes except IDs
    track_list_a = [create_track(1), create_track(2), create_track(3)]
    track_list_b = [create_track(2)]
    codeflash_output = sub_tracks(track_list_a, track_list_b)

def test_all_elements_in_track_list_b_not_in_track_list_a():
    # Test case where all elements in track_list_b are not in track_list_a
    track_list_a = [create_track(1), create_track(2), create_track(3)]
    track_list_b = [create_track(4), create_track(5), create_track(6)]
    codeflash_output = sub_tracks(track_list_a, track_list_b)

if __name__ == "__main__":
    pytest.main()
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.

Codeflash

Certainly! Here is a rewritten and optimized version of this program.



### Explanation.
1. **Set Comprehension for `track_ids_b`**: Using a set comprehension to create `track_ids_b` is efficient.
2. **List Comprehension for Filtering**: Using a list comprehension to filter out the tracks from `track_list_a` that have their `internal_track_id` in `track_ids_b` is both faster and more memory efficient.

This solution avoids the intermediate dictionary creation, which reduces both the time complexity and memory footprint.
@codeflash-ai codeflash-ai bot added the ⚡️ codeflash Optimization PR opened by Codeflash AI label Feb 3, 2025
@codeflash-ai codeflash-ai bot requested a review from misrasaurabh1 February 3, 2025 08:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
⚡️ codeflash Optimization PR opened by Codeflash AI
Projects
None yet
Development

Successfully merging this pull request may close these issues.

0 participants