Skip to content

⚡️ Speed up function joint_tracks by 12% #60

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from

Conversation

codeflash-ai[bot]
Copy link

@codeflash-ai codeflash-ai bot commented Feb 3, 2025

📄 12% (0.12x) speedup for joint_tracks in supervision/tracker/byte_tracker/core.py

⏱️ Runtime : 189 microseconds 169 microseconds (best of 311 runs)

📝 Explanation and details

o3-mini
I removed the list concatenation (which creates an intermediate list) and replaced it with two separate loops. This avoids the extra allocation and iteration, which can be beneficial especially when the lists are large.

Correctness verification report:

Test Status
⚙️ Existing Unit Tests 🔘 None Found
🌀 Generated Regression Tests 21 Passed
⏪ Replay Tests 🔘 None Found
🔎 Concolic Coverage Tests 🔘 None Found
📊 Tests Coverage 100.0%
🌀 Generated Regression Tests Details
from typing import List

import numpy as np
# function to test
import numpy.typing as npt
# imports
import pytest  # used for our unit tests
from supervision.tracker.byte_tracker.core import joint_tracks
from supervision.tracker.byte_tracker.kalman_filter import KalmanFilter
from supervision.tracker.byte_tracker.single_object_track import TrackState
from supervision.tracker.byte_tracker.utils import IdCounter


class STrack:
    def __init__(
        self,
        tlwh: npt.NDArray[np.float32],
        score: npt.NDArray[np.float32],
        minimum_consecutive_frames: int,
        shared_kalman: KalmanFilter,
        internal_id_counter: IdCounter,
        external_id_counter: IdCounter,
    ):
        self.state = TrackState.New
        self.is_activated = False
        self.start_frame = 0
        self.frame_id = 0

        self._tlwh = np.asarray(tlwh, dtype=np.float32)
        self.kalman_filter = None
        self.shared_kalman = shared_kalman
        self.mean, self.covariance = None, None
        self.is_activated = False

        self.score = score
        self.tracklet_len = 0

        self.minimum_consecutive_frames = minimum_consecutive_frames

        self.internal_id_counter = internal_id_counter
        self.external_id_counter = external_id_counter
        self.internal_track_id = self.internal_id_counter.NO_ID
        self.external_track_id = self.external_id_counter.NO_ID

    def predict(self) -> None:
        mean_state = self.mean.copy()
        if self.state != TrackState.Tracked:
            mean_state[7] = 0
        self.mean, self.covariance = self.kalman_filter.predict(
            mean_state, self.covariance
        )

    @staticmethod
    def multi_predict(stracks: List[STrack], shared_kalman: KalmanFilter) -> None:
        if len(stracks) > 0:
            multi_mean = []
            multi_covariance = []
            for i, st in enumerate(stracks):
                multi_mean.append(st.mean.copy())
                multi_covariance.append(st.covariance)
                if st.state != TrackState.Tracked:
                    multi_mean[i][7] = 0

            multi_mean, multi_covariance = shared_kalman.multi_predict(
                np.asarray(multi_mean), np.asarray(multi_covariance)
            )
            for i, (mean, cov) in enumerate(zip(multi_mean, multi_covariance)):
                stracks[i].mean = mean
                stracks[i].covariance = cov

    def activate(self, kalman_filter: KalmanFilter, frame_id: int) -> None:
        """Start a new tracklet"""
        self.kalman_filter = kalman_filter
        self.internal_track_id = self.internal_id_counter.new_id()
        self.mean, self.covariance = self.kalman_filter.initiate(
            self.tlwh_to_xyah(self._tlwh)
        )

        self.tracklet_len = 0
        self.state = TrackState.Tracked
        if frame_id == 1:
            self.is_activated = True

        if self.minimum_consecutive_frames == 1:
            self.external_track_id = self.external_id_counter.new_id()

        self.frame_id = frame_id
        self.start_frame = frame_id

    def re_activate(self, new_track: STrack, frame_id: int) -> None:
        self.mean, self.covariance = self.kalman_filter.update(
            self.mean, self.covariance, self.tlwh_to_xyah(new_track.tlwh)
        )
        self.tracklet_len = 0
        self.state = TrackState.Tracked

        self.frame_id = frame_id
        self.score = new_track.score

    def update(self, new_track: STrack, frame_id: int) -> None:
        """
        Update a matched track
        :type new_track: STrack
        :type frame_id: int
        :type update_feature: bool
        :return:
        """
        self.frame_id = frame_id
        self.tracklet_len += 1

        new_tlwh = new_track.tlwh
        self.mean, self.covariance = self.kalman_filter.update(
            self.mean, self.covariance, self.tlwh_to_xyah(new_tlwh)
        )
        self.state = TrackState.Tracked
        if self.tracklet_len == self.minimum_consecutive_frames:
            self.is_activated = True
            if self.external_track_id == self.external_id_counter.NO_ID:
                self.external_track_id = self.external_id_counter.new_id()

        self.score = new_track.score

    @property
    def tlwh(self) -> npt.NDArray[np.float32]:
        """Get current position in bounding box format `(top left x, top left y,
        width, height)`.
        """
        if self.mean is None:
            return self._tlwh.copy()
        ret = self.mean[:4].copy()
        ret[2] *= ret[3]
        ret[:2] -= ret[2:] / 2
        return ret

    @property
    def tlbr(self) -> npt.NDArray[np.float32]:
        """Convert bounding box to format `(min x, min y, max x, max y)`, i.e.,
        `(top left, bottom right)`.
        """
        ret = self.tlwh.copy()
        ret[2:] += ret[:2]
        return ret

    @staticmethod
    def tlwh_to_xyah(tlwh) -> npt.NDArray[np.float32]:
        """Convert bounding box to format `(center x, center y, aspect ratio,
        height)`, where the aspect ratio is `width / height`.
        """
        ret = np.asarray(tlwh).copy()
        ret[:2] += ret[2:] / 2
        ret[2] /= ret[3]
        return ret

    def to_xyah(self) -> npt.NDArray[np.float32]:
        return self.tlwh_to_xyah(self.tlwh)

    @staticmethod
    def tlbr_to_tlwh(tlbr) -> npt.NDArray[np.float32]:
        ret = np.asarray(tlbr).copy()
        ret[2:] -= ret[:2]
        return ret

    @staticmethod
    def tlwh_to_tlbr(tlwh) -> npt.NDArray[np.float32]:
        ret = np.asarray(tlwh).copy()
        ret[2:] += ret[:2]
        return ret

    def __repr__(self) -> str:
        return "OT_{}_({}-{})".format(
            self.internal_track_id, self.start_frame, self.frame_id
        )
