Created web frontend launched via --web flag (#1967)

Author: overcuriousity 
Co-authored-by: Soxoj <soxoj@protonmail.com>
This commit is contained in:
overcuriousity
2024-12-16 14:24:03 +01:00
committed by GitHub
parent cb01535565
commit 88d68490f3
13 changed files with 530 additions and 13 deletions

2
.gitignore vendored
View File

@@ -42,4 +42,4 @@ settings.json
# other
*.egg-info
build
build

View File

@@ -1,7 +1,7 @@
LINT_FILES=maigret wizard.py tests
test:
coverage run --source=./maigret -m pytest tests
coverage run --source=./maigret,./maigret/web -m pytest tests
coverage report -m
coverage html

View File

@@ -324,7 +324,15 @@ def setup_arguments_parser(settings: Settings):
default=False,
help="Show database statistics (most frequent sites engines and tags).",
)
modes_group.add_argument(
"--web",
metavar='PORT',
type=int,
nargs='?', # Optional PORT value
const=5000, # Default PORT if `--web` is provided without a value
default=None, # Explicitly set default to None
help="Launch the web interface on the specified port (default: 5000 if no PORT is provided).",
)
output_group = parser.add_argument_group(
'Output options', 'Options to change verbosity and view of the console output'
)
@@ -485,6 +493,13 @@ async def main():
log_level = logging.WARNING
logger.setLevel(log_level)
if args.web is not None:
from maigret.web.app import app
port = args.web if args.web else 5000 # args.web is either the specified port or 5000 by default
app.run(port=port)
return
# Usernames initial list
usernames = {
u: args.id_type

View File

@@ -53,5 +53,6 @@
"xmind_report": false,
"graph_report": false,
"pdf_report": false,
"html_report": false
"html_report": false,
"web_interface_port": 5000
}

View File

@@ -42,6 +42,7 @@ class Settings:
pdf_report: bool
html_report: bool
graph_report: bool
web_interface_port: int
# submit mode settings
presence_strings: list

280
maigret/web/app.py Normal file
View File

