|
| 1 | +#!/usr/bin/env python3 |
| 2 | + |
| 3 | +import os |
| 4 | +import re |
| 5 | +from typing import Dict |
| 6 | +from datetime import datetime, timedelta, timezone |
| 7 | +from googleapiclient.discovery import build |
| 8 | +import google.generativeai as genai |
| 9 | + |
| 10 | + |
# ——— ENV variables ———
# Both keys are read once at import time; either may be None when unset.
YT_API_KEY = os.environ.get('YT_API_KEY')
GEMINI_API_KEY = os.environ.get('GEMINI_API_KEY')

# ——— CONSTANTS ———
# Service name / version passed to googleapiclient's build().
SERVICE_TYPE = 'youtube'
SERVICE_VERSION = 'v3'
# Gemini model used to score video titles.
MODEL_NAME = 'gemini-1.5-flash-latest'

# Upper bound on the number of search pages requested per query.
DEFAULT_MAX_API_CALLS = 5
# Page size requested from the search endpoint.
DEFAULT_MAX_RESULTS_PER_PAGE = 50
# Accepted video length window, in minutes (inclusive on both ends).
DEFAULT_MIN_VIDEO_DURATION_MINUTES = 10
DEFAULT_MAX_VIDEO_DURATION_MINUTES = 60
# Only videos published within this many days before now are searched.
DEFAULT_NO_OF_PREV_DAYS = 14

# Number of videos to collect / return.
# NOTE(review): this name used to be assigned twice (20, then 5). The second
# assignment always won, so 5 is the value every caller has been seeing; the
# dead `= 20` assignment was dropped. Confirm 5 is the intended value.
DEFAULT_MAX_RESULTS = 5
# Matches a standalone score: 10, 1-9 with optional decimals, or a value < 1.
REGEX_PATTERN = r'\b(10|[1-9](\.\d+)?|0?\.\d+)\b'
# Score used when the model reply cannot be parsed or the API call fails.
DEFAULT_SCORE = 5.0
| 30 | + |
| 31 | + |
class TimeUtils:
    """
    Stateless helpers for building timestamps and for interpreting the
    ISO 8601 durations found in video details (e.g. "PT5M30S").
    """

    # Optional "PT" prefix followed by optional hour / minute / second parts.
    _DURATION_PATTERN = re.compile(
        r'^(?:PT)?(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?$'
    )

    @staticmethod
    def get_timestamp_n_days_from_now(days: int) -> str:
        """
        Get the timestamp for a date n days ago in ISO 8601 format.

        The value is computed in UTC and the "+00:00" offset is rewritten
        to the "Z" suffix.
        """
        date_before_n_days = datetime.now(timezone.utc) - timedelta(days=days)
        return date_before_n_days.isoformat('T').replace('+00:00', 'Z')

    @staticmethod
    def is_duration_in_mins(duration: str) -> bool:
        """
        Report whether a duration falls OUTSIDE the plain minutes form.

        Despite the name, this returns True when the duration contains an
        hour part ('H') or has no minute part ('M'), i.e. when it is NOT a
        simple "<m>M<s>S" value. Callers use a True result to skip the
        video, so the behaviour is kept as-is.
        """
        return 'H' in duration or 'M' not in duration

    @staticmethod
    def derive_total_seconds_from_duration(duration: str) -> int:
        """
        Derive total seconds from duration (ISO 8601 format, e.g. "PT5M30S").

        The leading "PT" is optional, and any of the hour, minute and second
        parts may be absent as long as at least one is present, so
        "5M30S", "PT5M30S", "45S" and "1H2M3S" are all accepted.

        Raises:
            ValueError: if the text is not a supported duration.
        """
        match = TimeUtils._DURATION_PATTERN.match(duration.strip())
        # Every part is optional in the pattern, so an empty string or a bare
        # "PT" would match; require at least one captured component.
        if match is None or not any(match.groups()):
            raise ValueError(f"Unsupported duration format: {duration!r}")

        hours, minutes, seconds = (
            int(part) if part else 0 for part in match.groups()
        )
        return hours * 3600 + minutes * 60 + seconds

    @staticmethod
    def is_video_duration_in_range(
            total_seconds: int,
            *,
            min_duration: int = DEFAULT_MIN_VIDEO_DURATION_MINUTES,
            max_duration: int = DEFAULT_MAX_VIDEO_DURATION_MINUTES) -> bool:
        """
        Check if the video duration is within the specified range in minutes.

        `total_seconds` is compared against the bounds converted to seconds;
        both bounds are inclusive.
        """
        return min_duration * 60 <= total_seconds <= max_duration * 60