from supervision.tracker.byte_tracker.core import joint_tracks

# unit tests

# Helper function to create a dummy STrack object
def create_dummy_track(internal_id, external_id, shared_kalman, internal_id_counter, external_id_counter):
    return STrack(
        tlwh=np.array([0, 0, 0, 0], dtype=np.float32),
        score=np.array([0], dtype=np.float32),
        minimum_consecutive_frames=1,
        shared_kalman=shared_kalman,
        internal_id_counter=internal_id_counter,
        external_id_counter=external_id_counter,
    )

# Test cases

def test_combining_non_overlapping_lists():
    # Setup
    shared_kalman = KalmanFilter()
    internal_id_counter = IdCounter()
    external_id_counter = IdCounter()
    
    track_list_a = [create_dummy_track(i, i, shared_kalman, internal_id_counter, external_id_counter) for i in range(2)]
    track_list_b = [create_dummy_track(i + 2, i + 2, shared_kalman, internal_id_counter, external_id_counter) for i in range(2)]
    
    # Execute
    codeflash_output = joint_tracks(track_list_a, track_list_b)

def test_combining_overlapping_lists():
    # Setup
    shared_kalman = KalmanFilter()
    internal_id_counter = IdCounter()
    external_id_counter = IdCounter()
    
    track_list_a = [create_dummy_track(1, 1, shared_kalman, internal_id_counter, external_id_counter)]
    track_list_b = [create_dummy_track(1, 1, shared_kalman, internal_id_counter, external_id_counter)]
    
    # Execute
    codeflash_output = joint_tracks(track_list_a, track_list_b)

def test_empty_lists():
    # Setup
    track_list_a = []
    track_list_b = []
    
    # Execute
    codeflash_output = joint_tracks(track_list_a, track_list_b)

def test_one_empty_list():
    # Setup
    shared_kalman = KalmanFilter()
    internal_id_counter = IdCounter()
    external_id_counter = IdCounter()
    
    track_list_a = [create_dummy_track(1, 1, shared_kalman, internal_id_counter, external_id_counter)]
    track_list_b = []
    
    # Execute
    codeflash_output = joint_tracks(track_list_a, track_list_b)

def test_all_duplicates():
    # Setup
    shared_kalman = KalmanFilter()
    internal_id_counter = IdCounter()
    external_id_counter = IdCounter()
    
    track_list_a = [create_dummy_track(1, 1, shared_kalman, internal_id_counter, external_id_counter)]
    track_list_b = [create_dummy_track(1, 1, shared_kalman, internal_id_counter, external_id_counter)]
    
    # Execute
    codeflash_output = joint_tracks(track_list_a, track_list_b)

