Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 87 additions & 2 deletions RomM/api.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import base64
import datetime
import json
import math
import os
Expand All @@ -11,6 +12,7 @@

import platform_maps
from filesystem import Filesystem
from imageutils import ImageUtils
from models import Collection, Platform, Rom
from PIL import Image
from status import Status, View
Expand All @@ -28,6 +30,7 @@ class API:
def __init__(self):
self.status = Status()
self.file_system = Filesystem()
self.image_utils = ImageUtils()

self.host = os.getenv("HOST", "")
self.username = os.getenv("USERNAME", "")
Expand All @@ -37,6 +40,11 @@ def __init__(self):
self._include_collections = set(self._getenv_list("INCLUDE_COLLECTIONS"))
self._exclude_collections = set(self._getenv_list("EXCLUDE_COLLECTIONS"))
self._collection_type = os.getenv("COLLECTION_TYPE", "collection")
self._download_assets = os.getenv("DOWNLOAD_ASSETS", "false") in ("true", "1")
self._fullscreen_assets = os.getenv("FULLSCREEN_ASSETS", "false") in (
"true",
"1",
)

if self.username and self.password:
credentials = f"{self.username}:{self.password}"
Expand Down Expand Up @@ -234,6 +242,7 @@ def fetch_platforms(self) -> None:
self.status.valid_host = False
self.status.valid_credentials = False
return

platforms = json.loads(response.read().decode("utf-8"))
_platforms: list[Platform] = []

Expand All @@ -251,7 +260,7 @@ def fetch_platforms(self) -> None:

for platform in platforms:
if platform["rom_count"] > 0:
platform_slug = platform["slug"].lower()
platform_slug: str = platform["slug"].lower()
if (
platform_maps._env_maps
and platform_slug in platform_maps._env_platforms
Expand Down Expand Up @@ -458,7 +467,7 @@ def fetch_roms(self) -> None:

_roms = []
for rom in roms:
platform_slug = rom["platform_slug"].lower()
platform_slug: str = rom["platform_slug"].lower()
if (
platform_maps._env_maps
and platform_slug in platform_maps._env_platforms
Expand All @@ -476,13 +485,17 @@ def fetch_roms(self) -> None:
)
if mapped_folder.lower() not in roms_subfolders:
continue

if view == View.PLATFORMS and platform_slug != selected_platform_slug:
continue

_roms.append(
Rom(
id=rom["id"],
name=rom["name"],
summary=rom["summary"],
fs_name=rom["fs_name"],
platform_id=rom["platform_id"],
platform_slug=rom["platform_slug"],
fs_extension=rom["fs_extension"],
fs_size=self._human_readable_size(rom["fs_size_bytes"]),
Expand All @@ -492,6 +505,15 @@ def fetch_roms(self) -> None:
regions=rom["regions"],
revision=rom["revision"],
tags=rom["tags"],
path_cover_small=rom.get("path_cover_small", ""),
path_cover_large=rom.get("path_cover_large", ""),
merged_screenshots=rom["merged_screenshots"],
first_release_date=rom["first_release_date"],
average_rating=rom["average_rating"],
genres=rom["genres"],
franchises=rom["franchises"],
companies=rom["companies"],
age_ratings=rom["age_ratings"],
)
)

Expand Down Expand Up @@ -532,6 +554,7 @@ def download_rom(self) -> None:
except ValueError:
self._reset_download_status()
return

try:
if request.type not in ("http", "https"):
self._reset_download_status()
Expand Down Expand Up @@ -608,5 +631,67 @@ def download_rom(self) -> None:
except URLError:
self._reset_download_status(valid_host=True)
return

filename = self._sanitize_filename(rom.fs_name).split(".")[0]
catalogue_path = self.file_system.get_catalogue_platform_path(
rom.platform_slug
)
if rom.summary and catalogue_path:
text_path = os.path.join(
catalogue_path,
"text",
f"{filename}.txt",
)
os.makedirs(os.path.dirname(text_path), exist_ok=True)
with open(text_path, "w") as f:
f.write(rom.summary)
f.write("\n\n")

if rom.first_release_date:
dt = datetime.datetime.fromtimestamp(
rom.first_release_date / 1000
)
formatted_date = dt.strftime("%Y-%m-%d")
f.write(f"First release date: {formatted_date}\n")

if rom.average_rating:
f.write(f"Average rating: {rom.average_rating}\n")

if rom.genres:
f.write(f"Genres: {', '.join(rom.genres)}\n")

if rom.franchises:
f.write(f"Franchises: {', '.join(rom.franchises)}\n")

if rom.companies:
f.write(f"Companies: {', '.join(rom.companies)}\n")

# Don't download covers and previews if the user disabled the option
if not self._download_assets:
continue

# Check if the catalogue path is set and valid
catalogue_path = self.file_system.get_catalogue_platform_path(
rom.platform_slug
)
if not catalogue_path:
continue

box_path = os.path.join(catalogue_path, "box", f"{filename}.png")
preview_path = os.path.join(catalogue_path, "preview", f"{filename}.png")

# Download cover and preview images
os.makedirs(os.path.dirname(box_path), exist_ok=True)
os.makedirs(os.path.dirname(preview_path), exist_ok=True)

self.image_utils.process_assets(
fullscreen=self._fullscreen_assets,
cover_url=f"{self.host}{rom.path_cover_small}",
screenshot_url=f"{self.host}{rom.merged_screenshots[0]}",
box_path=box_path,
preview_path=preview_path,
headers=self.headers,
)

# End of download
self._reset_download_status(valid_host=True, valid_credentials=True)
4 changes: 4 additions & 0 deletions RomM/env.template
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ COLLECTION_TYPE=collection
# Do not display collections with these names (comma separated)
# EXCLUDE_COLLECTIONS=""

# Download cover images and screenshots
DOWNLOAD_ASSETS=1
FULLSCREEN_ASSETS=1

# Map RomM slugs to filesystem directories
# For example, if your PlayStation directory is called "psx":
# CUSTOM_MAPS='{"ps": "psx"}'
Expand Down
43 changes: 40 additions & 3 deletions RomM/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ class Filesystem:

# Storage paths for ROMs
_sd1_roms_storage_path: str
_sd2_roms_storage_path: str | None
_sd2_roms_storage_path: str | None = None
_sd1_catalogue_path: str | None = None
_sd2_catalogue_path: str | None = None

# Resources path: Use current working directory + "resources"
resources_path = os.path.join(os.getcwd(), "resources")
Expand All @@ -35,15 +37,15 @@ def __init__(self) -> None:
if self.is_muos:
self._sd1_roms_storage_path = "/mnt/mmc/ROMS"
self._sd2_roms_storage_path = "/mnt/sdcard/ROMS"
self._sd1_catalogue_path = "/mnt/mmc/MUOS/info/catalogue"
self._sd2_catalogue_path = "/mnt/sdcard/MUOS/info/catalogue"
elif self.is_spruceos:
self._sd1_roms_storage_path = "/mnt/SDCARD/Roms"
self._sd2_roms_storage_path = None
else:
# Go up two levels from the script's directory (e.g., from roms/ports/romm to roms/)
base_path = os.path.abspath(os.path.join(os.getcwd(), "..", ".."))
# Default to the ROMs directory, overridable via environment variable
self._sd1_roms_storage_path = os.environ.get("ROMS_STORAGE_PATH", base_path)
self._sd2_roms_storage_path = None

# Ensure the ROMs storage path exists
if self._sd2_roms_storage_path and not os.path.exists(
Expand All @@ -70,6 +72,14 @@ def _get_sd2_roms_storage_path(self) -> Optional[str]:
"""Return the secondary ROMs storage path if available."""
return self._sd2_roms_storage_path

def _get_sd1_catalogue_path(self) -> Optional[str]:
"""Return the catalogue path for SD1."""
return self._sd1_catalogue_path

def _get_sd2_catalogue_path(self) -> Optional[str]:
"""Return the catalogue path for SD2."""
return self._sd2_catalogue_path

def _get_platform_storage_dir_from_mapping(self, platform: str) -> str:
"""
Return the platform-specific storage path,
Expand Down Expand Up @@ -111,6 +121,20 @@ def _get_sd2_platforms_storage_path(self, platform: str) -> Optional[str]:
return os.path.join(self._sd2_roms_storage_path, platforms_dir)
return None

def get_sd1_catalogue_platform_path(self, platform: str) -> str:
if not self._sd1_catalogue_path:
raise ValueError("SD1 catalogue path is not set.")

platforms_dir = self._get_platform_storage_dir_from_mapping(platform)
return os.path.join(self._sd1_catalogue_path, platforms_dir)

def get_sd2_catalogue_platform_path(self, platform: str) -> str:
if not self._sd2_catalogue_path:
raise ValueError("SD2 catalogue path is not set.")

platforms_dir = self._get_platform_storage_dir_from_mapping(platform)
return os.path.join(self._sd2_catalogue_path, platforms_dir)

###
# PUBLIC METHODS
###
Expand Down Expand Up @@ -138,6 +162,19 @@ def get_platforms_storage_path(self, platform: str) -> str:

return self._get_sd1_platforms_storage_path(platform)

def get_catalogue_path(self, platform: str) -> str | None:
"""Return the catalogue path for a specific platform."""
if self._current_sd == 2:
return self._get_sd2_catalogue_path()

return self._get_sd1_catalogue_path()

def get_catalogue_platform_path(self, platform: str) -> str:
if self._current_sd == 2:
return self.get_sd2_catalogue_platform_path(platform)

return self.get_sd1_catalogue_platform_path(platform)

def is_rom_in_device(self, rom: Rom) -> bool:
"""Check if a ROM exists in the storage path."""
rom_path = os.path.join(
Expand Down
118 changes: 118 additions & 0 deletions RomM/imageutils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
from typing import Optional

from PIL import Image, ImageDraw


class ImageUtils:
_instance: Optional["ImageUtils"] = None
_initialized: bool = False

screen_width = 640
screen_height = 480

def __new__(cls):
if cls._instance is None:
cls._instance = super(ImageUtils, cls).__new__(cls)
return cls._instance

def __init__(self) -> None:
if self._initialized:
return

self.fade_mask = self.generate_fade_mask()
self._initialized = True

def generate_fade_mask(self) -> Image.Image:
fade_mask = Image.new("L", (self.screen_width, self.screen_height), 0)
draw = ImageDraw.Draw(fade_mask)
x_crit = self.screen_width / 3.0

for x in range(self.screen_width):
if x < x_crit:
t = x / x_crit
alpha = int((t**2) * (255 / 3)) # a x = x_crit, alpha = 255/3 ≈ 85
else:
t = (x - x_crit) / (self.screen_width - x_crit)
alpha = int(85 + t * (255 - 85))
draw.line([(x, 0), (x, self.screen_height)], fill=alpha)

return fade_mask

def add_rounded_corners(self, image, radius):
rounded_mask = Image.new("L", image.size, 0)
draw = ImageDraw.Draw(rounded_mask)
draw.rounded_rectangle(
(0, 0, image.size[0], image.size[1]), radius=radius, fill=255
)
image.putalpha(rounded_mask)
return image

def load_image_from_url(self, url: str, headers) -> Image.Image | None:
from io import BytesIO
from urllib.request import Request, urlopen

try:
req = Request(url, headers=headers)
with urlopen(req, timeout=60) as response: # trunk-ignore(bandit/B310)
data = response.read()
return Image.open(BytesIO(data)).convert("RGBA")
except Exception as e:
print(f"Error loading image from URL {url}: {e}")
return None

def process_assets(
self,
fullscreen: bool,
cover_url: str,
screenshot_url: str,
box_path: str,
preview_path: str,
headers,
) -> None:
if not cover_url and not screenshot_url:
return

final_width, final_height = self.screen_width, self.screen_height
background = None
preview = (
self.load_image_from_url(screenshot_url, headers)
if screenshot_url
else None
)

if preview:
preview = preview.resize((final_width, final_height))
preview.save(preview_path)

if fullscreen:
if preview:
background = preview
else:
background = Image.new(
"RGBA", (final_width, final_height), (0, 0, 0, 0)
)
background.putalpha(self.fade_mask)

foreground = self.load_image_from_url(cover_url, headers) if cover_url else None

if foreground:
max_cover_width = 215
max_cover_height = int(final_height * 3 / 5)
scale_w = max_cover_width / foreground.width
scale_h = max_cover_height / foreground.height
scale = min(scale_w, scale_h)
new_cover_width = int(foreground.width * scale)
new_cover_height = int(foreground.height * scale)
foreground = foreground.resize((new_cover_width, new_cover_height))
foreground = self.add_rounded_corners(foreground, radius=20)

fg_x = final_width - new_cover_width - 20
fg_y = (final_height - new_cover_height) // 2

if background:
background.paste(foreground, (fg_x, fg_y), foreground)
else:
background = foreground

if background:
background.save(box_path)
Loading