| 73 | + |
| 74 | + |
class VideoDetailsExtractor:
    """
    A class to encapsulate YouTube video extraction logic.

    On construction it pages through search results for `query`, looks up the
    details of each result and keeps the videos that pass the duration
    filters, until either enough videos were collected, the page budget is
    used up, or the search has no further pages.
    """

    # API client shared by all instances; built when the class body runs.
    __platform_conn = build(
        serviceName=SERVICE_TYPE,
        version=SERVICE_VERSION,
        developerKey=YT_API_KEY
    )

    def __init__(
            self,
            query: str,
            *,
            no_prev_days: int = DEFAULT_NO_OF_PREV_DAYS,
            max_pages: int = DEFAULT_MAX_API_CALLS,
            max_results: int = DEFAULT_MAX_RESULTS) -> None:
        """
        Initialize the VideoDetailsExtractor and run the scan immediately.

        Args:
            query: Search text sent to the search endpoint.
            no_prev_days: Only videos published within this many days are
                searched.
            max_pages: Maximum number of search pages to request.
            max_results: Number of filtered videos after which scanning stops.
        """
        self.__filtered_videos = []
        self.__next_page_token = None
        self.__page_count = 0
        self.__max_pages = max_pages
        self.__max_results = max_results

        self.query = query
        self.__targeted_date = TimeUtils \
            .get_timestamp_n_days_from_now(no_prev_days)
        # scan_videos() requests the first page itself. Fetching one here as
        # well would spend an API call on a response that is never used.
        self.__search_response = {}

        self.scan_videos()

    def get_new_search_response(self) -> dict:
        """
        Fetch a new search response for the given query.

        Uses the currently stored page token and increments the page counter
        after the request has completed.
        """
        search_config = {
            "q": self.query,
            "part": "id,snippet",
            "type": "video",
            "order": "relevance",
            "publishedAfter": self.__targeted_date,
            "maxResults": DEFAULT_MAX_RESULTS_PER_PAGE,
            "pageToken": self.__next_page_token
        }

        new_search_response = VideoDetailsExtractor.__platform_conn \
            .search() \
            .list(**search_config) \
            .execute()

        self.__page_count += 1

        return new_search_response

    def get_video_ids_from_search_response(self) -> list:
        """
        Extract video IDs from the search response.

        Items that carry no 'videoId' are ignored instead of raising.
        """
        items_list = self.__search_response.get('items', [])
        candidate_ids = (
            item.get('id', {}).get('videoId') for item in items_list
        )
        return [video_id for video_id in candidate_ids if video_id]

    def filter_videos(self) -> None:
        """
        Filter the videos of the current search response by duration.

        Looks up the details for every video ID in the current response and
        appends the ones whose duration is in range to the filtered list,
        stopping once this instance's `max_results` has been reached.
        """
        video_ids = self.get_video_ids_from_search_response()

        if not video_ids:
            print("No video IDs found in the search response.")
            return

        details_config = {
            "part": "contentDetails,snippet",
            "id": ",".join(video_ids)
        }

        details = VideoDetailsExtractor.__platform_conn \
            .videos() \
            .list(**details_config) \
            .execute()

        for item in details.get('items', []):
            try:
                duration = item['contentDetails']['duration'].replace('PT', '')

                # is_duration_in_mins() is True for durations that have an
                # hour part or no minute part; those videos are skipped.
                # NOTE(review): this also skips a video of exactly one hour
                # even though 60 minutes is inside the default range.
                if TimeUtils.is_duration_in_mins(duration):
                    continue

                total_seconds = TimeUtils \
                    .derive_total_seconds_from_duration(duration)

                if TimeUtils.is_video_duration_in_range(total_seconds):
                    video_details = {
                        'id': item['id'],
                        'title': item['snippet']['title'],
                        'duration': total_seconds,
                        'publishedAt': item['snippet']['publishedAt']
                    }
                    self.__filtered_videos.append(video_details)

                    # Honour the per-instance limit, not the module default.
                    if len(self.__filtered_videos) >= self.__max_results:
                        break

            except (KeyError, TypeError, ValueError, AttributeError) as e:
                # Best effort: one malformed item must not abort the page.
                print(f"Error processing video {item.get('id', 'N/A')}: {e}")
                continue

        print(
            f"Filtered {len(self.__filtered_videos)} videos based on criteria.")

    def has_filtered_videos_reached_limit(self) -> bool:
        """
        Check if the maximum number of filtered videos has been reached.

        Returns True once at least `max_results` videos were collected.
        """
        return len(self.__filtered_videos) >= self.__max_results

    def has_page_token_reached_limit(self) -> bool:
        """
        Check if the maximum number of API calls has been reached.

        Returns True once `max_pages` search pages were requested.
        """
        return self.__page_count >= self.__max_pages

    def update_next_page_token(self) -> None:
        """
        Update the next page token based on the search response.

        The token becomes None when the response has no 'nextPageToken'.
        """
        self.__next_page_token = \
            self.__search_response.get('nextPageToken', None)

    def scan_videos(self) -> None:
        """
        Scan for videos that meet the specified criteria.

        Keeps requesting and filtering search pages until enough videos were
        collected, the page budget is spent, or there is no next page.
        """
        while not self.has_filtered_videos_reached_limit() \
                and not self.has_page_token_reached_limit():
            self.__search_response = self.get_new_search_response()
            self.filter_videos()
            self.update_next_page_token()
            if not self.__next_page_token:
                break

    def get_video_details(self) -> list:
        """
        Return the filtered videos collected during the scan.

        Each entry is a dict with the keys 'id', 'title', 'duration' (in
        seconds) and 'publishedAt'. A notice is printed when the list is
        empty; the (empty) list is still returned.
        """
        if not self.__filtered_videos:
            print('No suitable videos found after applying filters.')
        return self.__filtered_videos
