From 97e65524a67cf4e7f32d80efff038ea227475a49 Mon Sep 17 00:00:00 2001
From: kartikvirendrar
Date: Tue, 9 Sep 2025 14:41:57 +0530
Subject: [PATCH] Add support for S3 buckets alongside Azure containers

Introduce a provider-agnostic storage layer (backend/utils/storage_factory.py)
and route all upload/download/delete/exists/read operations through it instead
of calling azure.storage.blob directly. The active backend is selected at
runtime via the STORAGE_PROVIDER environment variable ("S3" or "AZURE",
defaulting to "AZURE"), so existing Azure deployments keep their current
behaviour.

---
 ai-services/background-music-api/main.py |  54 +++-----
 backend/organization/tasks.py            |  36 ++---
 backend/organization/utils.py            |  31 ++---
 backend/project/utils.py                 |  32 ++---
 backend/transcript/tasks.py              |   2 -
 backend/transcript/utils/ytt_align.py    |  44 +++----
 backend/utils/storage_factory.py         | 148 ++++++++++++++++++
 backend/voiceover/utils.py               | 160 +++++++++--------
 backend/youtube/utils.py                 |   1 -
 9 files changed, 289 insertions(+), 219 deletions(-)
 create mode 100644 backend/utils/storage_factory.py

diff --git a/ai-services/background-music-api/main.py b/ai-services/background-music-api/main.py
index 934fec6f..67ea8eb9 100644
--- a/ai-services/background-music-api/main.py
+++ b/ai-services/background-music-api/main.py
@@ -3,10 +3,10 @@
 from yt_dlp.utils import DownloadError
 from yt_dlp.extractor import get_info_extractor
 from moviepy.editor import VideoFileClip, AudioFileClip, concatenate_audioclips
+from utils.storage_factory import get_storage_provider
 from spleeter.separator import Separator
 import shutil
 from moviepy.video.io.ffmpeg_tools import ffmpeg_extract_subclip
-from azure.storage.blob import BlobServiceClient
 from config import (
     storage_account_key,
     connection_string,
@@ -102,36 +102,26 @@ def utils_add_bg_music(file_path, video_link):
 
 
 def upload_audio_to_azure_blob(file_path, export_type, export):
-    blob_service_client = BlobServiceClient.from_connection_string(connection_string)
     if export == False:
         AudioSegment.from_wav(file_path + "final.wav").export(
             file_path + "final.flac", format="flac"
         )
-        full_path_audio = file_path + "final.flac"
-        blob_client_audio = blob_service_client.get_blob_client(
-            container=container_name, blob=file_path.split("/")[-1] + ".flac"
-        )
+        local_file_to_upload = file_path + "final.flac"
+        remote_file_name = file_path.split("/")[-1] + ".flac"
     else:
-        full_path_audio = file_path.replace(".flac", "") + "." + export_type
-        blob_client_audio = blob_service_client.get_blob_client(
-            container=container_name,
-            blob=file_path.split("/")[-1].replace(".flac", "") + "." + export_type,
-        )
-    with open(full_path_audio, "rb") as data:
-        try:
-            if not blob_client_audio.exists():
-                blob_client_audio.upload_blob(data)
-                print("Audio uploaded successfully!")
-                print(blob_client_audio.url)
-            else:
-                blob_client_audio.delete_blob()
-                print("Old Audio deleted successfully!")
-                blob_client_audio.upload_blob(data)
-                print("New audio uploaded successfully!")
-        except Exception as e:
-            print("This audio can't be uploaded")
-    return blob_client_audio.url
+        local_file_to_upload = file_path.replace(".flac", "") + "." + export_type
+        remote_file_name = file_path.split("/")[-1].replace(".flac", "") + "."
+ export_type + storage = get_storage_provider() + + try: + url = storage.upload(local_file_to_upload, remote_file_name) + print("Audio uploaded successfully!") + print(url) + return url + except Exception as e: + print("This audio can't be uploaded") + return None if __name__ == "__main__": os.mkdir("temporary_audio_storage") @@ -143,16 +133,12 @@ class BGMusicRequest(BaseModel): def download_from_azure_blob(file_path): - blob_service_client = BlobServiceClient.from_connection_string(connection_string) - encoded_file_path = file_path.split("/")[-1] - encoded_url_path = urllib.parse.unquote(encoded_file_path) - blob_client = blob_service_client.get_blob_client( - container=container_name, blob=encoded_url_path - ) - with open(file=file_path.split("/")[-1], mode="wb") as sample_blob: - download_stream = blob_client.download_blob() - sample_blob.write(download_stream.readall()) + remote_file_path = urllib.parse.unquote(file_path.split("/")[-1]) + local_destination_path = file_path.split("/")[-1] + + storage = get_storage_provider() + storage.download(remote_file_path, local_destination_path) @app.post("/add_background_music") async def add_background_music(audio_request: BGMusicRequest): diff --git a/backend/organization/tasks.py b/backend/organization/tasks.py index 16cbcec1..804feeba 100644 --- a/backend/organization/tasks.py +++ b/backend/organization/tasks.py @@ -10,9 +10,8 @@ get_org_report_projects_email, ) from users.models import User -from azure.storage.blob import BlobServiceClient -from datetime import datetime, timedelta -from config import storage_account_key, connection_string, reports_container_name +from utils.storage_factory import get_storage_provider +from datetime import datetime, timedelta, timezone @shared_task() @@ -41,28 +40,13 @@ def send_email_with_projects_report(org_id, user_id): @shared_task(name="delete_reports") def delete_reports(): - blob_service_client = BlobServiceClient.from_connection_string(connection_string) - container_client = blob_service_client.get_container_client(reports_container_name) + storage = get_storage_provider(reports_container=True) + + one_week_ago_date = (datetime.now(timezone.utc) - timedelta(days=7)).date() - current_date = datetime.now() + objects = storage.list_objects() - # Calculate one week ago - one_week_ago = current_date - timedelta(days=7) - - # Convert the specific date to UTC format - specific_date_utc = datetime.strptime(one_week_ago, "%Y-%m-%d").replace( - tzinfo=datetime.timezone.utc - ) - - # List all blobs in the container - blobs = container_client.list_blobs() - - for blob in blobs: - properties = blob.get_blob_properties() - last_modified = properties["last_modified"].astimezone(datetime.timezone.utc) - - # Check if the blob was created on the specific date - if last_modified.date() == specific_date_utc.date(): - blob_client = container_client.get_blob_client(blob.name) - blob_client.delete_blob() - print(f"Deleted: {blob.name}") + for obj in objects: + if obj.last_modified.date() == one_week_ago_date: + storage.delete(obj.name) + print(f"Deleted: {obj.name}") \ No newline at end of file diff --git a/backend/organization/utils.py b/backend/organization/utils.py index 115bd916..5ca517be 100644 --- a/backend/organization/utils.py +++ b/backend/organization/utils.py @@ -27,7 +27,7 @@ from voiceover.models import VoiceOver from video.models import Video from task.models import Task -from azure.storage.blob import BlobServiceClient +from utils.storage_factory import get_storage_provider from config import 
storage_account_key, connection_string, reports_container_name from django.conf import settings import logging @@ -37,27 +37,20 @@ def send_mail_with_report(subject, body, user, csv_file_paths): - blob_service_client = BlobServiceClient.from_connection_string(connection_string) + storage = get_storage_provider(reports_container=True) report_urls = [] for file_path in csv_file_paths: - blob_client = blob_service_client.get_blob_client( - container=reports_container_name, blob=file_path - ) - with open(file_path, "rb") as data: - try: - if not blob_client.exists(): - blob_client.upload_blob(data) - logging.info("Report uploaded successfully!") - logging.info(blob_client.url) - else: - blob_client.delete_blob() - logging.info("Old Report deleted successfully!") - blob_client.upload_blob(data) - logging.info("New Report uploaded successfully!") - except Exception as e: - logging.info("This report can't be uploaded") - report_urls.append(blob_client.url) + local_file = file_path + remote_file = file_path + + try: + url = storage.upload(local_file, remote_file) + logging.info("Report uploaded successfully!") + logging.info(url) + report_urls.append(url) + except Exception as e: + logging.info("This report can't be uploaded") if len(report_urls) == 1: try: diff --git a/backend/project/utils.py b/backend/project/utils.py index cbe9c4f8..fce1e537 100644 --- a/backend/project/utils.py +++ b/backend/project/utils.py @@ -21,12 +21,12 @@ import logging import os from translation.metadata import TRANSLATION_LANGUAGE_CHOICES +from utils.storage_factory import get_storage_provider from voiceover.metadata import VOICEOVER_LANGUAGE_CHOICES from transcript.models import Transcript from translation.models import Translation from voiceover.models import VoiceOver from video.models import Video -from azure.storage.blob import BlobServiceClient from config import storage_account_key, connection_string, reports_container_name from django.conf import settings @@ -227,27 +227,21 @@ def get_reports_for_users(pk, start, end): def send_mail_with_report(subject, body, user, csv_file_paths): - blob_service_client = BlobServiceClient.from_connection_string(connection_string) + storage = get_storage_provider(reports_container=True) report_urls = [] for file_path in csv_file_paths: - blob_client = blob_service_client.get_blob_client( - container=reports_container_name, blob=file_path - ) - with open(file_path, "rb") as data: - try: - if not blob_client.exists(): - blob_client.upload_blob(data) - logging.info("Report uploaded successfully!") - logging.info(blob_client.url) - else: - blob_client.delete_blob() - logging.info("Old Report deleted successfully!") - blob_client.upload_blob(data) - logging.info("New Report uploaded successfully!") - except Exception as e: - logging.info("This report can't be uploaded") - report_urls.append(blob_client.url) + local_file = file_path + remote_file = file_path + + try: + url = storage.upload(local_file, remote_file) + logging.info("Report uploaded successfully!") + logging.info(url) + report_urls.append(url) + except Exception as e: + logging.info("This report can't be uploaded") + if len(report_urls) == 1: try: diff --git a/backend/transcript/tasks.py b/backend/transcript/tasks.py index e1533d73..81c622b1 100644 --- a/backend/transcript/tasks.py +++ b/backend/transcript/tasks.py @@ -3,8 +3,6 @@ from backend.celery import celery_app import json import logging -from azure.storage.blob import BlobServiceClient -import logging from config import ( storage_account_key, connection_string, 
diff --git a/backend/transcript/utils/ytt_align.py b/backend/transcript/utils/ytt_align.py
index 35416e5b..dc13605b 100644
--- a/backend/transcript/utils/ytt_align.py
+++ b/backend/transcript/utils/ytt_align.py
@@ -5,7 +5,7 @@
 import subprocess
 import json
 from config import align_json_url
-from azure.storage.blob import BlobServiceClient
+from utils.storage_factory import get_storage_provider
 import logging
 from config import (
     storage_account_key,
@@ -54,27 +54,27 @@ def align_json_api(transcript_obj):
 
 
 def download_ytt_from_azure(file_name):
-    blob_service_client = BlobServiceClient.from_connection_string(connection_string)
-    blob_client = blob_service_client.get_blob_client(
-        container=container_name, blob=file_name
-    )
-    with open(file=file_name, mode="wb") as sample_blob:
-        download_stream = blob_client.download_blob()
-        sample_blob.write(download_stream.readall())
+    storage = get_storage_provider()
+
+    remote_object_name = file_name
+    local_destination_path = file_name
+
+    storage.download(remote_object_name, local_destination_path)
 
 
 def upload_ytt_to_azure(transcript_obj, file_name):
-    blob_service_client = BlobServiceClient.from_connection_string(connection_string)
-    blob_client_json = blob_service_client.get_blob_client(
-        container=container_name, blob=file_name
-    )
-    with open(file_name, "rb") as data:
-        if not blob_client_json.exists():
-            blob_client_json.upload_blob(data)
-            logging.info(blob_client_json.url)
-            transcript_obj.payload["ytt_azure_url"] = blob_client_json.url
-            transcript_obj.save()
-        else:
-            blob_client_json.delete_blob()
-            blob_client_json.upload_blob(data)
-            logging.info(blob_client_json.url)
+    storage = get_storage_provider()
+
+    local_file = file_name
+    remote_file = file_name
+
+    file_exists = storage.exists(remote_file)
+
+    url = storage.upload(local_file, remote_file)
+
+    if not file_exists:
+        logging.info(url)
+        transcript_obj.payload["ytt_azure_url"] = url
+        transcript_obj.save()
+    else:
+        logging.info(url)
\ No newline at end of file
diff --git a/backend/utils/storage_factory.py b/backend/utils/storage_factory.py
new file mode 100644
index 00000000..e66ec174
--- /dev/null
+++ b/backend/utils/storage_factory.py
@@ -0,0 +1,148 @@
+import os
+from abc import ABC, abstractmethod
+from collections import namedtuple
+
+import boto3
+from botocore.exceptions import ClientError
+from azure.storage.blob import BlobServiceClient
+
+# Provider-agnostic listing entry, so callers such as delete_reports can
+# iterate remote objects the same way on both backends.
+StorageObject = namedtuple("StorageObject", ["name", "last_modified"])
+
+
+class StorageProvider(ABC):
+
+    @abstractmethod
+    def upload(self, local_file_path: str, remote_file_path: str) -> str:
+        pass
+
+    @abstractmethod
+    def download(self, remote_file_path: str, local_file_path: str):
+        pass
+
+    @abstractmethod
+    def delete(self, remote_file_path: str):
+        pass
+
+    @abstractmethod
+    def exists(self, remote_file_path: str) -> bool:
+        pass
+
+    @abstractmethod
+    def read_bytes(self, remote_file_path: str) -> bytes:
+        """Reads the content of a remote file into memory as bytes."""
+        pass
+
+    @abstractmethod
+    def list_objects(self):
+        """Lists all remote objects as StorageObject(name, last_modified) entries."""
+        pass
+
+
+class S3StorageProvider(StorageProvider):
+    def __init__(self, bucket_name: str, region: str):
+        self.s3_resource = boto3.resource('s3')
+        self.bucket_name = bucket_name
+        self.region = region
+
+    def upload(self, local_file_path: str, remote_file_path: str) -> str:
+        s3_object = self.s3_resource.Object(self.bucket_name, remote_file_path)
+        s3_object.upload_file(local_file_path)
+        return f"https://{self.bucket_name}.s3.{self.region}.amazonaws.com/{remote_file_path}"
+
+    def download(self, remote_file_path: str, local_file_path: str):
+        s3_object = self.s3_resource.Object(self.bucket_name, remote_file_path)
+        s3_object.download_file(local_file_path)
+
+    def delete(self, remote_file_path: str):
+        s3_object = self.s3_resource.Object(self.bucket_name, remote_file_path)
+        s3_object.delete()
+
+    def exists(self, remote_file_path: str) -> bool:
+        try:
+            self.s3_resource.Object(self.bucket_name, remote_file_path).load()
+            return True
+        except ClientError as e:
+            if e.response['Error']['Code'] == '404':
+                return False
+            else:
+                raise
+
+    def read_bytes(self, remote_file_path: str) -> bytes:
+        s3_object = self.s3_resource.Object(self.bucket_name, remote_file_path)
+        return s3_object.get()['Body'].read()
+
+    def list_objects(self):
+        # ObjectSummary exposes .key and .last_modified (timezone-aware).
+        bucket = self.s3_resource.Bucket(self.bucket_name)
+        return [
+            StorageObject(name=obj.key, last_modified=obj.last_modified)
+            for obj in bucket.objects.all()
+        ]
+
+
+class AzureStorageProvider(StorageProvider):
+    def __init__(self, connection_string: str, container_name: str):
+        self.blob_service_client = BlobServiceClient.from_connection_string(connection_string)
+        self.container_name = container_name
+
+    def upload(self, local_file_path: str, remote_file_path: str) -> str:
+        blob_client = self.blob_service_client.get_blob_client(
+            container=self.container_name, blob=remote_file_path
+        )
+        with open(local_file_path, "rb") as data:
+            blob_client.upload_blob(data, overwrite=True)
+        return blob_client.url
+
+    def download(self, remote_file_path: str, local_file_path: str):
+        blob_client = self.blob_service_client.get_blob_client(
+            container=self.container_name, blob=remote_file_path
+        )
+        with open(file=local_file_path, mode="wb") as local_blob:
+            download_stream = blob_client.download_blob()
+            local_blob.write(download_stream.readall())
+
+    def delete(self, remote_file_path: str):
+        blob_client = self.blob_service_client.get_blob_client(
+            container=self.container_name, blob=remote_file_path
+        )
+        blob_client.delete_blob()
+
+    def exists(self, remote_file_path: str) -> bool:
+        blob_client = self.blob_service_client.get_blob_client(
+            container=self.container_name, blob=remote_file_path
+        )
+        return blob_client.exists()
+
+    def read_bytes(self, remote_file_path: str) -> bytes:
+        blob_client = self.blob_service_client.get_blob_client(
+            container=self.container_name, blob=remote_file_path
+        )
+        download_stream = blob_client.download_blob()
+        return download_stream.readall()
+
+    def list_objects(self):
+        # BlobProperties exposes .name and .last_modified, mirroring the S3 entry.
+        container_client = self.blob_service_client.get_container_client(self.container_name)
+        return [
+            StorageObject(name=blob.name, last_modified=blob.last_modified)
+            for blob in container_client.list_blobs()
+        ]
+
+
+def get_storage_provider(reports_container=False) -> StorageProvider:
+    provider = os.environ.get("STORAGE_PROVIDER", "AZURE")
+
+    if provider == "S3":
+        return S3StorageProvider(
+            bucket_name=os.environ["AMAZON_S3_BUCKET"] if not reports_container else os.environ["AMAZON_S3_BUCKET_REPORTS"],
+            region=os.environ["AWS_REGION"]
+        )
+    elif provider == "AZURE":
+        return AzureStorageProvider(
+            connection_string=os.environ["AZURE_STORAGE_CONNECTION_STRING"],
+            container_name=os.environ["AZURE_STORAGE_CONTAINER_NAME"] if not reports_container else os.environ["AZURE_STORAGE_REPORTS_CONTAINER_NAME"]
+        )
+    else:
+        raise ValueError(f"Unknown storage provider: {provider}")
\ No newline at end of file
diff --git a/backend/voiceover/utils.py b/backend/voiceover/utils.py
index d7e65907..5bb99a07 100644
--- a/backend/voiceover/utils.py
+++ b/backend/voiceover/utils.py
@@ -2,7 +2,6 @@
 from uuid import UUID
 import uuid
 import json
-from azure.storage.blob import BlobServiceClient
 import logging
 from config import (
     storage_account_key,
@@ -18,6 +17,7 @@
 )
 from pydub import AudioSegment
 import io
+from utils.storage_factory import get_storage_provider
 from video.models import Video
 from datetime import datetime, date, timedelta
 import os
@@ -73,91 +73,67 @@ def validate_uuid4(val):
 
 
 def download_from_azure_blob(file_path):
-    blob_service_client = BlobServiceClient.from_connection_string(connection_string)
-    encoded_file_path = file_path.split("/")[-1]
-
encoded_url_path = urllib.parse.unquote(encoded_file_path) - blob_client = blob_service_client.get_blob_client( - container=container_name, blob=encoded_url_path - ) - with open(file=file_path.split("/")[-1], mode="wb") as sample_blob: - download_stream = blob_client.download_blob() - sample_blob.write(download_stream.readall()) + local_destination_path = file_path.split("/")[-1] + remote_file_path = urllib.parse.unquote(local_destination_path) + storage = get_storage_provider() -def download_json_from_azure_blob(app_name, video_id, task_id, target_language): - blob_service_client = BlobServiceClient.from_connection_string(connection_string) - container_client = blob_service_client.get_container_client(container_name) + storage.download(remote_file_path, local_destination_path) - # Create the exact filename +def download_json_from_azure_blob(app_name, video_id, task_id, target_language): file_name = "{}_Video_{}_{}_{}.json".format( app_name, video_id, task_id, target_language ) print(file_name) - # Get blob client - blob_client = blob_service_client.get_blob_client( - container=container_name, blob=file_name - ) - # Download the blob + storage = get_storage_provider() + try: - download_stream = blob_client.download_blob() - print(download_stream) - file_content = download_stream.readall().decode("utf-8") + file_content_bytes = storage.read_bytes(file_name) + + file_content = file_content_bytes.decode("utf-8") json_data = json.loads(file_content) return json_data except Exception as e: raise FileNotFoundError( - f"File {file_name} not found in the container. Error: {str(e)}" + f"File {file_name} not found in the storage container/bucket. Error: {str(e)}" ) def upload_video(file_path): - full_path = file_path + ".mp4" - blob_service_client = BlobServiceClient.from_connection_string(connection_string) - blob_client = blob_service_client.get_blob_client( - container=container_name, blob=file_path.split("/")[-1] + ".mp4" - ) - with open(full_path, "rb") as data: - try: - if not blob_client.exists(): - blob_client.upload_blob(data) - logging.info("Video uploaded successfully!") - logging.info(blob_client.url) - else: - blob_client.delete_blob() - logging.info("Old Video deleted successfully!") - blob_client.upload_blob(data) - logging.info("New video uploaded successfully!") - except Exception as e: - logging.info("This video can't be uploaded") - return blob_client.url + local_file_to_upload = file_path + ".mp4" + remote_file_name = file_path.split("/")[-1] + ".mp4" + + storage = get_storage_provider() + + try: + url = storage.upload(local_file_to_upload, remote_file_name) + logging.info("Video uploaded successfully!") + logging.info(url) + return url + except Exception as e: + logging.info("This video can't be uploaded") + return None def upload_json(file_path, voice_over_obj): - blob_service_client = BlobServiceClient.from_connection_string(connection_string) voice_over_payload = voice_over_obj.payload json_object = json.dumps(voice_over_payload) - with open(file_path.split("/")[-1] + ".json", "w") as outfile: + local_file_name = file_path.split("/")[-1] + ".json" + remote_file_name = local_file_name + + with open(local_file_name, "w") as outfile: outfile.write(json_object) - blob_client_json = blob_service_client.get_blob_client( - container=container_name, blob=file_path.split("/")[-1] + ".json" - ) + storage = get_storage_provider() - with open(file_path.split("/")[-1] + ".json", "rb") as data: - try: - if not blob_client_json.exists(): - blob_client_json.upload_blob(data) - 
logging.info("Voice Over payload uploaded successfully!") - logging.info(blob_client_json.url) - else: - blob_client_json.delete_blob() - logging.info("Old Voice Over payload deleted successfully!") - blob_client_json.upload_blob(data) - logging.info("New Voice Over payload successfully!") - except Exception as e: - logging.info("This Voice Over payload can't be uploaded") + try: + url = storage.upload(local_file_name, remote_file_name) + logging.info("Voice Over payload uploaded successfully!") + logging.info(url) + except Exception as e: + logging.info("This Voice Over payload can't be uploaded") def uploadToBlobStorage(file_path, voice_over_obj): @@ -178,50 +154,42 @@ def uploadToBlobStorage(file_path, voice_over_obj): def upload_audio_to_azure_blob(file_path, export_type, export): - blob_service_client = BlobServiceClient.from_connection_string(connection_string) if export == False: AudioSegment.from_wav(file_path + "final.wav").export( file_path + "final.flac", format="flac" ) - full_path_audio = file_path + "final.flac" - blob_client_audio = blob_service_client.get_blob_client( - container=container_name, blob=file_path.split("/")[-1] + ".flac" - ) + local_file_to_upload = file_path + "final.flac" + remote_file_name = file_path.split("/")[-1] + ".flac" else: - full_path_audio = file_path.replace(".flac", "") + "." + export_type - blob_client_audio = blob_service_client.get_blob_client( - container=container_name, - blob=file_path.split("/")[-1].replace(".flac", "") + "." + export_type, - ) - with open(full_path_audio, "rb") as data: - try: - if not blob_client_audio.exists(): - blob_client_audio.upload_blob(data) - logging.info("Audio uploaded successfully!") - logging.info(blob_client_audio.url) - else: - blob_client_audio.delete_blob() - logging.info("Old Audio deleted successfully!") - blob_client_audio.upload_blob(data) - logging.info("New audio uploaded successfully!") - except Exception as e: - logging.info("This audio can't be uploaded") - return blob_client_audio.url + local_file_to_upload = file_path.replace(".flac", "") + "." + export_type + remote_file_name = file_path.split("/")[-1].replace(".flac", "") + "." 
+ export_type + + storage = get_storage_provider() + + try: + url = storage.upload(local_file_to_upload, remote_file_name) + logging.info("Audio uploaded successfully!") + logging.info(url) + return url + except Exception as e: + logging.info("This audio can't be uploaded") + return None def upload_zip_to_azure(zip_file_path): - blob_service_client = BlobServiceClient.from_connection_string(connection_string) - blob_client_zip = blob_service_client.get_blob_client( - container=container_name, blob=zip_file_path - ) - with open(zip_file_path, "rb") as f: - try: - blob_client_zip.upload_blob(f) - logging.info("Audio zip uploaded successfully!") - logging.info(blob_client_zip.url) - except Exception as e: - logging.info("This audio_zip can't be uploaded") - return blob_client_zip.url + storage = get_storage_provider() + + local_file = zip_file_path + remote_file = zip_file_path + + try: + url = storage.upload(local_file, remote_file) + logging.info("Audio zip uploaded successfully!") + logging.info(url) + return url + except Exception as e: + logging.info("This audio_zip can't be uploaded") + return None def get_tts_output(tts_input, target_language, multiple_speaker, gender, id): diff --git a/backend/youtube/utils.py b/backend/youtube/utils.py index a2961133..3c40c21b 100644 --- a/backend/youtube/utils.py +++ b/backend/youtube/utils.py @@ -1,7 +1,6 @@ import requests from uuid import UUID import json -from azure.storage.blob import BlobServiceClient import logging from config import ( connection_string,
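
Usage sketch (illustrative; the file names below are made up): call sites
obtain a provider via get_storage_provider(), which reads STORAGE_PROVIDER
("S3" or "AZURE", default "AZURE") together with AMAZON_S3_BUCKET,
AMAZON_S3_BUCKET_REPORTS and AWS_REGION for S3, or
AZURE_STORAGE_CONNECTION_STRING, AZURE_STORAGE_CONTAINER_NAME and
AZURE_STORAGE_REPORTS_CONTAINER_NAME for Azure.

    from utils.storage_factory import get_storage_provider

    storage = get_storage_provider()                        # media bucket/container
    reports = get_storage_provider(reports_container=True)  # reports bucket/container

    url = storage.upload("final.flac", "final.flac")  # returns a public URL
    if storage.exists("final.flac"):
        payload = storage.read_bytes("final.flac")
    storage.download("final.flac", "final_copy.flac")
    storage.delete("final.flac")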