def test_partial_duplicates():
    # Setup
    shared_kalman = KalmanFilter()
    internal_id_counter = IdCounter()
    external_id_counter = IdCounter()
    
    track_list_a = [create_dummy_track(1, 1, shared_kalman, internal_id_counter, external_id_counter)]
    track_list_b = [create_dummy_track(1, 1, shared_kalman, internal_id_counter, external_id_counter), create_dummy_track(2, 2, shared_kalman, internal_id_counter, external_id_counter)]
    
    # Execute
    codeflash_output = joint_tracks(track_list_a, track_list_b)

def test_large_lists_no_duplicates():
    # Setup
    shared_kalman = KalmanFilter()
    internal_id_counter = IdCounter()
    external_id_counter = IdCounter()
    
    track_list_a = [create_dummy_track(i, i, shared_kalman, internal_id_counter, external_id_counter) for i in range(1000)]
    track_list_b = [create_dummy_track(i + 1000, i + 1000, shared_kalman, internal_id_counter, external_id_counter) for i in range(1000)]
    
    # Execute
    codeflash_output = joint_tracks(track_list_a, track_list_b)

def test_large_lists_some_duplicates():
    # Setup
    shared_kalman = KalmanFilter()
    internal_id_counter = IdCounter()
    external_id_counter = IdCounter()
    
    track_list_a = [create_dummy_track(i, i, shared_kalman, internal_id_counter, external_id_counter) for i in range(1000)]
    track_list_b = [create_dummy_track(i, i, shared_kalman, internal_id_counter, external_id_counter) for i in range(500, 1500)]
    
    # Execute
    codeflash_output = joint_tracks(track_list_a, track_list_b)

def test_stress_test_maximum_size():
    # Setup
    shared_kalman = KalmanFilter()
    internal_id_counter = IdCounter()
    external_id_counter = IdCounter()
    
    track_list_a = [create_dummy_track(i, i, shared_kalman, internal_id_counter, external_id_counter) for i in range(1000)]
    track_list_b = [create_dummy_track(i + 100000, i + 100000, shared_kalman, internal_id_counter, external_id_counter) for i in range(1000)]
    
    # Execute
    codeflash_output = joint_tracks(track_list_a, track_list_b)

def test_handling_different_data_types():
    # Setup
    shared_kalman = KalmanFilter()
    internal_id_counter = IdCounter()
    external_id_counter = IdCounter()
    
    track_list_a = [create_dummy_track('a', 'a', shared_kalman, internal_id_counter, external_id_counter)]
    track_list_b = [create_dummy_track('b', 'b', shared_kalman, internal_id_counter, external_id_counter)]
    
    # Execute
    codeflash_output = joint_tracks(track_list_a, track_list_b)

def test_invalid_track_objects():
    # Setup
    shared_kalman = KalmanFilter()
    internal_id_counter = IdCounter()
    external_id_counter = IdCounter()
    
    track_list_a = [create_dummy_track(1, 1, shared_kalman, internal_id_counter, external_id_counter)]
    track_list_b = ['invalid_track']
    
    # Execute & Verify
    with pytest.raises(AttributeError):
        joint_tracks(track_list_a, track_list_b)

def test_malformed_track_objects():
    # Setup
    class MalformedTrack:
        pass

    shared_kalman = KalmanFilter()
    internal_id_counter = IdCounter()
    external_id_counter = IdCounter()
    
    track_list_a = [create_dummy_track(1, 1, shared_kalman, internal_id_counter, external_id_counter)]
    track_list_b = [MalformedTrack()]
    
    # Execute & Verify
    with pytest.raises(AttributeError):
        joint_tracks(track_list_a, track_list_b)
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.

from typing import List

import numpy as np
# imports
import pytest  # used for our unit tests
from supervision.tracker.byte_tracker.core import joint_tracks
# function to test
from supervision.tracker.byte_tracker.kalman_filter import KalmanFilter
from supervision.tracker.byte_tracker.single_object_track import TrackState
from supervision.tracker.byte_tracker.utils import IdCounter