| 228 | + return self.__filtered_videos |
| 229 | + |
| 230 | + |
class GenModel:
    """
    Wrapper around a lazily created Gemini model that rates how well a
    video title fits a search query.
    """
    # Shared model handle; stays None until the first scoring request.
    _model = None

    @classmethod
    def _initialize_model(cls):
        """
        Configure the Gemini client and create the model on first use only.
        """
        if cls._model is not None:
            return
        genai.configure(api_key=GEMINI_API_KEY)
        cls._model = genai.GenerativeModel(MODEL_NAME)

    @staticmethod
    def get_prompt_for_title(title: str, query: str) -> str:
        """
        Build the three-line prompt asking the model to rate `title`
        against `query`.
        """
        prompt_lines = [
            f"Query: {query}",
            f"Title: {title}",
            "Rate relevance & quality 1–10 (just give the number).",
        ]
        return "\n".join(prompt_lines)

    @classmethod
    def get_score_for_title(cls, title: str, query: str) -> float:
        """
        Ask the Gemini model to score a title for the given query.

        The model is created first if needed. The first number in the reply
        that matches REGEX_PATTERN is returned as a float. DEFAULT_SCORE is
        returned when no number is found, when parsing fails, or when the
        request itself fails (the failure is printed).
        """
        cls._initialize_model()
        request_text = cls.get_prompt_for_title(title, query)
        try:
            reply = cls._model.generate_content(request_text)
            found = re.search(REGEX_PATTERN, reply.text.strip())
            if found is None:
                return DEFAULT_SCORE
            return float(found.group())
        except (ValueError, AttributeError) as err:
            print(f"[Error] Failed to parse score for '{title}': {err}")
            return DEFAULT_SCORE
        except Exception as err:
            print(f"[Error] API call failed for '{title}': {err}")
            return DEFAULT_SCORE
| 278 | + |
| 279 | + |
class VideoProcessor:
    """
    Finds videos for a query and orders them by the score a pluggable
    scorer assigns to each title.
    """
    def __init__(self, scorer=GenModel):
        """
        Store the scorer; it must provide get_score_for_title(title, query).
        """
        self.scorer = scorer

    def find_and_rank_videos(self, query: str, num_results: int = DEFAULT_MAX_RESULTS):
        """
        Collect videos for `query`, score every title and return the
        `num_results` best ones, highest score first.

        Each returned dict is the extractor's entry with a 'score' key
        added. An empty list is returned when nothing was found.
        """
        candidates = VideoDetailsExtractor(query).get_video_details()
        if not candidates:
            return []

        score_title = self.scorer.get_score_for_title
        for entry in candidates:
            entry['score'] = score_title(entry['title'], query)

        def by_score(entry):
            return entry['score']

        ranked = sorted(candidates, key=by_score, reverse=True)
        return ranked[:num_results]
| 304 | + return sorted(videos, key=lambda v: v['score'], reverse=True)[:num_results] |
| 305 | + |
| 306 | + |
if __name__ == '__main__':
    required_env_vars = ('YT_API_KEY', 'GEMINI_API_KEY')

    # A variable that is set but empty is as unusable as a missing one.
    if any(not os.environ.get(env_var) for env_var in required_env_vars):
        raise KeyError(
            "Error: YouTube and/or Gemini API keys not set in environment variables.")

    user_query = input("Enter your search: ")

    video_processor = VideoProcessor()
    pick_best = video_processor.find_and_rank_videos(user_query)

    # Show the ranking; previously the result was computed and then dropped.
    if not pick_best:
        print("No videos to display.")
    for rank, video in enumerate(pick_best, start=1):
        print(
            f"{rank}. [{video['score']:.1f}] {video['title']} "
            f"({video['duration'] // 60} min) "
            f"https://www.youtube.com/watch?v={video['id']}")
| 317 | + pick_best = video_processor.find_and_rank_videos(user_query) |
| 318 | + |
0 commit comments