mirror of
https://github.com/rommapp/romm.git
synced 2025-12-22 10:27:13 +00:00
start work on gamelist.xml extraction
This commit is contained in:
38
backend/alembic/versions/0055_gamelist_xml.py
Normal file
38
backend/alembic/versions/0055_gamelist_xml.py
Normal file
@@ -0,0 +1,38 @@
|
||||
"""
|
||||
|
||||
Revision ID: 0055_gamelist_xml
|
||||
Revises: 0054_add_platform_metadata_slugs
|
||||
Create Date: 2025-10-16 23:07:05.145056
|
||||
|
||||
"""
|
||||
|
||||
import sqlalchemy as sa
|
||||
from alembic import op
|
||||
from sqlalchemy.dialects import postgresql
|
||||
|
||||
revision = "0055_gamelist_xml"
|
||||
down_revision = "0054_add_platform_metadata_slugs"
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
with op.batch_alter_table("roms", schema=None) as batch_op:
|
||||
batch_op.add_column(sa.Column("gamelist_id", sa.Integer(), nullable=True))
|
||||
batch_op.add_column(
|
||||
sa.Column(
|
||||
"gamelist_metadata",
|
||||
sa.JSON().with_variant(
|
||||
postgresql.JSONB(astext_type=sa.Text()), "postgresql"
|
||||
),
|
||||
nullable=True,
|
||||
)
|
||||
)
|
||||
batch_op.create_index("idx_roms_gamelist_id", ["gamelist_id"], unique=False)
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
with op.batch_alter_table("roms", schema=None) as batch_op:
|
||||
batch_op.drop_index("idx_roms_gamelist_id")
|
||||
batch_op.drop_column("gamelist_metadata")
|
||||
batch_op.drop_column("gamelist_id")
|
||||
@@ -199,6 +199,7 @@ class ConfigManager:
|
||||
self._raw_config,
|
||||
"scan.priority.metadata",
|
||||
[
|
||||
"gamelist",
|
||||
"igdb",
|
||||
"moby",
|
||||
"ss",
|
||||
@@ -214,6 +215,7 @@ class ConfigManager:
|
||||
self._raw_config,
|
||||
"scan.priority.artwork",
|
||||
[
|
||||
"gamelist",
|
||||
"igdb",
|
||||
"moby",
|
||||
"ss",
|
||||
|
||||
192
backend/endpoints/gamelist.py
Normal file
192
backend/endpoints/gamelist.py
Normal file
@@ -0,0 +1,192 @@
|
||||
from typing import Annotated, List, Optional
|
||||
|
||||
from fastapi import HTTPException, Query, Request, status
|
||||
from fastapi.responses import Response
|
||||
from pydantic import BaseModel
|
||||
|
||||
from decorators.auth import protected_route
|
||||
from handler.auth.constants import Scope
|
||||
from handler.export.gamelist_exporter import GamelistExporter
|
||||
from logger.formatter import BLUE
|
||||
from logger.formatter import highlight as hl
|
||||
from logger.logger import log
|
||||
from utils.router import APIRouter
|
||||
|
||||
router = APIRouter(
|
||||
prefix="/gamelist",
|
||||
tags=["gamelist"],
|
||||
)
|
||||
|
||||
|
||||
class GamelistExportRequest(BaseModel):
|
||||
platform_ids: List[int]
|
||||
rom_ids: Optional[List[int]] = None
|
||||
|
||||
|
||||
@protected_route(router.post, "/export", [Scope.ROMS_READ])
|
||||
async def export_gamelist(
|
||||
request: Request,
|
||||
platform_ids: Annotated[
|
||||
List[int], Query(description="List of platform IDs to export")
|
||||
],
|
||||
rom_ids: Annotated[
|
||||
Optional[List[int]],
|
||||
Query(description="Optional list of specific ROM IDs to export"),
|
||||
] = None,
|
||||
) -> Response:
|
||||
"""Export platforms/ROMs to gamelist.xml format"""
|
||||
|
||||
if not platform_ids:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail="At least one platform ID must be provided",
|
||||
)
|
||||
|
||||
try:
|
||||
exporter = GamelistExporter()
|
||||
|
||||
# If only one platform, return single XML
|
||||
if len(platform_ids) == 1:
|
||||
platform_id = platform_ids[0]
|
||||
xml_content = exporter.export_platform(platform_id, rom_ids)
|
||||
|
||||
log.info(
|
||||
f"Exported gamelist for platform {hl(str(platform_id), color=BLUE)} "
|
||||
f"with {hl(str(len(rom_ids) if rom_ids else 'all'), color=BLUE)} ROMs"
|
||||
)
|
||||
|
||||
return Response(
|
||||
content=xml_content,
|
||||
media_type="application/xml",
|
||||
headers={
|
||||
"Content-Disposition": f"attachment; filename=gamelist_{platform_id}.xml"
|
||||
},
|
||||
)
|
||||
else:
|
||||
# Multiple platforms - return as zip or individual files
|
||||
# For now, return the first platform's XML
|
||||
# TODO: Implement zip export for multiple platforms
|
||||
platform_id = platform_ids[0]
|
||||
xml_content = exporter.export_platform(platform_id, rom_ids)
|
||||
|
||||
log.info(
|
||||
f"Exported gamelist for platform {hl(str(platform_id), color=BLUE)} "
|
||||
f"(first of {len(platform_ids)} platforms)"
|
||||
)
|
||||
|
||||
return Response(
|
||||
content=xml_content,
|
||||
media_type="application/xml",
|
||||
headers={
|
||||
"Content-Disposition": f"attachment; filename=gamelist_{platform_id}.xml"
|
||||
},
|
||||
)
|
||||
|
||||
except ValueError as e:
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e)) from e
|
||||
except Exception as e:
|
||||
log.error(f"Failed to export gamelist: {e}")
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail="Failed to export gamelist",
|
||||
) from e
|
||||
|
||||
|
||||
@protected_route(router.post, "/export/platform/{platform_id}", [Scope.ROMS_READ])
|
||||
async def export_platform_gamelist(
|
||||
request: Request,
|
||||
platform_id: int,
|
||||
rom_ids: Annotated[
|
||||
Optional[List[int]],
|
||||
Query(description="Optional list of specific ROM IDs to export"),
|
||||
] = None,
|
||||
) -> Response:
|
||||
"""Export a specific platform to gamelist.xml format"""
|
||||
|
||||
try:
|
||||
exporter = GamelistExporter()
|
||||
xml_content = exporter.export_platform(platform_id, rom_ids)
|
||||
|
||||
log.info(
|
||||
f"Exported gamelist for platform {hl(str(platform_id), color=BLUE)} "
|
||||
f"with {hl(str(len(rom_ids) if rom_ids else 'all'), color=BLUE)} ROMs"
|
||||
)
|
||||
|
||||
return Response(
|
||||
content=xml_content,
|
||||
media_type="application/xml",
|
||||
headers={
|
||||
"Content-Disposition": f"attachment; filename=gamelist_{platform_id}.xml"
|
||||
},
|
||||
)
|
||||
|
||||
except ValueError as e:
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e)) from e
|
||||
except Exception as e:
|
||||
log.error(f"Failed to export platform gamelist: {e}")
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail="Failed to export platform gamelist",
|
||||
) from e
|
||||
|
||||
|
||||
@protected_route(
|
||||
router.get, "/export/platform/{platform_id}/preview", [Scope.ROMS_READ]
|
||||
)
|
||||
async def preview_platform_gamelist(
|
||||
request: Request,
|
||||
platform_id: int,
|
||||
rom_ids: Annotated[
|
||||
Optional[List[int]],
|
||||
Query(description="Optional list of specific ROM IDs to preview"),
|
||||
] = None,
|
||||
) -> dict:
|
||||
"""Preview gamelist export for a platform (returns metadata without full XML)"""
|
||||
|
||||
try:
|
||||
from handler.database import db_platform_handler, db_rom_handler
|
||||
|
||||
platform = db_platform_handler.get_platform(platform_id)
|
||||
if not platform:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail=f"Platform with ID {platform_id} not found",
|
||||
)
|
||||
|
||||
# Get ROMs for the platform
|
||||
if rom_ids:
|
||||
roms = [db_rom_handler.get_rom(rom_id) for rom_id in rom_ids]
|
||||
roms = [rom for rom in roms if rom and rom.platform_id == platform_id]
|
||||
else:
|
||||
roms = db_rom_handler.get_roms_scalar(platform_id=platform_id)
|
||||
|
||||
# Return preview metadata
|
||||
return {
|
||||
"platform": {
|
||||
"id": platform.id,
|
||||
"name": platform.name,
|
||||
"fs_slug": platform.fs_slug,
|
||||
},
|
||||
"rom_count": len(roms),
|
||||
"roms": [
|
||||
{
|
||||
"id": rom.id,
|
||||
"name": rom.name or rom.fs_name,
|
||||
"fs_name": rom.fs_name,
|
||||
"has_cover": bool(rom.url_cover),
|
||||
"has_screenshots": bool(rom.url_screenshots),
|
||||
"has_summary": bool(rom.summary),
|
||||
}
|
||||
for rom in roms
|
||||
if rom and not rom.missing_from_fs
|
||||
],
|
||||
}
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
log.error(f"Failed to preview platform gamelist: {e}")
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail="Failed to preview platform gamelist",
|
||||
) from e
|
||||
236
backend/handler/export/gamelist_exporter.py
Normal file
236
backend/handler/export/gamelist_exporter.py
Normal file
@@ -0,0 +1,236 @@
|
||||
import os
|
||||
import xml.etree.ElementTree as ET
|
||||
from datetime import datetime
|
||||
from typing import Dict, List, Optional
|
||||
|
||||
from handler.database import db_platform_handler, db_rom_handler
|
||||
from logger.logger import log
|
||||
from models.rom import Rom
|
||||
|
||||
|
||||
class GamelistExporter:
|
||||
"""Export RomM collections to EmulationStation gamelist.xml format"""
|
||||
|
||||
def __init__(self):
|
||||
self.base_path = "/romm/library" # Base path for ROM files
|
||||
|
||||
def _format_release_date(self, date_str: Optional[str]) -> str:
|
||||
"""Format release date to YYYYMMDDTHHMMSS format"""
|
||||
if not date_str:
|
||||
return ""
|
||||
|
||||
try:
|
||||
# Parse date string (assuming YYYY-MM-DD format)
|
||||
if len(date_str) >= 10:
|
||||
year = date_str[:4]
|
||||
month = date_str[5:7]
|
||||
day = date_str[8:10]
|
||||
return f"{year}{month}{day}T000000"
|
||||
except (ValueError, IndexError):
|
||||
pass
|
||||
|
||||
return ""
|
||||
|
||||
def _format_rating(self, rating: Optional[float]) -> str:
|
||||
"""Format rating to 0.0-1.0 scale"""
|
||||
if rating is None:
|
||||
return ""
|
||||
|
||||
# Ensure rating is in valid range
|
||||
if 0.0 <= rating <= 1.0:
|
||||
return str(rating)
|
||||
|
||||
return ""
|
||||
|
||||
def _get_relative_path(self, file_path: str, platform_dir: str) -> str:
|
||||
"""Convert absolute path to relative path for gamelist.xml"""
|
||||
if not file_path:
|
||||
return ""
|
||||
|
||||
# Handle file:// URLs
|
||||
if file_path.startswith("file://"):
|
||||
file_path = file_path[7:]
|
||||
|
||||
# Get relative path from platform directory
|
||||
try:
|
||||
rel_path = os.path.relpath(file_path, platform_dir)
|
||||
return f"./{rel_path}"
|
||||
except ValueError:
|
||||
# If paths are on different drives, return the original path
|
||||
return file_path
|
||||
|
||||
def _get_media_path(
|
||||
self, media_url: str, platform_dir: str, media_type: str
|
||||
) -> str:
|
||||
"""Get relative path for media files"""
|
||||
if not media_url:
|
||||
return ""
|
||||
|
||||
# Handle file:// URLs
|
||||
if media_url.startswith("file://"):
|
||||
file_path = media_url[7:]
|
||||
return self._get_relative_path(file_path, platform_dir)
|
||||
|
||||
# Handle HTTP URLs - store as-is for now
|
||||
return media_url
|
||||
|
||||
def _create_game_element(self, rom: Rom, platform_dir: str) -> ET.Element:
|
||||
"""Create a <game> element for a ROM"""
|
||||
game = ET.Element("game")
|
||||
|
||||
# Basic game info
|
||||
ET.SubElement(game, "path").text = f"./{rom.fs_name}"
|
||||
ET.SubElement(game, "name").text = rom.name or rom.fs_name
|
||||
|
||||
if rom.summary:
|
||||
ET.SubElement(game, "desc").text = rom.summary
|
||||
|
||||
# Media files
|
||||
if rom.url_cover:
|
||||
image_path = self._get_media_path(rom.url_cover, platform_dir, "image")
|
||||
if image_path:
|
||||
ET.SubElement(game, "image").text = image_path
|
||||
|
||||
if rom.url_screenshots:
|
||||
for screenshot_url in rom.url_screenshots:
|
||||
video_path = self._get_media_path(screenshot_url, platform_dir, "video")
|
||||
if video_path:
|
||||
ET.SubElement(game, "video").text = video_path
|
||||
break # Only use first screenshot as video
|
||||
|
||||
# Additional metadata
|
||||
if hasattr(rom, "developer") and rom.developer:
|
||||
ET.SubElement(game, "developer").text = rom.developer
|
||||
|
||||
if hasattr(rom, "publisher") and rom.publisher:
|
||||
ET.SubElement(game, "publisher").text = rom.publisher
|
||||
|
||||
if hasattr(rom, "genre") and rom.genre:
|
||||
ET.SubElement(game, "genre").text = rom.genre
|
||||
|
||||
if hasattr(rom, "players") and rom.players:
|
||||
ET.SubElement(game, "players").text = rom.players
|
||||
|
||||
if hasattr(rom, "lang") and rom.lang:
|
||||
ET.SubElement(game, "lang").text = rom.lang
|
||||
|
||||
if hasattr(rom, "region") and rom.region:
|
||||
ET.SubElement(game, "region").text = rom.region
|
||||
|
||||
if hasattr(rom, "releasedate") and rom.releasedate:
|
||||
release_date = self._format_release_date(rom.releasedate)
|
||||
if release_date:
|
||||
ET.SubElement(game, "releasedate").text = release_date
|
||||
|
||||
if hasattr(rom, "rating") and rom.rating is not None:
|
||||
rating = self._format_rating(rom.rating)
|
||||
if rating:
|
||||
ET.SubElement(game, "rating").text = rating
|
||||
|
||||
# Add external ID if available
|
||||
if rom.igdb_id:
|
||||
ET.SubElement(game, "id").text = str(rom.igdb_id)
|
||||
|
||||
# Add scraping info
|
||||
scrap = ET.SubElement(game, "scrap")
|
||||
scrap.set("name", "RomM")
|
||||
scrap.set("date", datetime.now().strftime("%Y%m%dT%H%M%S"))
|
||||
|
||||
return game
|
||||
|
||||
def export_platform(
|
||||
self, platform_id: int, rom_ids: Optional[List[int]] = None
|
||||
) -> str:
|
||||
"""Export a platform's ROMs to gamelist.xml format
|
||||
|
||||
Args:
|
||||
platform_id: Platform ID to export
|
||||
rom_ids: Optional list of specific ROM IDs to export
|
||||
|
||||
Returns:
|
||||
XML string in gamelist.xml format
|
||||
"""
|
||||
platform = db_platform_handler.get_platform(platform_id)
|
||||
if not platform:
|
||||
raise ValueError(f"Platform with ID {platform_id} not found")
|
||||
|
||||
# Get ROMs for the platform
|
||||
if rom_ids:
|
||||
roms = [db_rom_handler.get_rom(rom_id) for rom_id in rom_ids]
|
||||
roms = [rom for rom in roms if rom and rom.platform_id == platform_id]
|
||||
else:
|
||||
roms = db_rom_handler.get_roms_scalar(platform_id=platform_id)
|
||||
|
||||
# Create root element
|
||||
root = ET.Element("gameList")
|
||||
|
||||
# Platform directory for relative paths
|
||||
platform_dir = os.path.join(self.base_path, "roms", platform.fs_slug)
|
||||
|
||||
# Add games
|
||||
for rom in roms:
|
||||
if rom and not rom.missing_from_fs:
|
||||
game_element = self._create_game_element(rom, platform_dir)
|
||||
root.append(game_element)
|
||||
|
||||
# Convert to XML string
|
||||
ET.indent(root, space=" ", level=0)
|
||||
xml_str = ET.tostring(root, encoding="unicode", xml_declaration=True)
|
||||
|
||||
log.info(f"Exported {len(roms)} ROMs for platform {platform.name}")
|
||||
return xml_str
|
||||
|
||||
def export_multiple_platforms(
|
||||
self, platform_ids: List[int], rom_ids: Optional[List[int]] = None
|
||||
) -> Dict[str, str]:
|
||||
"""Export multiple platforms to separate gamelist.xml files
|
||||
|
||||
Args:
|
||||
platform_ids: List of platform IDs to export
|
||||
rom_ids: Optional list of specific ROM IDs to export
|
||||
|
||||
Returns:
|
||||
Dictionary mapping platform names to XML strings
|
||||
"""
|
||||
results = {}
|
||||
|
||||
for platform_id in platform_ids:
|
||||
try:
|
||||
platform = db_platform_handler.get_platform(platform_id)
|
||||
if platform:
|
||||
xml_content = self.export_platform(platform_id, rom_ids)
|
||||
results[platform.fs_slug] = xml_content
|
||||
except Exception as e:
|
||||
log.error(f"Failed to export platform {platform_id}: {e}")
|
||||
|
||||
return results
|
||||
|
||||
def export_roms_to_file(
|
||||
self, platform_id: int, output_path: str, rom_ids: Optional[List[int]] = None
|
||||
) -> bool:
|
||||
"""Export platform ROMs to a gamelist.xml file
|
||||
|
||||
Args:
|
||||
platform_id: Platform ID to export
|
||||
output_path: Path where to save the gamelist.xml file
|
||||
rom_ids: Optional list of specific ROM IDs to export
|
||||
|
||||
Returns:
|
||||
True if successful, False otherwise
|
||||
"""
|
||||
try:
|
||||
xml_content = self.export_platform(platform_id, rom_ids)
|
||||
|
||||
# Ensure output directory exists
|
||||
os.makedirs(os.path.dirname(output_path), exist_ok=True)
|
||||
|
||||
# Write to file
|
||||
with open(output_path, "w", encoding="utf-8") as f:
|
||||
f.write(xml_content)
|
||||
|
||||
log.info(f"Exported gamelist.xml to {output_path}")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
log.error(f"Failed to export gamelist.xml to {output_path}: {e}")
|
||||
return False
|
||||
@@ -68,39 +68,64 @@ class FSResourcesHandler(FSHandler):
|
||||
cover_file = f"{entity.fs_resources_path}/cover"
|
||||
await self.make_directory(f"{cover_file}")
|
||||
|
||||
httpx_client = ctx_httpx_client.get()
|
||||
try:
|
||||
async with httpx_client.stream("GET", url_cover, timeout=120) as response:
|
||||
if response.status_code == status.HTTP_200_OK:
|
||||
# Check if content is gzipped from response headers
|
||||
is_gzipped = (
|
||||
response.headers.get("content-encoding", "").lower() == "gzip"
|
||||
)
|
||||
# Handle file:// URLs for gamelist.xml
|
||||
if url_cover.startswith("file://"):
|
||||
try:
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
|
||||
async with await self.write_file_streamed(
|
||||
path=cover_file, filename=f"{size.value}.png"
|
||||
) as f:
|
||||
if is_gzipped:
|
||||
# Content is gzipped, decompress it
|
||||
content = await response.aread()
|
||||
try:
|
||||
decompressed_content = gzip.decompress(content)
|
||||
await f.write(decompressed_content)
|
||||
except gzip.BadGzipFile:
|
||||
await f.write(content)
|
||||
else:
|
||||
# Content is not gzipped, stream directly
|
||||
async for chunk in response.aiter_raw():
|
||||
await f.write(chunk)
|
||||
file_path = Path(url_cover[7:]) # Remove "file://" prefix
|
||||
if file_path.exists():
|
||||
# Copy the file to the resources directory
|
||||
dest_path = self.validate_path(f"{cover_file}/{size.value}.png")
|
||||
shutil.copy2(file_path, dest_path)
|
||||
|
||||
if ENABLE_SCHEDULED_CONVERT_IMAGES_TO_WEBP:
|
||||
self.image_converter.convert_to_webp(
|
||||
self.validate_path(f"{cover_file}/{size.value}.png"),
|
||||
force=True,
|
||||
self.image_converter.convert_to_webp(dest_path, force=True)
|
||||
else:
|
||||
log.warning(f"File not found: {file_path}")
|
||||
return None
|
||||
except Exception as exc:
|
||||
log.error(f"Unable to copy cover file {url_cover}: {str(exc)}")
|
||||
return None
|
||||
else:
|
||||
# Handle HTTP URLs
|
||||
httpx_client = ctx_httpx_client.get()
|
||||
try:
|
||||
async with httpx_client.stream(
|
||||
"GET", url_cover, timeout=120
|
||||
) as response:
|
||||
if response.status_code == status.HTTP_200_OK:
|
||||
# Check if content is gzipped from response headers
|
||||
is_gzipped = (
|
||||
response.headers.get("content-encoding", "").lower()
|
||||
== "gzip"
|
||||
)
|
||||
except httpx.TransportError as exc:
|
||||
log.error(f"Unable to fetch cover at {url_cover}: {str(exc)}")
|
||||
return None
|
||||
|
||||
async with await self.write_file_streamed(
|
||||
path=cover_file, filename=f"{size.value}.png"
|
||||
) as f:
|
||||
if is_gzipped:
|
||||
# Content is gzipped, decompress it
|
||||
content = await response.aread()
|
||||
try:
|
||||
decompressed_content = gzip.decompress(content)
|
||||
await f.write(decompressed_content)
|
||||
except gzip.BadGzipFile:
|
||||
await f.write(content)
|
||||
else:
|
||||
# Content is not gzipped, stream directly
|
||||
async for chunk in response.aiter_raw():
|
||||
await f.write(chunk)
|
||||
|
||||
if ENABLE_SCHEDULED_CONVERT_IMAGES_TO_WEBP:
|
||||
self.image_converter.convert_to_webp(
|
||||
self.validate_path(f"{cover_file}/{size.value}.png"),
|
||||
force=True,
|
||||
)
|
||||
except httpx.TransportError as exc:
|
||||
log.error(f"Unable to fetch cover at {url_cover}: {str(exc)}")
|
||||
return None
|
||||
|
||||
if size == CoverSize.SMALL:
|
||||
try:
|
||||
@@ -213,35 +238,55 @@ class FSResourcesHandler(FSHandler):
|
||||
"""
|
||||
screenshot_path = f"{rom.fs_resources_path}/screenshots"
|
||||
|
||||
httpx_client = ctx_httpx_client.get()
|
||||
try:
|
||||
async with httpx_client.stream(
|
||||
"GET", url_screenhot, timeout=120
|
||||
) as response:
|
||||
if response.status_code == status.HTTP_200_OK:
|
||||
# Check if content is gzipped from response headers
|
||||
is_gzipped = (
|
||||
response.headers.get("content-encoding", "").lower() == "gzip"
|
||||
)
|
||||
# Handle file:// URLs for gamelist.xml
|
||||
if url_screenhot.startswith("file://"):
|
||||
try:
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
|
||||
async with await self.write_file_streamed(
|
||||
path=screenshot_path, filename=f"{idx}.jpg"
|
||||
) as f:
|
||||
if is_gzipped:
|
||||
# Content is gzipped, decompress it
|
||||
content = await response.aread()
|
||||
try:
|
||||
decompressed_content = gzip.decompress(content)
|
||||
await f.write(decompressed_content)
|
||||
except gzip.BadGzipFile:
|
||||
await f.write(content)
|
||||
else:
|
||||
# Content is not gzipped, stream directly
|
||||
async for chunk in response.aiter_raw():
|
||||
await f.write(chunk)
|
||||
except httpx.TransportError as exc:
|
||||
log.error(f"Unable to fetch screenshot at {url_screenhot}: {str(exc)}")
|
||||
return None
|
||||
file_path = Path(url_screenhot[7:]) # Remove "file://" prefix
|
||||
if file_path.exists():
|
||||
# Copy the file to the resources directory
|
||||
dest_path = self.validate_path(f"{screenshot_path}/{idx}.jpg")
|
||||
shutil.copy2(file_path, dest_path)
|
||||
else:
|
||||
log.warning(f"Screenshot file not found: {file_path}")
|
||||
return None
|
||||
except Exception as exc:
|
||||
log.error(f"Unable to copy screenshot file {url_screenhot}: {str(exc)}")
|
||||
return None
|
||||
else:
|
||||
# Handle HTTP URLs
|
||||
httpx_client = ctx_httpx_client.get()
|
||||
try:
|
||||
async with httpx_client.stream(
|
||||
"GET", url_screenhot, timeout=120
|
||||
) as response:
|
||||
if response.status_code == status.HTTP_200_OK:
|
||||
# Check if content is gzipped from response headers
|
||||
is_gzipped = (
|
||||
response.headers.get("content-encoding", "").lower()
|
||||
== "gzip"
|
||||
)
|
||||
|
||||
async with await self.write_file_streamed(
|
||||
path=screenshot_path, filename=f"{idx}.jpg"
|
||||
) as f:
|
||||
if is_gzipped:
|
||||
# Content is gzipped, decompress it
|
||||
content = await response.aread()
|
||||
try:
|
||||
decompressed_content = gzip.decompress(content)
|
||||
await f.write(decompressed_content)
|
||||
except gzip.BadGzipFile:
|
||||
await f.write(content)
|
||||
else:
|
||||
# Content is not gzipped, stream directly
|
||||
async for chunk in response.aiter_raw():
|
||||
await f.write(chunk)
|
||||
except httpx.TransportError as exc:
|
||||
log.error(f"Unable to fetch screenshot at {url_screenhot}: {str(exc)}")
|
||||
return None
|
||||
|
||||
def _get_screenshot_path(self, rom: Rom, idx: str):
|
||||
"""Returns rom cover filesystem path adapted to frontend folder structure
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
from .flashpoint_handler import FlashpointHandler
|
||||
from .gamelist_handler import GamelistHandler
|
||||
from .hasheous_handler import HasheousHandler
|
||||
from .hltb_handler import HowLongToBeatHandler
|
||||
from .igdb_handler import IGDBHandler
|
||||
@@ -21,3 +22,4 @@ meta_hasheous_handler = HasheousHandler()
|
||||
meta_tgdb_handler = TGDBHandler()
|
||||
meta_flashpoint_handler = FlashpointHandler()
|
||||
meta_hltb_handler = HowLongToBeatHandler()
|
||||
meta_gamelist_handler = GamelistHandler()
|
||||
|
||||
328
backend/handler/metadata/gamelist_handler.py
Normal file
328
backend/handler/metadata/gamelist_handler.py
Normal file
@@ -0,0 +1,328 @@
|
||||
import os
|
||||
import xml.etree.ElementTree as ET
|
||||
from typing import NotRequired
|
||||
|
||||
from handler.filesystem import fs_rom_handler
|
||||
from logger.logger import log
|
||||
from models.platform import Platform
|
||||
|
||||
from .base_handler import BaseRom, MetadataHandler
|
||||
|
||||
|
||||
class GamelistRom(BaseRom):
|
||||
"""ROM data extracted from gamelist.xml"""
|
||||
|
||||
gamelist_id: str | None
|
||||
developer: NotRequired[str | None]
|
||||
publisher: NotRequired[str | None]
|
||||
genre: NotRequired[str | None]
|
||||
players: NotRequired[str | None]
|
||||
lang: NotRequired[str | None]
|
||||
region: NotRequired[str | None]
|
||||
releasedate: NotRequired[str | None]
|
||||
rating: NotRequired[float | None]
|
||||
hidden: NotRequired[bool]
|
||||
marquee: NotRequired[str | None]
|
||||
thumbnail: NotRequired[str | None]
|
||||
|
||||
|
||||
class GamelistHandler(MetadataHandler):
|
||||
"""Handler for EmulationStation gamelist.xml metadata source"""
|
||||
|
||||
@classmethod
|
||||
def is_enabled(cls) -> bool:
|
||||
"""Gamelist handler is always enabled (no API keys required)"""
|
||||
return True
|
||||
|
||||
def _parse_release_date(self, date_str: str | None) -> str | None:
|
||||
"""Parse release date from YYYYMMDDTHHMMSS format"""
|
||||
if not date_str:
|
||||
return None
|
||||
|
||||
try:
|
||||
# Convert YYYYMMDDTHHMMSS to YYYY-MM-DD
|
||||
if len(date_str) >= 8:
|
||||
year = date_str[:4]
|
||||
month = date_str[4:6]
|
||||
day = date_str[6:8]
|
||||
return f"{year}-{month}-{day}"
|
||||
except (ValueError, IndexError):
|
||||
pass
|
||||
|
||||
return None
|
||||
|
||||
def _parse_rating(self, rating_str: str | None) -> float | None:
|
||||
"""Parse rating from 0.0-1.0 scale"""
|
||||
if not rating_str:
|
||||
return None
|
||||
|
||||
try:
|
||||
rating = float(rating_str)
|
||||
# Ensure rating is in valid range
|
||||
if 0.0 <= rating <= 1.0:
|
||||
return rating
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
|
||||
return None
|
||||
|
||||
def _resolve_media_path(
|
||||
self, media_path: str | None, gamelist_dir: str
|
||||
) -> str | None:
|
||||
"""Convert relative media path to file:// URL"""
|
||||
if not media_path:
|
||||
return None
|
||||
|
||||
# Handle relative paths starting with ./
|
||||
if media_path.startswith("./"):
|
||||
media_path = media_path[2:]
|
||||
|
||||
# Build absolute path
|
||||
abs_path = os.path.join(gamelist_dir, media_path)
|
||||
|
||||
# Check if file exists
|
||||
if os.path.exists(abs_path):
|
||||
return f"file://{os.path.abspath(abs_path)}"
|
||||
|
||||
return None
|
||||
|
||||
def _find_gamelist_files(self, platform: Platform) -> list[tuple[str, str]]:
|
||||
"""Find gamelist.xml files for a platform
|
||||
|
||||
Returns:
|
||||
List of (gamelist_path, rom_path) tuples
|
||||
"""
|
||||
gamelist_files = []
|
||||
|
||||
# Get platform ROM directory
|
||||
roms_path = fs_rom_handler.get_roms_fs_structure(platform.fs_slug)
|
||||
platform_dir = os.path.join(roms_path, platform.fs_slug)
|
||||
|
||||
# Check for platform-level gamelist.xml
|
||||
platform_gamelist = os.path.join(platform_dir, "gamelist.xml")
|
||||
if os.path.exists(platform_gamelist):
|
||||
gamelist_files.append((platform_gamelist, platform_dir))
|
||||
|
||||
# Check for ROM-level gamelist.xml files (for multi-file ROMs)
|
||||
try:
|
||||
for rom_name in os.listdir(platform_dir):
|
||||
rom_path = os.path.join(platform_dir, rom_name)
|
||||
if os.path.isdir(rom_path):
|
||||
# Check if this is a multi-file ROM directory
|
||||
rom_gamelist = os.path.join(rom_path, "gamelist.xml")
|
||||
if os.path.exists(rom_gamelist):
|
||||
gamelist_files.append((rom_gamelist, rom_path))
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
return gamelist_files
|
||||
|
||||
def _parse_gamelist_xml(
|
||||
self, gamelist_path: str, rom_dir: str
|
||||
) -> dict[str, GamelistRom]:
|
||||
"""Parse a gamelist.xml file and return ROM data indexed by filename"""
|
||||
roms_data = {}
|
||||
|
||||
try:
|
||||
tree = ET.parse(gamelist_path)
|
||||
root = tree.getroot()
|
||||
|
||||
for game in root.findall("game"):
|
||||
# Get ROM path from XML
|
||||
path_elem = game.find("path")
|
||||
if path_elem is None or path_elem.text is None:
|
||||
continue
|
||||
|
||||
rom_path = path_elem.text
|
||||
|
||||
# Handle relative paths
|
||||
if rom_path.startswith("./"):
|
||||
rom_path = rom_path[2:]
|
||||
|
||||
# Extract filename for matching
|
||||
rom_filename = os.path.basename(rom_path)
|
||||
|
||||
# Extract metadata
|
||||
name_elem = game.find("name")
|
||||
desc_elem = game.find("desc")
|
||||
image_elem = game.find("image")
|
||||
video_elem = game.find("video")
|
||||
marquee_elem = game.find("marquee")
|
||||
thumbnail_elem = game.find("thumbnail")
|
||||
rating_elem = game.find("rating")
|
||||
releasedate_elem = game.find("releasedate")
|
||||
developer_elem = game.find("developer")
|
||||
publisher_elem = game.find("publisher")
|
||||
genre_elem = game.find("genre")
|
||||
players_elem = game.find("players")
|
||||
lang_elem = game.find("lang")
|
||||
region_elem = game.find("region")
|
||||
id_elem = game.find("id")
|
||||
hidden_elem = game.find("hidden")
|
||||
|
||||
# Build ROM data
|
||||
rom_data = GamelistRom(
|
||||
gamelist_id=id_elem.text if id_elem is not None else None,
|
||||
name=name_elem.text if name_elem is not None else "",
|
||||
summary=desc_elem.text if desc_elem is not None else "",
|
||||
url_cover="",
|
||||
url_screenshots=[],
|
||||
url_manual="",
|
||||
developer=(
|
||||
developer_elem.text if developer_elem is not None else None
|
||||
),
|
||||
publisher=(
|
||||
publisher_elem.text if publisher_elem is not None else None
|
||||
),
|
||||
genre=genre_elem.text if genre_elem is not None else None,
|
||||
players=players_elem.text if players_elem is not None else None,
|
||||
lang=lang_elem.text if lang_elem is not None else None,
|
||||
region=region_elem.text if region_elem is not None else None,
|
||||
releasedate=self._parse_release_date(
|
||||
releasedate_elem.text if releasedate_elem is not None else None
|
||||
),
|
||||
rating=self._parse_rating(
|
||||
rating_elem.text if rating_elem is not None else None
|
||||
),
|
||||
hidden=(
|
||||
hidden_elem.text == "true" if hidden_elem is not None else False
|
||||
),
|
||||
marquee=None,
|
||||
thumbnail=None,
|
||||
)
|
||||
|
||||
# Handle media files
|
||||
if image_elem is not None and image_elem.text:
|
||||
cover_url = self._resolve_media_path(image_elem.text, rom_dir)
|
||||
if cover_url:
|
||||
rom_data["url_cover"] = cover_url
|
||||
|
||||
if video_elem is not None and video_elem.text:
|
||||
# Store video as first screenshot for now
|
||||
video_url = self._resolve_media_path(video_elem.text, rom_dir)
|
||||
if video_url:
|
||||
rom_data["url_screenshots"] = [video_url]
|
||||
|
||||
if marquee_elem is not None and marquee_elem.text:
|
||||
rom_data["marquee"] = self._resolve_media_path(
|
||||
marquee_elem.text, rom_dir
|
||||
)
|
||||
|
||||
if thumbnail_elem is not None and thumbnail_elem.text:
|
||||
rom_data["thumbnail"] = self._resolve_media_path(
|
||||
thumbnail_elem.text, rom_dir
|
||||
)
|
||||
|
||||
# Store by filename for matching
|
||||
roms_data[rom_filename] = rom_data
|
||||
|
||||
except ET.ParseError as e:
|
||||
log.warning(f"Failed to parse gamelist.xml at {gamelist_path}: {e}")
|
||||
except Exception as e:
|
||||
log.error(f"Error reading gamelist.xml at {gamelist_path}: {e}")
|
||||
|
||||
return roms_data
|
||||
|
||||
async def get_rom(self, fs_name: str, platform: Platform) -> GamelistRom:
|
||||
"""Get ROM metadata from gamelist.xml files"""
|
||||
if not self.is_enabled():
|
||||
return GamelistRom(
|
||||
gamelist_id=None,
|
||||
name="",
|
||||
summary="",
|
||||
url_cover="",
|
||||
url_screenshots=[],
|
||||
url_manual="",
|
||||
developer=None,
|
||||
publisher=None,
|
||||
genre=None,
|
||||
players=None,
|
||||
lang=None,
|
||||
region=None,
|
||||
releasedate=None,
|
||||
rating=None,
|
||||
hidden=False,
|
||||
marquee=None,
|
||||
thumbnail=None,
|
||||
)
|
||||
|
||||
# Find all gamelist.xml files for this platform
|
||||
gamelist_files = self._find_gamelist_files(platform)
|
||||
|
||||
if not gamelist_files:
|
||||
return GamelistRom(
|
||||
gamelist_id=None,
|
||||
name="",
|
||||
summary="",
|
||||
url_cover="",
|
||||
url_screenshots=[],
|
||||
url_manual="",
|
||||
developer=None,
|
||||
publisher=None,
|
||||
genre=None,
|
||||
players=None,
|
||||
lang=None,
|
||||
region=None,
|
||||
releasedate=None,
|
||||
rating=None,
|
||||
hidden=False,
|
||||
marquee=None,
|
||||
thumbnail=None,
|
||||
)
|
||||
|
||||
# Parse all gamelist files
|
||||
all_roms_data = {}
|
||||
for gamelist_path, rom_dir in gamelist_files:
|
||||
roms_data = self._parse_gamelist_xml(gamelist_path, rom_dir)
|
||||
all_roms_data.update(roms_data)
|
||||
|
||||
# Try to find exact match first
|
||||
if fs_name in all_roms_data:
|
||||
log.debug(f"Found exact gamelist match for {fs_name}")
|
||||
return all_roms_data[fs_name]
|
||||
|
||||
# Try to find match by filename without extension
|
||||
fs_name_no_ext = fs_rom_handler.get_file_name_with_no_extension(fs_name)
|
||||
for rom_filename, rom_data in all_roms_data.items():
|
||||
rom_filename_no_ext = fs_rom_handler.get_file_name_with_no_extension(
|
||||
rom_filename
|
||||
)
|
||||
if rom_filename_no_ext == fs_name_no_ext:
|
||||
log.debug(
|
||||
f"Found gamelist match by name for {fs_name} -> {rom_filename}"
|
||||
)
|
||||
return rom_data
|
||||
|
||||
# Try fuzzy matching
|
||||
best_match, best_score = self.find_best_match(
|
||||
fs_name_no_ext,
|
||||
[
|
||||
fs_rom_handler.get_file_name_with_no_extension(f)
|
||||
for f in all_roms_data.keys()
|
||||
],
|
||||
min_similarity_score=0.8,
|
||||
)
|
||||
|
||||
if best_match:
|
||||
log.debug(f"Found fuzzy gamelist match for {fs_name} -> {best_match}")
|
||||
return all_roms_data[best_match]
|
||||
|
||||
return GamelistRom(
|
||||
gamelist_id=None,
|
||||
name="",
|
||||
summary="",
|
||||
url_cover="",
|
||||
url_screenshots=[],
|
||||
url_manual="",
|
||||
developer=None,
|
||||
publisher=None,
|
||||
genre=None,
|
||||
players=None,
|
||||
lang=None,
|
||||
region=None,
|
||||
releasedate=None,
|
||||
rating=None,
|
||||
hidden=False,
|
||||
marquee=None,
|
||||
thumbnail=None,
|
||||
)
|
||||
@@ -11,6 +11,7 @@ from handler.filesystem import fs_asset_handler, fs_firmware_handler
|
||||
from handler.filesystem.roms_handler import FSRom
|
||||
from handler.metadata import (
|
||||
meta_flashpoint_handler,
|
||||
meta_gamelist_handler,
|
||||
meta_hasheous_handler,
|
||||
meta_hltb_handler,
|
||||
meta_igdb_handler,
|
||||
@@ -23,6 +24,7 @@ from handler.metadata import (
|
||||
meta_tgdb_handler,
|
||||
)
|
||||
from handler.metadata.flashpoint_handler import FLASHPOINT_PLATFORM_LIST, FlashpointRom
|
||||
from handler.metadata.gamelist_handler import GamelistRom
|
||||
from handler.metadata.hasheous_handler import HASHEOUS_PLATFORM_LIST, HasheousRom
|
||||
from handler.metadata.hltb_handler import HLTB_PLATFORM_LIST, HLTBRom
|
||||
from handler.metadata.igdb_handler import IGDB_PLATFORM_LIST, IGDBRom
|
||||
@@ -67,6 +69,7 @@ class MetadataSource(enum.StrEnum):
|
||||
SGDB = "sgdb" # SteamGridDB
|
||||
FLASHPOINT = "flashpoint" # Flashpoint Project
|
||||
HLTB = "hltb" # HowLongToBeat
|
||||
GAMELIST = "gamelist" # EmulationStation gamelist.xml
|
||||
|
||||
|
||||
def get_main_platform_igdb_id(platform: Platform):
|
||||
@@ -332,6 +335,9 @@ async def scan_rom(
|
||||
"launchbox_id": rom.launchbox_id,
|
||||
"hasheous_id": rom.hasheous_id,
|
||||
"tgdb_id": rom.tgdb_id,
|
||||
"gamelist_id": rom.gamelist_id,
|
||||
"flashpoint_id": rom.flashpoint_id,
|
||||
"hltb_id": rom.hltb_id,
|
||||
"name": rom.name,
|
||||
"slug": rom.slug,
|
||||
"summary": rom.summary,
|
||||
@@ -341,6 +347,9 @@ async def scan_rom(
|
||||
"ra_metadata": rom.ra_metadata,
|
||||
"launchbox_metadata": rom.launchbox_metadata,
|
||||
"hasheous_metadata": rom.hasheous_metadata,
|
||||
"gamelist_metadata": rom.gamelist_metadata,
|
||||
"flashpoint_metadata": rom.flashpoint_metadata,
|
||||
"hltb_metadata": rom.hltb_metadata,
|
||||
"path_cover_s": rom.path_cover_s,
|
||||
"path_cover_l": rom.path_cover_l,
|
||||
"path_screenshots": rom.path_screenshots,
|
||||
@@ -454,6 +463,17 @@ async def scan_rom(
|
||||
|
||||
return IGDBRom(igdb_id=None)
|
||||
|
||||
async def fetch_gamelist_rom() -> GamelistRom:
|
||||
if MetadataSource.GAMELIST in metadata_sources and (
|
||||
newly_added
|
||||
or scan_type == ScanType.COMPLETE
|
||||
or (scan_type == ScanType.PARTIAL and not rom.gamelist_id)
|
||||
or (scan_type == ScanType.UNIDENTIFIED and rom.is_unidentified)
|
||||
):
|
||||
return await meta_gamelist_handler.get_rom(rom_attrs["fs_name"], platform)
|
||||
|
||||
return GamelistRom(gamelist_id=None)
|
||||
|
||||
async def fetch_flashpoint_rom() -> FlashpointRom:
|
||||
if (
|
||||
MetadataSource.FLASHPOINT in metadata_sources
|
||||
@@ -622,6 +642,7 @@ async def scan_rom(
|
||||
hasheous_handler_rom,
|
||||
flashpoint_handler_rom,
|
||||
hltb_handler_rom,
|
||||
gamelist_handler_rom,
|
||||
) = await asyncio.gather(
|
||||
fetch_igdb_rom(playmatch_hash_match, hasheous_hash_match),
|
||||
fetch_moby_rom(),
|
||||
@@ -631,6 +652,7 @@ async def scan_rom(
|
||||
fetch_hasheous_rom(hasheous_hash_match),
|
||||
fetch_flashpoint_rom(),
|
||||
fetch_hltb_rom(),
|
||||
fetch_gamelist_rom(),
|
||||
)
|
||||
|
||||
metadata_handlers = {
|
||||
@@ -642,6 +664,7 @@ async def scan_rom(
|
||||
MetadataSource.HASHEOUS: hasheous_handler_rom,
|
||||
MetadataSource.FLASHPOINT: flashpoint_handler_rom,
|
||||
MetadataSource.HLTB: hltb_handler_rom,
|
||||
MetadataSource.GAMELIST: gamelist_handler_rom,
|
||||
}
|
||||
|
||||
# Determine which metadata sources are available
|
||||
@@ -705,6 +728,7 @@ async def scan_rom(
|
||||
and not hasheous_handler_rom.get("hasheous_id")
|
||||
and not flashpoint_handler_rom.get("flashpoint_id")
|
||||
and not hltb_handler_rom.get("hltb_id")
|
||||
and not gamelist_handler_rom.get("gamelist_id")
|
||||
):
|
||||
log.warning(
|
||||
f"{hl(rom_attrs['fs_name'])} not identified {emoji.EMOJI_CROSS_MARK}",
|
||||
@@ -725,6 +749,7 @@ async def scan_rom(
|
||||
ss_handler_rom.get("name", None),
|
||||
moby_handler_rom.get("name", None),
|
||||
launchbox_handler_rom.get("name", None),
|
||||
gamelist_handler_rom.get("name", None),
|
||||
rom_attrs["fs_name_no_tags"],
|
||||
]
|
||||
game_names = [name for name in game_names if name]
|
||||
|
||||
@@ -29,6 +29,7 @@ from endpoints import (
|
||||
configs,
|
||||
feeds,
|
||||
firmware,
|
||||
gamelist,
|
||||
heartbeat,
|
||||
platform,
|
||||
raw,
|
||||
@@ -130,6 +131,7 @@ app.include_router(raw.router, prefix="/api")
|
||||
app.include_router(screenshots.router, prefix="/api")
|
||||
app.include_router(firmware.router, prefix="/api")
|
||||
app.include_router(collections.router, prefix="/api")
|
||||
app.include_router(gamelist.router, prefix="/api")
|
||||
|
||||
app.mount("/ws", socket_handler.socket_app)
|
||||
|
||||
|
||||
@@ -2,7 +2,7 @@ from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from sqlalchemy import String, func, select
|
||||
from sqlalchemy import Integer, String, func, select
|
||||
from sqlalchemy.orm import Mapped, column_property, mapped_column, relationship
|
||||
|
||||
from models.base import BaseModel
|
||||
@@ -19,18 +19,18 @@ class Platform(BaseModel):
|
||||
__tablename__ = "platforms"
|
||||
|
||||
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
|
||||
igdb_id: Mapped[int | None]
|
||||
sgdb_id: Mapped[int | None]
|
||||
moby_id: Mapped[int | None]
|
||||
ss_id: Mapped[int | None]
|
||||
ra_id: Mapped[int | None]
|
||||
launchbox_id: Mapped[int | None]
|
||||
hasheous_id: Mapped[int | None]
|
||||
tgdb_id: Mapped[int | None]
|
||||
flashpoint_id: Mapped[int | None]
|
||||
igdb_slug: Mapped[str | None]
|
||||
moby_slug: Mapped[str | None]
|
||||
hltb_slug: Mapped[str | None]
|
||||
igdb_id: Mapped[int | None] = mapped_column(Integer(), default=None)
|
||||
sgdb_id: Mapped[int | None] = mapped_column(Integer(), default=None)
|
||||
moby_id: Mapped[int | None] = mapped_column(Integer(), default=None)
|
||||
ss_id: Mapped[int | None] = mapped_column(Integer(), default=None)
|
||||
ra_id: Mapped[int | None] = mapped_column(Integer(), default=None)
|
||||
launchbox_id: Mapped[int | None] = mapped_column(Integer(), default=None)
|
||||
hasheous_id: Mapped[int | None] = mapped_column(Integer(), default=None)
|
||||
tgdb_id: Mapped[int | None] = mapped_column(Integer(), default=None)
|
||||
flashpoint_id: Mapped[int | None] = mapped_column(Integer(), default=None)
|
||||
igdb_slug: Mapped[str | None] = mapped_column(String(length=100), default=None)
|
||||
moby_slug: Mapped[str | None] = mapped_column(String(length=100), default=None)
|
||||
hltb_slug: Mapped[str | None] = mapped_column(String(length=100), default=None)
|
||||
slug: Mapped[str] = mapped_column(String(length=100))
|
||||
fs_slug: Mapped[str] = mapped_column(String(length=100))
|
||||
name: Mapped[str] = mapped_column(String(length=400))
|
||||
|
||||
@@ -145,6 +145,7 @@ class Rom(BaseModel):
|
||||
tgdb_id: Mapped[int | None] = mapped_column(Integer(), default=None)
|
||||
flashpoint_id: Mapped[str | None] = mapped_column(String(length=100), default=None)
|
||||
hltb_id: Mapped[int | None] = mapped_column(Integer(), default=None)
|
||||
gamelist_id: Mapped[str | None] = mapped_column(Integer(), default=None)
|
||||
|
||||
__table_args__ = (
|
||||
Index("idx_roms_igdb_id", "igdb_id"),
|
||||
@@ -157,6 +158,7 @@ class Rom(BaseModel):
|
||||
Index("idx_roms_tgdb_id", "tgdb_id"),
|
||||
Index("idx_roms_flashpoint_id", "flashpoint_id"),
|
||||
Index("idx_roms_hltb_id", "hltb_id"),
|
||||
Index("idx_roms_gamelist_id", "gamelist_id"),
|
||||
)
|
||||
|
||||
fs_name: Mapped[str] = mapped_column(String(length=FILE_NAME_MAX_LENGTH))
|
||||
@@ -193,6 +195,9 @@ class Rom(BaseModel):
|
||||
hltb_metadata: Mapped[dict[str, Any] | None] = mapped_column(
|
||||
CustomJSON(), default=dict
|
||||
)
|
||||
gamelist_metadata: Mapped[dict[str, Any] | None] = mapped_column(
|
||||
CustomJSON(), default=dict
|
||||
)
|
||||
|
||||
path_cover_s: Mapped[str | None] = mapped_column(Text, default="")
|
||||
path_cover_l: Mapped[str | None] = mapped_column(Text, default="")
|
||||
|
||||
@@ -52,6 +52,7 @@ filesystem: {} # { roms_folder: 'roms' } For example if your folder structure is
|
||||
# scan:
|
||||
# priority:
|
||||
# metadata: # Top-level metadata source priority
|
||||
# - "gamelist" # EmulationStation (highest priority)
|
||||
# - "igdb" # IGDB
|
||||
# - "moby" # MobyGames
|
||||
# - "ss" # Screenscraper
|
||||
@@ -59,8 +60,9 @@ filesystem: {} # { roms_folder: 'roms' } For example if your folder structure is
|
||||
# - "lb" # Launchbox
|
||||
# - "hasheous" # Hasheous
|
||||
# - "flashpoint" # Flashpoint Project
|
||||
# - "hltb" # HowLongToBeat
|
||||
# - "hltb" # HowLongToBeat (lowest priority)
|
||||
# artwork: # Cover art and screenshots
|
||||
# - "gamelist" # EmulationStation
|
||||
# - "igdb" # IGDB
|
||||
# - "moby" # MobyGames
|
||||
# - "ss" # Screenscraper
|
||||
|
||||
Reference in New Issue
Block a user