@@ -0,0 +1,280 @@
# app.py
from flask import (
Flask,
render_template,
request,
send_file,
Response,
flash,
redirect,
url_for,
)
import logging
import os
import asyncio
from datetime import datetime
from threading import Thread
import maigret
import maigret.settings
from maigret.sites import MaigretDatabase
from maigret.report import generate_report_context
app = Flask(__name__)
app.secret_key = 'your-secret-key-here'
# Add background job tracking
background_jobs = {}
job_results = {}
# Configuration
MAIGRET_DB_FILE = os.path.join('maigret', 'resources', 'data.json')
COOKIES_FILE = "cookies.txt"
UPLOAD_FOLDER = 'uploads'
REPORTS_FOLDER = os.path.abspath('/tmp/maigret_reports')
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(REPORTS_FOLDER, exist_ok=True)
def setup_logger(log_level, name):
logger = logging.getLogger(name)
logger.setLevel(log_level)
return logger
async def maigret_search(username, options):
logger = setup_logger(logging.WARNING, 'maigret')
try:
db = MaigretDatabase().load_from_path(MAIGRET_DB_FILE)
sites = db.ranked_sites_dict(top=int(options.get('top_sites', 500)))
results = await maigret.search(
username=username,
site_dict=sites,
timeout=int(options.get('timeout', 30)),
logger=logger,
id_type=options.get('id_type', 'username'),
cookies=COOKIES_FILE if options.get('use_cookies') else None,
)
return results
except Exception as e:
logger.error(f"Error during search: {str(e)}")
raise
async def search_multiple_usernames(usernames, options):
results = []
for username in usernames:
try:
search_results = await maigret_search(username.strip(), options)
results.append((username.strip(), options['id_type'], search_results))
except Exception as e:
logging.error(f"Error searching username {username}: {str(e)}")
return results
def process_search_task(usernames, options, timestamp):
try:
# Setup event loop for async operations
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Run the search
general_results = loop.run_until_complete(
search_multiple_usernames(usernames, options)
)
# Create session folder
session_folder = os.path.join(REPORTS_FOLDER, f"search_{timestamp}")
os.makedirs(session_folder, exist_ok=True)
# Save the combined graph
graph_path = os.path.join(session_folder, "combined_graph.html")
maigret.report.save_graph_report(
graph_path,
general_results,
MaigretDatabase().load_from_path(MAIGRET_DB_FILE),
)
# Save individual reports
individual_reports = []
for username, id_type, results in general_results:
report_base = os.path.join(session_folder, f"report_{username}")
csv_path = f"{report_base}.csv"
json_path = f"{report_base}.json"
pdf_path = f"{report_base}.pdf"
html_path = f"{report_base}.html"
context = generate_report_context(general_results)
maigret.report.save_csv_report(csv_path, username, results)
maigret.report.save_json_report(
json_path, username, results, report_type='ndjson'
)
maigret.report.save_pdf_report(pdf_path, context)
maigret.report.save_html_report(html_path, context)
claimed_profiles = []
for site_name, site_data in results.items():
if (
site_data.get('status')
and site_data['status'].status
== maigret.result.MaigretCheckStatus.CLAIMED
):
claimed_profiles.append(
{
'site_name': site_name,
'url': site_data.get('url_user', ''),
'tags': (
site_data.get('status').tags
if site_data.get('status')
else []
),
}
)
individual_reports.append(
{
'username': username,
'csv_file': os.path.join(
f"search_{timestamp}", f"report_{username}.csv"
),
'json_file': os.path.join(
f"search_{timestamp}", f"report_{username}.json"
),
'pdf_file': os.path.join(
f"search_{timestamp}", f"report_{username}.pdf"
),
'html_file': os.path.join(
f"search_{timestamp}", f"report_{username}.html"
),
'claimed_profiles': claimed_profiles,
}
)
# Save results and mark job as complete
job_results[timestamp] = {
'status': 'completed',
'session_folder': f"search_{timestamp}",
'graph_file': os.path.join(f"search_{timestamp}", "combined_graph.html"),
'usernames': usernames,
'individual_reports': individual_reports,
}
except Exception as e:
job_results[timestamp] = {'status': 'failed', 'error': str(e)}
finally:
background_jobs[timestamp]['completed'] = True
@app.route('/')
def index():
return render_template('index.html')
@app.route('/search', methods=['POST'])
def search():
usernames_input = request.form.get('usernames', '').strip()
if not usernames_input:
flash('At least one username is required', 'danger')
return redirect(url_for('index'))
usernames = [
u.strip() for u in usernames_input.replace(',', ' ').split() if u.strip()
]
# Create timestamp for this search session
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
logging.info(f"Starting search for usernames: {usernames}")
options = {
'top_sites': request.form.get('top_sites', '500'),
'timeout': request.form.get('timeout', '30'),
'id_type': 'username', # fixed as username
'use_cookies': 'use_cookies' in request.form,
}
# Start background job
background_jobs[timestamp] = {
'completed': False,
'thread': Thread(
target=process_search_task, args=(usernames, options, timestamp)
),
}
background_jobs[timestamp]['thread'].start()
logging.info(f"Search job started with timestamp: {timestamp}")
# Redirect to status page
return redirect(url_for('status', timestamp=timestamp))
@app.route('/status/<timestamp>')
def status(timestamp):
logging.info(f"Status check for timestamp: {timestamp}")
# Validate timestamp
if timestamp not in background_jobs:
flash('Invalid search session', 'danger')
return redirect(url_for('index'))
# Check if job is completed
if background_jobs[timestamp]['completed']:
result = job_results.get(timestamp)
if not result:
flash('No results found for this search session', 'warning')
return redirect(url_for('index'))
if result['status'] == 'completed':
# Redirect to results page once done
return redirect(url_for('results', session_id=result['session_folder']))
else:
error_msg = result.get('error', 'Unknown error occurred')
flash(f'Search failed: {error_msg}', 'danger')
return redirect(url_for('index'))
# If job is still running, show status page with a simple spinner
return render_template('status.html', timestamp=timestamp)
@app.route('/results/<session_id>')
def results(session_id):
if not session_id.startswith('search_'):
flash('Invalid results session format', 'danger')
return redirect(url_for('index'))
result_data = next(
(
r
for r in job_results.values()
if r.get('status') == 'completed' and r['session_folder'] == session_id
),
None,
)
return render_template(
'results.html',
usernames=result_data['usernames'],
graph_file=result_data['graph_file'],
individual_reports=result_data['individual_reports'],
timestamp=session_id.replace('search_', ''),
)
@app.route('/reports/<path:filename>')
def download_report(filename):
try:
file_path = os.path.join(REPORTS_FOLDER, filename)
return send_file(file_path)
except Exception as e:
logging.error(f"Error serving file {filename}: {str(e)}")
return "File not found", 404
if __name__ == '__main__':
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
app.run(debug=True)