class STrack:
    def __init__(
        self,
        tlwh: np.ndarray,
        score: np.ndarray,
        minimum_consecutive_frames: int,
        shared_kalman: KalmanFilter,
        internal_id_counter: IdCounter,
        external_id_counter: IdCounter,
    ):
        self.state = TrackState.New
        self.is_activated = False
        self.start_frame = 0
        self.frame_id = 0

        self._tlwh = np.asarray(tlwh, dtype=np.float32)
        self.kalman_filter = None
        self.shared_kalman = shared_kalman
        self.mean, self.covariance = None, None
        self.is_activated = False

        self.score = score
        self.tracklet_len = 0

        self.minimum_consecutive_frames = minimum_consecutive_frames

        self.internal_id_counter = internal_id_counter
        self.external_id_counter = external_id_counter
        self.internal_track_id = self.internal_id_counter.NO_ID
        self.external_track_id = self.external_id_counter.NO_ID

    def predict(self) -> None:
        mean_state = self.mean.copy()
        if self.state != TrackState.Tracked:
            mean_state[7] = 0
        self.mean, self.covariance = self.kalman_filter.predict(
            mean_state, self.covariance
        )

    @staticmethod
    def multi_predict(stracks: List[STrack], shared_kalman: KalmanFilter) -> None:
        if len(stracks) > 0:
            multi_mean = []
            multi_covariance = []
            for i, st in enumerate(stracks):
                multi_mean.append(st.mean.copy())
                multi_covariance.append(st.covariance)
                if st.state != TrackState.Tracked:
                    multi_mean[i][7] = 0

            multi_mean, multi_covariance = shared_kalman.multi_predict(
                np.asarray(multi_mean), np.asarray(multi_covariance)
            )
            for i, (mean, cov) in enumerate(zip(multi_mean, multi_covariance)):
                stracks[i].mean = mean
                stracks[i].covariance = cov

    def activate(self, kalman_filter: KalmanFilter, frame_id: int) -> None:
        """Start a new tracklet"""
        self.kalman_filter = kalman_filter
        self.internal_track_id = self.internal_id_counter.new_id()
        self.mean, self.covariance = self.kalman_filter.initiate(
            self.tlwh_to_xyah(self._tlwh)
        )

        self.tracklet_len = 0
        self.state = TrackState.Tracked
        if frame_id == 1:
            self.is_activated = True

        if self.minimum_consecutive_frames == 1:
            self.external_track_id = self.external_id_counter.new_id()

        self.frame_id = frame_id
        self.start_frame = frame_id

    def re_activate(self, new_track: STrack, frame_id: int) -> None:
        self.mean, self.covariance = self.kalman_filter.update(
            self.mean, self.covariance, self.tlwh_to_xyah(new_track.tlwh)
        )
        self.tracklet_len = 0
        self.state = TrackState.Tracked

        self.frame_id = frame_id
        self.score = new_track.score

    def update(self, new_track: STrack, frame_id: int) -> None:
        """
        Update a matched track
        :type new_track: STrack
        :type frame_id: int
        :type update_feature: bool
        :return:
        """
        self.frame_id = frame_id
        self.tracklet_len += 1

        new_tlwh = new_track.tlwh
        self.mean, self.covariance = self.kalman_filter.update(
            self.mean, self.covariance, self.tlwh_to_xyah(new_tlwh)
        )
        self.state = TrackState.Tracked
        if self.tracklet_len == self.minimum_consecutive_frames:
            self.is_activated = True
            if self.external_track_id == self.external_id_counter.NO_ID:
                self.external_track_id = self.external_id_counter.new_id()

        self.score = new_track.score

    @property
    def tlwh(self) -> np.ndarray:
        """Get current position in bounding box format `(top left x, top left y,
        width, height)`.
        """
        if self.mean is None:
            return self._tlwh.copy()
        ret = self.mean[:4].copy()
        ret[2] *= ret[3]
        ret[:2] -= ret[2:] / 2
        return ret

    @property
    def tlbr(self) -> np.ndarray:
        """Convert bounding box to format `(min x, min y, max x, max y)`, i.e.,
        `(top left, bottom right)`.
        """
        ret = self.tlwh.copy()
        ret[2:] += ret[:2]
        return ret

    @staticmethod
    def tlwh_to_xyah(tlwh) -> np.ndarray:
        """Convert bounding box to format `(center x, center y, aspect ratio,
        height)`, where the aspect ratio is `width / height`.
        """
        ret = np.asarray(tlwh).copy()
        ret[:2] += ret[2:] / 2
        ret[2] /= ret[3]
        return ret

    def to_xyah(self) -> np.ndarray:
        return self.tlwh_to_xyah(self.tlwh)

    @staticmethod
    def tlbr_to_tlwh(tlbr) -> np.ndarray:
        ret = np.asarray(tlbr).copy()
        ret[2:] -= ret[:2]
        return ret

    @staticmethod
    def tlwh_to_tlbr(tlwh) -> np.ndarray:
        ret = np.asarray(tlwh).copy()
        ret[2:] += ret[:2]
        return ret

    def __repr__(self) -> str:
        return "OT_{}_({}-{})".format(
            self.internal_track_id, self.start_frame, self.frame_id
        )
