feat(ringtone): implement ringtone management features including upload, retrieval, and deletion; enhance identity management with new API endpoints for creating, switching, and deleting identities

This commit is contained in:
2026-01-01 19:53:29 -06:00
parent 668520e576
commit 4a6ab03106
9 changed files with 886 additions and 68 deletions

View File

File diff suppressed because it is too large Load Diff

View File

@@ -131,6 +131,10 @@ class ConfigManager:
60, 60,
) )
# ringtone config
self.custom_ringtone_enabled = self.BoolConfig(self, "custom_ringtone_enabled", False)
self.ringtone_filename = self.StringConfig(self, "ringtone_filename", None)
# map config # map config
self.map_offline_enabled = self.BoolConfig(self, "map_offline_enabled", False) self.map_offline_enabled = self.BoolConfig(self, "map_offline_enabled", False)
self.map_offline_path = self.StringConfig(self, "map_offline_path", None) self.map_offline_path = self.StringConfig(self, "map_offline_path", None)
@@ -183,7 +187,7 @@ class ConfigManager:
config_value = self.manager.get(self.key, default_value=None) config_value = self.manager.get(self.key, default_value=None)
if config_value is None: if config_value is None:
return self.default_value return self.default_value
return config_value == "true" return str(config_value).lower() == "true"
def set(self, value: bool): def set(self, value: bool):
self.manager.set(self.key, "true" if value else "false") self.manager.set(self.key, "true" if value else "false")

View File

@@ -8,6 +8,7 @@ from .schema import DatabaseSchema
from .telemetry import TelemetryDAO from .telemetry import TelemetryDAO
from .telephone import TelephoneDAO from .telephone import TelephoneDAO
from .voicemails import VoicemailDAO from .voicemails import VoicemailDAO
from .ringtones import RingtoneDAO
class Database: class Database:
@@ -21,6 +22,7 @@ class Database:
self.telephone = TelephoneDAO(self.provider) self.telephone = TelephoneDAO(self.provider)
self.telemetry = TelemetryDAO(self.provider) self.telemetry = TelemetryDAO(self.provider)
self.voicemails = VoicemailDAO(self.provider) self.voicemails = VoicemailDAO(self.provider)
self.ringtones = RingtoneDAO(self.provider)
def initialize(self): def initialize(self):
self.schema.initialize() self.schema.initialize()

View File

@@ -18,6 +18,13 @@ class ConfigDAO:
self.provider.execute("DELETE FROM config WHERE key = ?", (key,)) self.provider.execute("DELETE FROM config WHERE key = ?", (key,))
else: else:
now = datetime.now(UTC) now = datetime.now(UTC)
# handle booleans specifically to ensure they are stored as "true"/"false"
if isinstance(value, bool):
value_str = "true" if value else "false"
else:
value_str = str(value)
self.provider.execute( self.provider.execute(
""" """
INSERT INTO config (key, value, created_at, updated_at) INSERT INTO config (key, value, created_at, updated_at)
@@ -26,7 +33,7 @@ class ConfigDAO:
value = EXCLUDED.value, value = EXCLUDED.value,
updated_at = EXCLUDED.updated_at updated_at = EXCLUDED.updated_at
""", """,
(key, str(value), now, now), (key, value_str, now, now),
) )
def delete(self, key): def delete(self, key):

View File

@@ -18,6 +18,10 @@ class DatabaseProvider:
msg = "Database path must be provided for the first initialization" msg = "Database path must be provided for the first initialization"
raise ValueError(msg) raise ValueError(msg)
cls._instance = cls(db_path) cls._instance = cls(db_path)
elif db_path is not None and cls._instance.db_path != db_path:
# If a different path is provided, close the old one and create new
cls._instance.close()
cls._instance = cls(db_path)
return cls._instance return cls._instance
@property @property

View File

