feat(logging): implement persistent logging with anomaly detection and database integration for debug logs
@@ -79,6 +79,7 @@ from meshchatx.src.backend.nomadnet_utils import (
     convert_nomadnet_field_data_to_map,
     convert_nomadnet_string_data_to_map,
 )
+from meshchatx.src.backend.persistent_log_handler import PersistentLogHandler
 from meshchatx.src.backend.recovery import CrashRecovery
 from meshchatx.src.backend.rnprobe_handler import RNProbeHandler
 from meshchatx.src.backend.sideband_commands import SidebandCommands
@@ -89,31 +90,8 @@ import collections
 import logging
 
 
-class MemoryLogHandler(logging.Handler):
-    def __init__(self, capacity=5000):
-        super().__init__()
-        self.logs = collections.deque(maxlen=capacity)
-
-    def emit(self, record):
-        try:
-            msg = self.format(record)
-            self.logs.append(
-                {
-                    "timestamp": datetime.now(UTC).isoformat(),
-                    "level": record.levelname,
-                    "module": record.module,
-                    "message": msg,
-                }
-            )
-        except Exception:
-            self.handleError(record)
-
-    def get_logs(self):
-        return list(self.logs)
-
-
 # Global log handler
-memory_log_handler = MemoryLogHandler()
+memory_log_handler = PersistentLogHandler()
 logging.basicConfig(
     level=logging.INFO, handlers=[memory_log_handler, logging.StreamHandler(sys.stdout)]
 )
@@ -517,6 +495,9 @@ class ReticulumMeshChat:
         self.contexts[identity_hash] = context
         self.current_context = context
         context.setup()
+
+        # Link database to memory log handler
+        memory_log_handler.set_database(context.database)
 
     def _checkpoint_and_close(self):
         # delegated to database instance
@@ -1989,10 +1970,39 @@ class ReticulumMeshChat:
         async def call_html_redirect(request):
             return web.HTTPFound("/#/popout/call")
 
-        # serve ping
+        # serve debug logs
         @routes.get("/api/v1/debug/logs")
         async def get_debug_logs(request):
-            return web.json_response(memory_log_handler.get_logs())
+            search = request.query.get("search")
+            level = request.query.get("level")
+            module = request.query.get("module")
+            is_anomaly = parse_bool_query_param(request.query.get("is_anomaly"))
+            limit = int(request.query.get("limit", 100))
+            offset = int(request.query.get("offset", 0))
+
+            logs = memory_log_handler.get_logs(
+                limit=limit,
+                offset=offset,
+                search=search,
+                level=level,
+                module=module,
+                is_anomaly=is_anomaly,
+            )
+            total = memory_log_handler.get_total_count(
+                search=search,
+                level=level,
+                module=module,
+                is_anomaly=is_anomaly,
+            )
+
+            return web.json_response(
+                {
+                    "logs": logs,
+                    "total": total,
+                    "limit": limit,
+                    "offset": offset,
+                }
+            )
 
         @routes.post("/api/v1/database/snapshot")
         async def create_db_snapshot(request):
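For illustration only (not part of the diff): a minimal client call against the new endpoint, using the query parameters that get_debug_logs reads above. The host and port are assumptions for the example.

    # Hypothetical client-side query of the new debug log endpoint.
    # Assumes the MeshChat web server is reachable at http://localhost:8000.
    import json
    import urllib.parse
    import urllib.request

    params = urllib.parse.urlencode({
        "level": "ERROR",      # filter by log level
        "search": "lxmf",      # substring match on message or module
        "is_anomaly": "true",  # parsed by parse_bool_query_param on the server
        "limit": 20,
        "offset": 0,
    })
    with urllib.request.urlopen(f"http://localhost:8000/api/v1/debug/logs?{params}") as resp:
        payload = json.loads(resp.read())

    # The handler returns {"logs": [...], "total": n, "limit": 20, "offset": 0}.
    print(payload["total"], len(payload["logs"]))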
@@ -3166,6 +3176,7 @@ class ReticulumMeshChat:
                 "download_stats": {
                     "avg_download_speed_bps": avg_download_speed_bps,
                 },
+                "emergency": getattr(self, "emergency", False),
                 "integrity_issues": getattr(self, "integrity_issues", []),
                 "user_guidance": self.build_user_guidance_messages(),
                 "tutorial_seen": self.config.get("tutorial_seen", "false")
@@ -3913,6 +3924,12 @@ class ReticulumMeshChat:
             d = dict(row)
             remote_identity_hash = d.get("remote_identity_hash")
             if remote_identity_hash:
+                # try to resolve name if unknown or missing
+                if not d.get("remote_identity_name") or d.get("remote_identity_name") == "Unknown":
+                    resolved_name = self.get_name_for_identity_hash(remote_identity_hash)
+                    if resolved_name:
+                        d["remote_identity_name"] = resolved_name
+
                 lxmf_hash = self.get_lxmf_destination_hash_for_identity_hash(
                     remote_identity_hash,
                 )
@@ -4307,52 +4324,68 @@ class ReticulumMeshChat:
 
         @routes.get("/api/v1/telephone/ringtones/status")
         async def telephone_ringtone_status(request):
-            caller_hash = request.query.get("caller_hash")
-
-            ringtone_id = None
-
-            # 1. check contact preferred ringtone
-            if caller_hash:
-                contact = self.database.contacts.get_contact_by_identity_hash(
-                    caller_hash
-                )
-                if contact and contact.get("preferred_ringtone_id"):
-                    ringtone_id = contact["preferred_ringtone_id"]
-
-            # 2. check global preferred for non-contacts
-            if ringtone_id is None:
-                preferred_id = self.config.ringtone_preferred_id.get()
-                if preferred_id:
-                    ringtone_id = preferred_id
-
-            # 3. fallback to primary
-            if ringtone_id is None:
-                primary = self.database.ringtones.get_primary()
-                if primary:
-                    ringtone_id = primary["id"]
-
-            # 4. handle random if selected (-1)
-            if ringtone_id == -1:
-                import random
-
-                ringtones = self.database.ringtones.get_all()
-                if ringtones:
-                    ringtone_id = random.choice(ringtones)["id"]  # noqa: S311
-
-            has_custom = ringtone_id is not None
-            ringtone = (
-                self.database.ringtones.get_by_id(ringtone_id) if has_custom else None
-            )
-
-            return web.json_response(
-                {
-                    "has_custom_ringtone": has_custom,
-                    "enabled": self.config.custom_ringtone_enabled.get(),
-                    "filename": ringtone["filename"] if ringtone else None,
-                    "id": ringtone_id if ringtone_id != -1 else None,
-                    "volume": self.config.ringtone_volume.get() / 100.0,
-                },
-            )
+            try:
+                caller_hash = request.query.get("caller_hash")
+
+                ringtone_id = None
+
+                # 1. check contact preferred ringtone
+                if caller_hash:
+                    contact = self.database.contacts.get_contact_by_identity_hash(
+                        caller_hash
+                    )
+                    if contact and contact.get("preferred_ringtone_id"):
+                        ringtone_id = contact["preferred_ringtone_id"]
+
+                # 2. check global preferred for non-contacts
+                if ringtone_id is None:
+                    preferred_id = self.config.ringtone_preferred_id.get()
+                    if preferred_id:
+                        ringtone_id = preferred_id
+
+                # 3. fallback to primary
+                if ringtone_id is None:
+                    primary = self.database.ringtones.get_primary()
+                    if primary:
+                        ringtone_id = primary["id"]
+
+                # 4. handle random if selected (-1)
+                if ringtone_id == -1:
+                    import random
+
+                    ringtones = self.database.ringtones.get_all()
+                    if ringtones:
+                        ringtone_id = random.choice(ringtones)["id"]  # noqa: S311
+                    else:
+                        ringtone_id = None
+
+                has_custom = ringtone_id is not None
+                ringtone = (
+                    self.database.ringtones.get_by_id(ringtone_id)
+                    if has_custom
+                    else None
+                )
+
+                return web.json_response(
+                    {
+                        "has_custom_ringtone": has_custom and ringtone is not None,
+                        "enabled": self.config.custom_ringtone_enabled.get(),
+                        "filename": ringtone["filename"] if ringtone else None,
+                        "id": ringtone_id if ringtone_id != -1 else None,
+                        "volume": self.config.ringtone_volume.get() / 100.0,
+                    },
+                )
+            except Exception as e:
+                logger.error(f"Error in telephone_ringtone_status: {e}")
+                return web.json_response(
+                    {
+                        "has_custom_ringtone": False,
+                        "enabled": self.config.custom_ringtone_enabled.get(),
+                        "filename": None,
+                        "id": None,
+                        "volume": self.config.ringtone_volume.get() / 100.0,
+                    },
+                )
 
         @routes.get("/api/v1/telephone/ringtones/{id}/audio")
         async def telephone_ringtone_audio(request):
@@ -7171,6 +7204,7 @@ class ReticulumMeshChat:
             # update lxmf user icon name in config
             if "lxmf_user_icon_name" in data:
                 self.config.lxmf_user_icon_name.set(data["lxmf_user_icon_name"])
+                self.database.misc.clear_last_sent_icon_hashes()
                 self.update_identity_metadata_cache()
 
             # update lxmf user icon foreground colour in config
@@ -7178,6 +7212,7 @@ class ReticulumMeshChat:
                 self.config.lxmf_user_icon_foreground_colour.set(
                     data["lxmf_user_icon_foreground_colour"],
                 )
+                self.database.misc.clear_last_sent_icon_hashes()
                 self.update_identity_metadata_cache()
 
             # update lxmf user icon background colour in config
@@ -7185,6 +7220,7 @@ class ReticulumMeshChat:
                 self.config.lxmf_user_icon_background_colour.set(
                     data["lxmf_user_icon_background_colour"],
                 )
+                self.database.misc.clear_last_sent_icon_hashes()
                 self.update_identity_metadata_cache()
 
             # update archiver settings
@@ -6,6 +6,7 @@ from datetime import UTC, datetime
 from .announces import AnnounceDAO
 from .config import ConfigDAO
 from .contacts import ContactsDAO
+from .debug_logs import DebugLogsDAO
 from .legacy_migrator import LegacyMigrator
 from .map_drawings import MapDrawingsDAO
 from .messages import MessageDAO
@@ -32,6 +33,7 @@ class Database:
         self.ringtones = RingtoneDAO(self.provider)
         self.contacts = ContactsDAO(self.provider)
         self.map_drawings = MapDrawingsDAO(self.provider)
+        self.debug_logs = DebugLogsDAO(self.provider)
 
     def initialize(self):
         self.schema.initialize()
meshchatx/src/backend/database/debug_logs.py (new file, 87 lines)
@@ -0,0 +1,87 @@
+from datetime import UTC, datetime
+from .provider import DatabaseProvider
+
+
+class DebugLogsDAO:
+    def __init__(self, provider: DatabaseProvider):
+        self.provider = provider
+
+    def insert_log(self, level, module, message, is_anomaly=0, anomaly_type=None):
+        sql = """
+            INSERT INTO debug_logs (timestamp, level, module, message, is_anomaly, anomaly_type)
+            VALUES (?, ?, ?, ?, ?, ?)
+        """
+        self.provider.execute(
+            sql,
+            (
+                datetime.now(UTC).timestamp(),
+                level,
+                module,
+                message,
+                is_anomaly,
+                anomaly_type,
+            ),
+        )
+
+    def get_logs(self, limit=100, offset=0, search=None, level=None, module=None, is_anomaly=None):
+        sql = "SELECT * FROM debug_logs WHERE 1=1"
+        params = []
+
+        if search:
+            sql += " AND (message LIKE ? OR module LIKE ?)"
+            params.extend([f"%{search}%", f"%{search}%"])
+
+        if level:
+            sql += " AND level = ?"
+            params.append(level)
+
+        if module:
+            sql += " AND module = ?"
+            params.append(module)
+
+        if is_anomaly is not None:
+            sql += " AND is_anomaly = ?"
+            params.append(1 if is_anomaly else 0)
+
+        sql += " ORDER BY timestamp DESC LIMIT ? OFFSET ?"
+        params.extend([limit, offset])
+
+        return self.provider.fetchall(sql, tuple(params))
+
+    def get_total_count(self, search=None, level=None, module=None, is_anomaly=None):
+        sql = "SELECT COUNT(*) as count FROM debug_logs WHERE 1=1"
+        params = []
+
+        if search:
+            sql += " AND (message LIKE ? OR module LIKE ?)"
+            params.extend([f"%{search}%", f"%{search}%"])
+
+        if level:
+            sql += " AND level = ?"
+            params.append(level)
+
+        if module:
+            sql += " AND module = ?"
+            params.append(module)
+
+        if is_anomaly is not None:
+            sql += " AND is_anomaly = ?"
+            params.append(1 if is_anomaly else 0)
+
+        row = self.provider.fetchone(sql, tuple(params))
+        return row["count"] if row else 0
+
+    def cleanup_old_logs(self, max_logs=10000):
+        """Removes old logs keeping only the newest max_logs."""
+        count = self.get_total_count()
+        if count > max_logs:
+            # Find the timestamp of the N-th newest log
+            sql = "SELECT timestamp FROM debug_logs ORDER BY timestamp DESC LIMIT 1 OFFSET ?"
+            row = self.provider.fetchone(sql, (max_logs - 1,))
+            if row:
+                cutoff_ts = row["timestamp"]
+                self.provider.execute("DELETE FROM debug_logs WHERE timestamp < ?", (cutoff_ts,))
+
+    def get_anomalies(self, limit=50):
+        return self.get_logs(limit=limit, is_anomaly=True)
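For illustration only: the DAO only needs a provider that exposes execute / fetchone / fetchall with dict-style rows, so it can be exercised in isolation. The SQLiteProvider below is a hypothetical stand-in for the project's real DatabaseProvider and is not part of the commit.

    # Hypothetical stand-in provider for trying DebugLogsDAO outside the app.
    import sqlite3

    from meshchatx.src.backend.database.debug_logs import DebugLogsDAO

    class SQLiteProvider:
        def __init__(self, path=":memory:"):
            self.conn = sqlite3.connect(path)
            self.conn.row_factory = sqlite3.Row

        def execute(self, sql, params=()):
            self.conn.execute(sql, params)
            self.conn.commit()

        def fetchall(self, sql, params=()):
            return [dict(r) for r in self.conn.execute(sql, params).fetchall()]

        def fetchone(self, sql, params=()):
            row = self.conn.execute(sql, params).fetchone()
            return dict(row) if row else None

    provider = SQLiteProvider()
    provider.execute("""
        CREATE TABLE debug_logs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp REAL, level TEXT, module TEXT, message TEXT,
            is_anomaly INTEGER DEFAULT 0, anomaly_type TEXT,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP
        )
    """)

    dao = DebugLogsDAO(provider)
    dao.insert_log("ERROR", "lxmf", "link timed out", is_anomaly=1, anomaly_type="repeat")
    print(dao.get_total_count(level="ERROR"), dao.get_anomalies(limit=10))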
@@ -200,13 +200,14 @@ class MiscDAO:
         now = datetime.now(UTC)
         self.provider.execute(
             """
-            INSERT INTO crawl_tasks (destination_hash, page_path, status, retry_count, created_at)
-            VALUES (?, ?, ?, ?, ?)
+            INSERT INTO crawl_tasks (destination_hash, page_path, status, retry_count, created_at, updated_at)
+            VALUES (?, ?, ?, ?, ?, ?)
             ON CONFLICT(destination_hash, page_path) DO UPDATE SET
                 status = EXCLUDED.status,
-                retry_count = EXCLUDED.retry_count
+                retry_count = EXCLUDED.retry_count,
+                updated_at = EXCLUDED.updated_at
             """,
-            (destination_hash, page_path, status, retry_count, now),
+            (destination_hash, page_path, status, retry_count, now, now),
         )
 
     def get_pending_crawl_tasks(self):
@@ -220,6 +221,8 @@ class MiscDAO:
             "page_path",
             "status",
             "retry_count",
+            "last_retry_at",
+            "next_retry_at",
             "updated_at",
         }
         filtered_kwargs = {k: v for k, v in kwargs.items() if k in allowed_keys}
@@ -323,3 +326,6 @@ class MiscDAO:
             """,
             (destination_hash, icon_hash, now, now),
         )
+
+    def clear_last_sent_icon_hashes(self):
+        self.provider.execute("DELETE FROM lxmf_last_sent_icon_hashes")
@@ -2,7 +2,7 @@ from .provider import DatabaseProvider
 
 
 class DatabaseSchema:
-    LATEST_VERSION = 32
+    LATEST_VERSION = 34
 
     def __init__(self, provider: DatabaseProvider):
         self.provider = provider
@@ -253,6 +253,7 @@ class DatabaseSchema:
                     next_retry_at DATETIME,
                     status TEXT DEFAULT 'pending',
                     created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
+                    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                     UNIQUE(destination_hash, page_path)
                 )
             """,
@@ -386,6 +387,18 @@ class DatabaseSchema:
                     updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
                 )
             """,
+            "debug_logs": """
+                CREATE TABLE IF NOT EXISTS debug_logs (
+                    id INTEGER PRIMARY KEY AUTOINCREMENT,
+                    timestamp REAL,
+                    level TEXT,
+                    module TEXT,
+                    message TEXT,
+                    is_anomaly INTEGER DEFAULT 0,
+                    anomaly_type TEXT,
+                    created_at DATETIME DEFAULT CURRENT_TIMESTAMP
+                )
+            """,
         }
 
         for table_name, create_sql in tables.items():
@@ -446,6 +459,16 @@ class DatabaseSchema:
                 self.provider.execute(
                     "CREATE UNIQUE INDEX IF NOT EXISTS idx_lxmf_telemetry_dest_ts_unique ON lxmf_telemetry(destination_hash, timestamp)",
                 )
+            elif table_name == "debug_logs":
+                self.provider.execute(
+                    "CREATE INDEX IF NOT EXISTS idx_debug_logs_timestamp ON debug_logs(timestamp)",
+                )
+                self.provider.execute(
+                    "CREATE INDEX IF NOT EXISTS idx_debug_logs_level ON debug_logs(level)",
+                )
+                self.provider.execute(
+                    "CREATE INDEX IF NOT EXISTS idx_debug_logs_anomaly ON debug_logs(is_anomaly)",
+                )
 
     def migrate(self, current_version):
        if current_version < 7:
@@ -861,6 +884,38 @@ class DatabaseSchema:
                 ("changelog_seen_version", "0.0.0"),
             )
 
+        if current_version < 33:
+            self.provider.execute("""
+                CREATE TABLE IF NOT EXISTS debug_logs (
+                    id INTEGER PRIMARY KEY AUTOINCREMENT,
+                    timestamp REAL,
+                    level TEXT,
+                    module TEXT,
+                    message TEXT,
+                    is_anomaly INTEGER DEFAULT 0,
+                    anomaly_type TEXT,
+                    created_at DATETIME DEFAULT CURRENT_TIMESTAMP
+                )
+            """)
+            self.provider.execute(
+                "CREATE INDEX IF NOT EXISTS idx_debug_logs_timestamp ON debug_logs(timestamp)",
+            )
+            self.provider.execute(
+                "CREATE INDEX IF NOT EXISTS idx_debug_logs_level ON debug_logs(level)",
+            )
+            self.provider.execute(
+                "CREATE INDEX IF NOT EXISTS idx_debug_logs_anomaly ON debug_logs(is_anomaly)",
+            )
+
+        if current_version < 34:
+            # Add updated_at to crawl_tasks
+            try:
+                self.provider.execute(
+                    "ALTER TABLE crawl_tasks ADD COLUMN updated_at DATETIME DEFAULT CURRENT_TIMESTAMP",
+                )
+            except Exception:  # noqa: S110
+                pass
+
         # Update version in config
         self.provider.execute(
             """
@@ -74,7 +74,9 @@ class IdentityContext:
         self.community_interfaces_manager = None
         self.local_lxmf_destination = None
         self.announce_handlers = []
-        self.integrity_manager = IntegrityManager(self.storage_path, self.database_path)
+        self.integrity_manager = IntegrityManager(
+            self.storage_path, self.database_path, self.identity_hash
+        )
 
         self.running = False
 
@@ -134,7 +136,9 @@ class IdentityContext:
             self.config,
             self.app.get_public_path(),
             project_root=os.path.dirname(
-                os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+                os.path.dirname(
+                    os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+                )
             ),
             storage_dir=self.storage_path,
         )
@@ -8,9 +8,10 @@ from datetime import UTC, datetime
 class IntegrityManager:
     """Manages the integrity of the database and identity files at rest."""
 
-    def __init__(self, storage_dir, database_path):
+    def __init__(self, storage_dir, database_path, identity_hash=None):
         self.storage_dir = Path(storage_dir)
         self.database_path = Path(database_path)
+        self.identity_hash = identity_hash
         self.manifest_path = self.storage_dir / "integrity-manifest.json"
         self.issues = []
@@ -64,6 +65,16 @@ class IntegrityManager:
                     # New files are also a concern for integrity
                     issues.append(f"New file detected: {rel_path}")
 
+            if issues:
+                m_date = manifest.get("date", "Unknown")
+                m_time = manifest.get("time", "Unknown")
+                m_id = manifest.get("identity", "Unknown")
+                issues.insert(0, f"Last integrity snapshot: {m_date} {m_time} (Identity: {m_id})")
+
+                # Check if identity matches
+                if self.identity_hash and m_id != "Unknown" and self.identity_hash != m_id:
+                    issues.append(f"Identity mismatch! Manifest belongs to: {m_id}")
+
             self.issues = issues
             return len(issues) == 0, issues
         except Exception as e:
@@ -89,9 +100,13 @@ class IntegrityManager:
                 rel_path = str(full_path.relative_to(self.storage_dir))
                 files[rel_path] = self._hash_file(full_path)
 
+        now = datetime.now(UTC)
         manifest = {
             "version": 1,
-            "timestamp": datetime.now(UTC).timestamp(),
+            "timestamp": now.timestamp(),
+            "date": now.strftime("%Y-%m-%d"),
+            "time": now.strftime("%H:%M:%S"),
+            "identity": self.identity_hash,
             "files": files,
         }
 
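With this change, the integrity-manifest.json written by the manager carries the snapshot date, time and owning identity alongside the per-file hashes. Roughly, it now has this shape (all values below are placeholders, not taken from a real manifest):

    # Illustrative manifest shape after this commit; values are placeholders.
    manifest = {
        "version": 1,
        "timestamp": 1700000000.0,
        "date": "2023-11-14",
        "time": "22:13:20",
        "identity": "<identity hash of the owning context>",
        "files": {
            "<relative path>": "<file hash>",
        },
    }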
meshchatx/src/backend/persistent_log_handler.py (new file, 165 lines)
@@ -0,0 +1,165 @@
+import collections
+import logging
+import threading
+import time
+from datetime import UTC, datetime
+
+
+class PersistentLogHandler(logging.Handler):
+    def __init__(self, database=None, capacity=5000, flush_interval=5):
+        super().__init__()
+        self.database = database
+        self.logs_buffer = collections.deque(maxlen=capacity)
+        self.flush_interval = flush_interval
+        self.last_flush_time = time.time()
+        self.lock = threading.RLock()
+        self.flush_lock = threading.Lock()
+
+        # Anomaly detection state
+        self.recent_messages = collections.deque(maxlen=100)
+        self.flooding_threshold = 20  # messages per second
+        self.repeat_threshold = 5  # identical messages in a row
+        self.message_counts = collections.defaultdict(int)
+        self.last_reset_time = time.time()
+
+    def set_database(self, database):
+        with self.lock:
+            self.database = database
+
+    def emit(self, record):
+        try:
+            msg = self.format(record)
+            timestamp = datetime.now(UTC).timestamp()
+
+            is_anomaly, anomaly_type = self._detect_anomaly(record, msg, timestamp)
+
+            log_entry = {
+                "timestamp": timestamp,
+                "level": record.levelname,
+                "module": record.module,
+                "message": msg,
+                "is_anomaly": 1 if is_anomaly else 0,
+                "anomaly_type": anomaly_type,
+            }
+
+            with self.lock:
+                self.logs_buffer.append(log_entry)
+
+            # Periodically flush to database if available
+            if self.database and (time.time() - self.last_flush_time > self.flush_interval):
+                self._flush_to_db()
+
+        except Exception:
+            self.handleError(record)
+
+    def _detect_anomaly(self, record, message, timestamp):
+        now = time.time()
+
+        # 1. Detect Log Flooding
+        if now - self.last_reset_time > 1.0:
+            self.message_counts.clear()
+            self.last_reset_time = now
+
+        self.message_counts["total"] += 1
+        if self.message_counts["total"] > self.flooding_threshold:
+            return True, "flooding"
+
+        # 2. Detect Repeats
+        if len(self.recent_messages) > 0:
+            repeat_count = 0
+            for prev_msg in reversed(self.recent_messages):
+                if prev_msg == message:
+                    repeat_count += 1
+                else:
+                    break
+
+            if repeat_count >= self.repeat_threshold:
+                return True, "repeat"
+
+        self.recent_messages.append(message)
+        return False, None
+
+    def _flush_to_db(self):
+        if not self.database:
+            return
+
+        # Ensure only one thread flushes at a time
+        if not self.flush_lock.acquire(blocking=False):
+            return
+
+        try:
+            items_to_flush = []
+            with self.lock:
+                while self.logs_buffer:
+                    items_to_flush.append(self.logs_buffer.popleft())
+
+            if not items_to_flush:
+                return
+
+            # Batch insert for speed
+            for entry in items_to_flush:
+                try:
+                    self.database.debug_logs.insert_log(
+                        level=entry["level"],
+                        module=entry["module"],
+                        message=entry["message"],
+                        is_anomaly=entry["is_anomaly"],
+                        anomaly_type=entry["anomaly_type"]
+                    )
+                except Exception as e:
+                    print(f"Error inserting log: {e}")
+
+            # Periodic cleanup of old logs (only every 100 flushes or similar?
+            # for now let's just keep it here but it should be fast)
+            try:
+                self.database.debug_logs.cleanup_old_logs()
+            except Exception as e:
+                print(f"Error cleaning up logs: {e}")
+
+            self.last_flush_time = time.time()
+        except Exception as e:
+            print(f"Failed to flush logs to database: {e}")
+        finally:
+            self.flush_lock.release()
+
+    def get_logs(self, limit=100, offset=0, search=None, level=None, module=None, is_anomaly=None):
+        if self.database:
+            # Flush current buffer first to ensure we have latest logs
+            self._flush_to_db()
+
+        with self.lock:
+            if self.database:
+                return self.database.debug_logs.get_logs(
+                    limit=limit,
+                    offset=offset,
+                    search=search,
+                    level=level,
+                    module=module,
+                    is_anomaly=is_anomaly
+                )
+            else:
+                # Fallback to in-memory buffer if DB not yet available
+                logs = list(self.logs_buffer)
+                if search:
+                    logs = [l for l in logs if search.lower() in l["message"].lower() or search.lower() in l["module"].lower()]
+                if level:
+                    logs = [l for l in logs if l["level"] == level]
+                if is_anomaly is not None:
+                    logs = [l for l in logs if l["is_anomaly"] == (1 if is_anomaly else 0)]
+
+                # Sort descending
+                logs.sort(key=lambda x: x["timestamp"], reverse=True)
+                return logs[offset : offset + limit]
+
+    def get_total_count(self, search=None, level=None, module=None, is_anomaly=None):
+        with self.lock:
+            if self.database:
+                return self.database.debug_logs.get_total_count(
+                    search=search,
+                    level=level,
+                    module=module,
+                    is_anomaly=is_anomaly
+                )
+            else:
+                return len(self.logs_buffer)
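For illustration only: the handler can be exercised without a database, since entries stay in the in-memory deque until set_database is called (as meshchat.py does above once the identity context is set up). A short sketch that triggers the repeat-anomaly tagging:

    # Standalone exercise of PersistentLogHandler (illustrative; not part of the commit).
    import logging
    from meshchatx.src.backend.persistent_log_handler import PersistentLogHandler

    handler = PersistentLogHandler()  # no database yet: logs stay in the in-memory buffer
    logging.basicConfig(level=logging.INFO, handlers=[handler])

    log = logging.getLogger("demo")
    for _ in range(8):
        log.warning("identical message")  # runs past repeat_threshold get tagged "repeat"

    anomalies = handler.get_logs(limit=10, is_anomaly=True)
    print(len(anomalies), anomalies[0]["anomaly_type"] if anomalies else None)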