from supervision.tracker.byte_tracker.core import joint_tracks


# unit tests
def test_joint_tracks_both_empty():
    codeflash_output = joint_tracks([], [])

def test_joint_tracks_one_empty():
    track_a = STrack([0, 0, 1, 1], 0.9, 1, None, IdCounter(), IdCounter())
    codeflash_output = joint_tracks([track_a], [])
    codeflash_output = joint_tracks([], [track_a])

def test_joint_tracks_no_duplicates():
    track_a = STrack([0, 0, 1, 1], 0.9, 1, None, IdCounter(), IdCounter())
    track_b = STrack([1, 1, 1, 1], 0.8, 1, None, IdCounter(), IdCounter())
    track_a.internal_track_id = 1
    track_b.internal_track_id = 2
    codeflash_output = joint_tracks([track_a], [track_b])

def test_joint_tracks_with_duplicates():
    track_a = STrack([0, 0, 1, 1], 0.9, 1, None, IdCounter(), IdCounter())
    track_b = STrack([1, 1, 1, 1], 0.8, 1, None, IdCounter(), IdCounter())
    track_a.internal_track_id = 1
    track_b.internal_track_id = 1
    codeflash_output = joint_tracks([track_a], [track_b])

def test_joint_tracks_large_number_of_tracks():
    tracks_a = [STrack([i, i, 1, 1], 0.9, 1, None, IdCounter(), IdCounter()) for i in range(1000)]
    tracks_b = [STrack([i, i, 1, 1], 0.9, 1, None, IdCounter(), IdCounter()) for i in range(1000, 2000)]
    for i, track in enumerate(tracks_a):
        track.internal_track_id = i
    for i, track in enumerate(tracks_b):
        track.internal_track_id = i + 1000
    codeflash_output = joint_tracks(tracks_a, tracks_b)

def test_joint_tracks_interleaved_duplicates():
    track_a1 = STrack([0, 0, 1, 1], 0.9, 1, None, IdCounter(), IdCounter())
    track_a2 = STrack([1, 1, 1, 1], 0.8, 1, None, IdCounter(), IdCounter())
    track_b1 = STrack([2, 2, 1, 1], 0.7, 1, None, IdCounter(), IdCounter())
    track_b2 = STrack([3, 3, 1, 1], 0.6, 1, None, IdCounter(), IdCounter())
    track_a1.internal_track_id = 1
    track_a2.internal_track_id = 3
    track_b1.internal_track_id = 2
    track_b2.internal_track_id = 3
    codeflash_output = joint_tracks([track_a1, track_a2], [track_b1, track_b2])

def test_joint_tracks_mixed_unique_and_duplicates():
    track_a1 = STrack([0, 0, 1, 1], 0.9, 1, None, IdCounter(), IdCounter())
    track_a2 = STrack([1, 1, 1, 1], 0.8, 1, None, IdCounter(), IdCounter())
    track_b1 = STrack([2, 2, 1, 1], 0.7, 1, None, IdCounter(), IdCounter())
    track_b2 = STrack([3, 3, 1, 1], 0.6, 1, None, IdCounter(), IdCounter())
    track_a1.internal_track_id = 1
    track_a2.internal_track_id = 2
    track_b1.internal_track_id = 3
    track_b2.internal_track_id = 2
    codeflash_output = joint_tracks([track_a1, track_a2], [track_b1, track_b2])

def test_joint_tracks_single_element_lists():
    track_a = STrack([0, 0, 1, 1], 0.9, 1, None, IdCounter(), IdCounter())
    track_b = STrack([1, 1, 1, 1], 0.8, 1, None, IdCounter(), IdCounter())
    track_a.internal_track_id = 1
    track_b.internal_track_id = 2
    codeflash_output = joint_tracks([track_a], [track_b])

Codeflash

o3-mini
I removed the list concatenation (which creates an intermediate list) and replaced it with two separate loops. This avoids the extra allocation and iteration, which can be beneficial especially when the lists are large.
@codeflash-ai codeflash-ai bot added the ⚡️ codeflash Optimization PR opened by Codeflash AI label Feb 3, 2025
@codeflash-ai codeflash-ai bot requested a review from misrasaurabh1 February 3, 2025 08:12
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
⚡️ codeflash Optimization PR opened by Codeflash AI
Projects
None yet
Development

Successfully merging this pull request may close these issues.

0 participants