@@ -0,0 +1,64 @@
from datetime import UTC, datetime
from .provider import DatabaseProvider
class RingtoneDAO:
def __init__(self, provider: DatabaseProvider):
self.provider = provider
def get_all(self):
return self.provider.fetchall("SELECT * FROM ringtones ORDER BY created_at DESC")
def get_by_id(self, ringtone_id):
return self.provider.fetchone("SELECT * FROM ringtones WHERE id = ?", (ringtone_id,))
def get_primary(self):
return self.provider.fetchone("SELECT * FROM ringtones WHERE is_primary = 1")
def add(self, filename, storage_filename, display_name=None):
now = datetime.now(UTC)
if display_name is None:
display_name = filename
# check if this is the first ringtone, if so make it primary
count = self.provider.fetchone("SELECT COUNT(*) as count FROM ringtones")["count"]
is_primary = 1 if count == 0 else 0
cursor = self.provider.execute(
"INSERT INTO ringtones (filename, display_name, storage_filename, is_primary, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?)",
(filename, display_name, storage_filename, is_primary, now, now)
)
return cursor.lastrowid
def update(self, ringtone_id, display_name=None, is_primary=None):
now = datetime.now(UTC)
if is_primary == 1:
# reset others
self.provider.execute("UPDATE ringtones SET is_primary = 0, updated_at = ?", (now,))
if display_name is not None and is_primary is not None:
self.provider.execute(
"UPDATE ringtones SET display_name = ?, is_primary = ?, updated_at = ? WHERE id = ?",
(display_name, is_primary, now, ringtone_id)
)
elif display_name is not None:
self.provider.execute(
"UPDATE ringtones SET display_name = ?, updated_at = ? WHERE id = ?",
(display_name, now, ringtone_id)
)
elif is_primary is not None:
self.provider.execute(
"UPDATE ringtones SET is_primary = ?, updated_at = ? WHERE id = ?",
(is_primary, now, ringtone_id)
)
def delete(self, ringtone_id):
# if deleting primary, make another one primary if exists
ringtone = self.get_by_id(ringtone_id)
if ringtone and ringtone["is_primary"] == 1:
self.provider.execute("DELETE FROM ringtones WHERE id = ?", (ringtone_id,))
next_ringtone = self.provider.fetchone("SELECT id FROM ringtones LIMIT 1")
if next_ringtone:
self.update(next_ringtone["id"], is_primary=1)
else:
self.provider.execute("DELETE FROM ringtones WHERE id = ?", (ringtone_id,))

View File

@@ -2,7 +2,7 @@ from .provider import DatabaseProvider
class DatabaseSchema: class DatabaseSchema:
LATEST_VERSION = 16 LATEST_VERSION = 17
def __init__(self, provider: DatabaseProvider): def __init__(self, provider: DatabaseProvider):
self.provider = provider self.provider = provider
@@ -227,6 +227,17 @@ class DatabaseSchema:
UNIQUE(destination_hash, timestamp) UNIQUE(destination_hash, timestamp)
) )
""", """,
"ringtones": """
CREATE TABLE IF NOT EXISTS ringtones (
id INTEGER PRIMARY KEY AUTOINCREMENT,
filename TEXT,
display_name TEXT,
storage_filename TEXT,
is_primary INTEGER DEFAULT 0,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
""",
} }
for table_name, create_sql in tables.items(): for table_name, create_sql in tables.items():
@@ -501,6 +512,19 @@ class DatabaseSchema:
except Exception: except Exception:
pass pass
if current_version < 17:
self.provider.execute("""
CREATE TABLE IF NOT EXISTS ringtones (
id INTEGER PRIMARY KEY AUTOINCREMENT,
filename TEXT,
display_name TEXT,
storage_filename TEXT,
is_primary INTEGER DEFAULT 0,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
""")
# Update version in config # Update version in config
self.provider.execute( self.provider.execute(
""" """

View File

@@ -45,3 +45,6 @@ class TelephoneDAO:
"SELECT * FROM call_history ORDER BY timestamp DESC LIMIT ?", "SELECT * FROM call_history ORDER BY timestamp DESC LIMIT ?",
(limit,), (limit,),
) )
def clear_call_history(self):
self.provider.execute("DELETE FROM call_history")

View File

@@ -0,0 +1,64 @@
import os
import shutil
import subprocess
import RNS
class RingtoneManager:
def __init__(self, config, storage_dir):
self.config = config
self.storage_dir = os.path.join(storage_dir, "ringtones")
# Ensure directory exists
os.makedirs(self.storage_dir, exist_ok=True)
# Paths to executables
self.ffmpeg_path = self._find_ffmpeg()
self.has_ffmpeg = self.ffmpeg_path is not None
if self.has_ffmpeg:
RNS.log(f"Ringtone: Found ffmpeg at {self.ffmpeg_path}", RNS.LOG_DEBUG)
else:
RNS.log("Ringtone: ffmpeg not found", RNS.LOG_ERROR)
def _find_ffmpeg(self):
path = shutil.which("ffmpeg")
if path:
return path
return None
def convert_to_ringtone(self, input_path, ringtone_id=None):
if not self.has_ffmpeg:
msg = "ffmpeg is required for audio conversion"
raise RuntimeError(msg)
import secrets
filename = f"ringtone_{secrets.token_hex(8)}.opus"
opus_path = os.path.join(self.storage_dir, filename)
subprocess.run(
[
self.ffmpeg_path,
"-i",
input_path,
"-c:a",
"libopus",
"-b:a",
"32k",
"-vbr",
"on",
opus_path,
],
check=True,
)
return filename
def remove_ringtone(self, filename):
opus_path = os.path.join(self.storage_dir, filename)
if os.path.exists(opus_path):
os.remove(opus_path)
return True
def get_ringtone_path(self, filename):
return os.path.join(self.storage_dir, filename)