View File

@@ -0,0 +1,44 @@
<!-- templates/base.html -->
<!DOCTYPE html>
<html lang="en" data-bs-theme="dark">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Maigret Web Interface</title>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/css/bootstrap.min.css" rel="stylesheet">
<style>
body {
padding-top: 2rem;
}
.form-container {
max-width: auto;
margin: auto;
}
[data-bs-theme="dark"] {
--bs-body-bg: #212529;
--bs-body-color: #dee2e6;
}
</style>
</head>
<body>
<div class="container">
<div class="mb-3">
<button class="btn btn-outline-secondary" id="theme-toggle">
Toggle Dark/Light Mode
</button>
</div>
{% block content %}{% endblock %}
</div>
<script src="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/js/bootstrap.bundle.min.js"></script>
<script>
document.getElementById('theme-toggle').addEventListener('click', function() {
const html = document.documentElement;
if (html.getAttribute('data-bs-theme') === 'dark') {
html.setAttribute('data-bs-theme', 'light');
} else {
html.setAttribute('data-bs-theme', 'dark');
}
});
</script>
</body>
</html>

View File

@@ -0,0 +1,35 @@
{% extends "base.html" %}
{% block content %}
<div class="form-container">
<h1 class="mb-4">Maigret Web Interface</h1>
{% if error %}
<div class="alert alert-danger">{{ error }}</div>
{% endif %}
<form method="POST" action="{{ url_for('search') }}" class="mb-4">
<div class="mb-3">
<label for="usernames" class="form-label">Usernames to Search</label>
<textarea class="form-control" id="usernames" name="usernames" rows="3" required
placeholder="Enter one or more usernames (separated by spaces or commas)"></textarea>
</div>
<div class="mb-3">
<label for="top_sites" class="form-label">Number of Top Sites to Check</label>
<input type="number" class="form-control" id="top_sites" name="top_sites" value="500" min="1" max="10000">
</div>
<div class="mb-3">
<label for="timeout" class="form-label">Timeout (seconds)</label>
<input type="number" class="form-control" id="timeout" name="timeout" value="30" min="1" max="120">
</div>
<div class="mb-3 form-check">
<input type="checkbox" class="form-check-input" id="use_cookies" name="use_cookies">
<label class="form-check-label" for="use_cookies">Use Cookies File</label>
</div>
<button type="submit" class="btn btn-primary">Search</button>
</form>
</div>
{% endblock %}

View File

@@ -0,0 +1,56 @@
{% extends "base.html" %}
{% block content %}
<div class="form-container">
<h1 class="mb-4">Search Results</h1>
{% with messages = get_flashed_messages() %}
{% if messages %}
{% for message in messages %}
<div class="alert alert-info">{{ message }}</div>
{% endfor %}
{% endif %}
{% endwith %}
<p>The search has completed. Below are the results:</p>
<!-- Display the combined graph if available -->
{% if graph_file %}
<h3>Combined Graph</h3>
<iframe src="{{ url_for('download_report', filename=graph_file) }}" style="width:100%; height:600px; border:none;"></iframe>
{% endif %}
<hr>
<!-- Display individual reports -->
{% if individual_reports %}
<h3>Individual Reports</h3>
<ul class="list-group">
{% for report in individual_reports %}
<li class="list-group-item">
<h5>{{ report.username }}</h5>
<p>
<a href="{{ url_for('download_report', filename=report.csv_file) }}">CSV Report</a> |
<a href="{{ url_for('download_report', filename=report.json_file) }}">JSON Report</a> |
<a href="{{ url_for('download_report', filename=report.pdf_file) }}">PDF Report</a> |
<a href="{{ url_for('download_report', filename=report.html_file) }}">HTML Report</a>
</p>
{% if report.claimed_profiles %}
<strong>Claimed Profiles:</strong>
<ul>
{% for profile in report.claimed_profiles %}
<li>
<a href="{{ profile.url }}" target="_blank">{{ profile.site_name }}</a> (Tags: {{ profile.tags|join(', ') }})
</li>
{% endfor %}
</ul>
{% else %}
<p>No claimed profiles found.</p>
{% endif %}
</li>
{% endfor %}
</ul>
{% else %}
<p>No individual reports available.</p>
{% endif %}
</div>
{% endblock %}

View File

@@ -0,0 +1,16 @@
{% extends "base.html" %}
{% block content %}
<div class="container mt-4 text-center">
<h2>Search in progress...</h2>
<p>Your request is being processed in the background. This page will automatically redirect once the results are ready.</p>
<div class="spinner-border text-primary" role="status">
<span class="visually-hidden">Loading...</span>
</div>
<script>
// Auto-refresh the page every 5 seconds to check completion
setTimeout(function() {
window.location.reload();
}, 5000);
</script>
</div>
{% endblock %}

72
poetry.lock generated
View File

@@ -151,13 +151,13 @@ python-socks = {version = ">=2.4.3,<3.0.0", extras = ["asyncio"]}
[[package]]
name = "aiosignal"
version = "1.3.1"
version = "1.3.2"
description = "aiosignal: a list of registered asynchronous callbacks"
optional = false
python-versions = ">=3.7"
python-versions = ">=3.9"
files = [
{file = "aiosignal-1.3.1-py3-none-any.whl", hash = "sha256:f8376fb07dd1e86a584e4fcdec80b36b7f81aac666ebc724e2c090300dd83b17"},
{file = "aiosignal-1.3.1.tar.gz", hash = "sha256:54cd96e15e1649b75d6c87526a6ff0b6c1b0dd3459f43d9ca11d48c339b68cfc"},
{file = "aiosignal-1.3.2-py2.py3-none-any.whl", hash = "sha256:45cde58e409a301715980c2b01d0c28bdde3770d8290b5eb2173759d9acb31a5"},
{file = "aiosignal-1.3.2.tar.gz", hash = "sha256:a8c255c66fafb1e499c9351d0bf32ff2d8a0321595ebac3b93713656d2436f54"},
]
[package.dependencies]
@@ -192,6 +192,23 @@ files = [
[package.extras]
with-fonttools = ["fonttools (>=4.0)"]
[[package]]
name = "asgiref"
version = "3.8.1"
description = "ASGI specs, helper code, and adapters"
optional = false
python-versions = ">=3.8"
files = [
{file = "asgiref-3.8.1-py3-none-any.whl", hash = "sha256:3e1e3ecc849832fe52ccf2cb6686b7a55f82bb1d6aee72a58826471390335e47"},
{file = "asgiref-3.8.1.tar.gz", hash = "sha256:c343bd80a0bec947a9860adb4c432ffa7db769836c64238fc34bdc3fec84d590"},
]
[package.dependencies]
typing-extensions = {version = ">=4", markers = "python_version < \"3.11\""}
[package.extras]
tests = ["mypy (>=0.800)", "pytest", "pytest-asyncio"]
[[package]]
name = "asn1crypto"
version = "1.5.1"
@@ -315,6 +332,17 @@ d = ["aiohttp (>=3.10)"]
jupyter = ["ipython (>=7.8.0)", "tokenize-rt (>=3.2.0)"]
uvloop = ["uvloop (>=0.15.2)"]
[[package]]
name = "blinker"
version = "1.9.0"
description = "Fast, simple object-to-object and broadcast signaling"
optional = false
python-versions = ">=3.9"
files = [
{file = "blinker-1.9.0-py3-none-any.whl", hash = "sha256:ba0efaa9080b619ff2f3459d1d500c57bddea4a6b424b60a91141db6fd2f08bc"},
{file = "blinker-1.9.0.tar.gz", hash = "sha256:b4ce2265a7abece45e7cc896e98dbebe6cead56bcf805a3d23136d145f5445bf"},
]
[[package]]
name = "certifi"
version = "2024.8.30"
@@ -771,6 +799,29 @@ mccabe = ">=0.7.0,<0.8.0"
pycodestyle = ">=2.12.0,<2.13.0"
pyflakes = ">=3.2.0,<3.3.0"
[[package]]
name = "flask"
version = "3.1.0"
description = "A simple framework for building complex web applications."
optional = false
python-versions = ">=3.9"
files = [
{file = "flask-3.1.0-py3-none-any.whl", hash = "sha256:d667207822eb83f1c4b50949b1623c8fc8d51f2341d65f72e1a1815397551136"},
{file = "flask-3.1.0.tar.gz", hash = "sha256:5f873c5184c897c8d9d1b05df1e3d01b14910ce69607a117bd3277098a5836ac"},
]
[package.dependencies]
asgiref = {version = ">=3.2", optional = true, markers = "extra == \"async\""}
blinker = ">=1.9"
click = ">=8.1.3"
itsdangerous = ">=2.2"
Jinja2 = ">=3.1.2"
Werkzeug = ">=3.1"
[package.extras]
async = ["asgiref (>=3.2)"]
dotenv = ["python-dotenv"]
[[package]]
name = "frozenlist"
version = "1.5.0"
@@ -997,6 +1048,17 @@ qtconsole = ["qtconsole"]
test = ["packaging", "pickleshare", "pytest", "pytest-asyncio (<0.22)", "testpath"]
test-extra = ["curio", "ipython[test]", "matplotlib (!=3.2.0)", "nbformat", "numpy (>=1.23)", "pandas", "trio"]
[[package]]
name = "itsdangerous"
version = "2.2.0"
description = "Safely pass data to untrusted environments and back."
optional = false
python-versions = ">=3.8"
files = [
{file = "itsdangerous-2.2.0-py3-none-any.whl", hash = "sha256:c6242fc49e35958c8b15141343aa660db5fc54d4f13a1db01a3f5891b98700ef"},
{file = "itsdangerous-2.2.0.tar.gz", hash = "sha256:e0050c0b7da1eea53ffaf149c0cfbb5c6e2e2b69c4bef22c81fa6eb73e5f6173"},
]
[[package]]
name = "jedi"
version = "0.19.2"
@@ -2950,4 +3012,4 @@ propcache = ">=0.2.0"
[metadata]
lock-version = "2.0"
python-versions = "^3.10"
content-hash = "8074573cbda8b96a0c5e85c5ab04b5f1a62a6e84dffbad5fd7a1c4cdff8a0a82"
content-hash = "b25ba6ce790999bbdbd4e6892dd56c84d359684824b49c4f0dd882e1dcbedc0d"

View File

@@ -70,6 +70,8 @@ networkx = "^2.6.3"
pyvis = "^0.3.2"
reportlab = "^4.2.0"
cloudscraper = "^1.2.71"
flask = {extras = ["async"], version = "^3.1.0"}
asgiref = "^3.8.1"
platformdirs = "^4.3.6"

View File

@@ -42,6 +42,7 @@ DEFAULT_ARGS: Dict[str, Any] = {
'use_disabled_sites': False,
'username': [],
'verbose': False,
'web': None,
'with_domains': False,
'xmind': False,
}
@@ -55,7 +56,8 @@ def test_args_search_mode(argparser):
want_args = dict(DEFAULT_ARGS)
want_args.update({'username': ['username']})
assert args == Namespace(**want_args)
for arg in vars(args):
assert getattr(args, arg) == want_args[arg]
def test_args_search_mode_several_usernames(argparser):
@@ -66,7 +68,8 @@ def test_args_search_mode_several_usernames(argparser):
want_args = dict(DEFAULT_ARGS)
want_args.update({'username': ['username1', 'username2']})
assert args == Namespace(**want_args)
for arg in vars(args):
assert getattr(args, arg) == want_args[arg]
def test_args_self_check_mode(argparser):
@@ -81,7 +84,8 @@ def test_args_self_check_mode(argparser):
}
)
assert args == Namespace(**want_args)
for arg in vars(args):
assert getattr(args, arg) == want_args[arg]
def test_args_multiple_sites(argparser):
@@ -97,4 +101,5 @@ def test_args_multiple_sites(argparser):
}
)
assert args == Namespace(**want_args)
for arg in vars(args):
assert getattr(args, arg) == want_args[arg]