Files
AutumnSpark1226 ef7c2ab41f refactor user_settings.mu
also add current identity with button
2025-08-02 23:22:28 +02:00

427 lines
21 KiB
Plaintext
Executable File

#!/usr/bin/env python3
# nomadForum - a forum on the NomadNetwork
# Copyright (C) 2023-2025 AutumnSpark1226
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import os
import sys
import string
from argon2.exceptions import VerificationError
from argon2 import PasswordHasher
from hmac import compare_digest
import main
import notify
def list_connections():
query_results = main.query_database(f"SELECT remote_id, allow_login, send_notifications, public, conn_id, verified FROM connections WHERE username = '{username}'")
for result in query_results:
print(f">>Identity: {main.decrypt(result[0])}")
if result[5] == 1:
allow_login, send_notifications, public = "", "", ""
if result[1] == 1:
allow_login = "|*"
if result[2] == 1:
send_notifications = "|*"
if result[3] == 1:
public = "|*"
print(f" `B444`<?|connection_allow_login_{result[4]}|yes{allow_login}`>`b Autologin")
print(f" `B444`<?|connection_notifications_{result[4]}|yes{send_notifications}`>`b Notifications")
print(f" `B444`<?|connection_public_{result[4]}|yes{public}`>`b List on profile")
else:
print(" This identity is not verified yet.")
print(f" Verification code: `B444`<8|verification_code`>`b `Ff22`_`[Submit`:{main.page_path}/user_settings.mu`*|action=verify_connection|value={result[4]}|source_link_id={link_id}]`_`f")
print(f" `Ff22`_`[Remove`:{main.page_path}/user_settings.mu`action=remove_connection|value={result[4]}|source_link_id={link_id}]`_`f")
print()
def print_fields():
print(f">Settings Username: {username}")
print()
print(">>Pofile")
profile_data = main.query_database(f"SELECT display_name, about, default_styling, show_online FROM users WHERE username = '{username}'")[0]
display_name = profile_data[0].replace(">", "\\>")
print(f" Display name: `B444`<display_name`{display_name}>`b")
about = profile_data[1].replace("None", "").replace(">", "\\>").replace("\n", "$newline$")
print(f" About: `B444`<about`{about}>`b")
print(" $newline$ creates a new line. You can just use enter and it will be automatically replaced.")
show_online = ""
if profile_data[3] == 1:
show_online = "|*"
print(f" `B444`<?|show_online_status|yes{show_online}`>`b Show online status")
print(f" `Ff22`_`[Visit your profile`:{main.page_path}/profile.mu`username={username}]`_`f")
print()
print(">>Default post styling")
default_styling = profile_data[2].replace("None", "").replace(">", "\\>").replace("\n", "$newline$")
print(f" Default post styling: `B444`<default_styling`{default_styling}>`b")
print(" $newline$ creates a new line. You can just use enter and it will be automatically replaced.")
print()
list_connections()
print(">")
print(f"`Ff22`_`[Save`:{main.page_path}/user_settings.mu`*|action=save|source_link_id={link_id}]`_`f")
print()
print(">>Password")
if compare_digest(main.decrypt(main.query_database(f"SELECT password FROM users WHERE username = '{username}'")[0][0]), "$nopassword$"):
print(" Disabled")
print(" Set new password: `B444`<!|password`>`b")
print(" Confirm password: `B444`<!|password_confirm`>`b")
main.execute_sql(f"DELETE FROM verification_codes WHERE use_type = 'enable_password' AND username = '{username}'")
verification_code = main.get_verification_code()
main.execute_sql(f"INSERT INTO verification_codes (username, code, use_type, use_id, creation_time) VALUES ('{username}', '{main.encrypt(verification_code)}', 'enable_password', '{link_id}', unixepoch())")
print(f" Confirm this action by typing: '{verification_code}'")
print(f" Code: `B444`<8|confirmation_enable_password`>`b `Ff22`_`[Enable password`:{main.page_path}/user_settings.mu`*|action=enable_password|source_link_id={link_id}]`_`f")
else:
print(f" Enabled")
print(" Current password: `B444`<!|password_current`>`b")
print(" Update password: `B444`<!|password`>`b")
print(" Confirm password: `B444`<!|password_confirm`>`b")
print(f" `Ff22`_`[Change password`:{main.page_path}/user_settings.mu`*|action=change_password|source_link_id={link_id}]`_`f")
print(f" `Ff22`_`[Disable password`:{main.page_path}/user_settings.mu`*|action=disable_password|source_link_id={link_id}]`_`f (Your current password is required.)")
print()
print(">>Other actions")
if remote_identity != "":
print(f" `Ff22`_`[Add your current identity`:{main.page_path}/user_settings.mu`action=add_current_id|source_link_id={link_id}]`_`f")
if main.notifications_enabled:
print(f" Add identity: `B444`<33|identity`>`b `Ff22`_`[Submit`:{main.page_path}/user_settings.mu`*|action=add_identity|source_link_id={link_id}]`_`f")
print(" Enter the identity and NOT the LXMF address here.")
print(f" `Ff22`_`[Subscription settings`:{main.page_path}/subscription_settings.mu]`_`f")
print(f" `Ff22`_`[Delete your account`:{main.page_path}/delete_account.mu]`_`f")
def remove_connection():
if not value.isdigit():
print("something went wrong...")
main.close_database(write_changes=False)
sys.exit(0)
query = main.query_database(f"SELECT allow_login FROM connections WHERE conn_id = {value} AND username = '{username}'")
if len(query) != 1:
print("something went wrong...")
print(f"`Ff22`_`[Reload`:{main.page_path}/user_settings.mu]`_`f")
main.close_database()
sys.exit(0)
elif query[0][0] == 1:
print(">This identity is used for logging in. You can't remove it. (Disable autologin first.)")
print()
print_fields()
elif query[0][0] == 0:
main.execute_sql(f"DELETE FROM connections WHERE conn_id = {value} AND allow_login = 0 AND username = '{username}'")
print(">The identity has been removed.")
print()
print_fields()
else:
print("something went wrong...")
main.close_database(write_changes=False)
sys.exit(0)
def disable_password():
query_result = main.query_database(f"SELECT conn_id, remote_id FROM connections WHERE username = '{username}' AND allow_login = 1 AND verified = 1")
remote_id_found = False
for data in query_result:
if main.decrypt(data[1]) == remote_identity:
remote_id_found = True
break
if remote_id_found:
hasher = PasswordHasher()
hashed_password = main.decrypt(main.query_database(f"SELECT password FROM users WHERE username = '{username}'")[0][0])
try:
hasher.verify(hashed_password, password_current)
except VerificationError:
print(">You entered a wrong password.\n")
print_fields()
main.close_database(write_changes=False)
exit(0)
main.execute_sql("UPDATE users SET password = '" + main.encrypt("$nopassword$") + f"' WHERE link_id = '{link_id}' AND username = '{username}'")
print(">Your password has been disabled.")
print()
print_fields()
else:
print(">Disabling your password would lock you out of your account and is not permitted.")
print()
print_fields()
def enable_password():
query_result = main.query_database(f"SELECT code FROM verification_codes WHERE use_type = 'enable_password' AND use_id = '{link_id}' AND username = '{username}'")
if len(query_result) != 1:
print(">Verification error\n")
print_fields()
main.close_database(write_changes=False)
exit(0)
else:
verification_code = main.decrypt(query_result[0][0])
if confirmation_enable_password != verification_code:
print(">Verification error\n")
print_fields()
main.close_database()
exit(0)
main.execute_sql(f"DELETE FROM verification_codes WHERE use_type = 'enable_password' AND use_id = '{link_id}' AND username = '{username}'")
query_result = main.query_database("SELECT username, remote_id FROM connections WHERE allow_login = 1")
remote_id_found = False
for data in query_result:
if main.decrypt(data[1]) == remote_identity and data[0] == username:
remote_id_found = True
break
if remote_id_found:
if len(password) < 8:
print(">Your password must be at least 8 characters long.\n")
print_fields()
main.close_database()
sys.exit(0)
elif len(password) > 512 or len(password_confirm) > 512:
print(">Long passwords are good, but that is just too long.\n")
print_fields()
main.close_database()
sys.exit(0)
hasher = PasswordHasher()
hashed_password = hasher.hash(password)
try:
hasher.verify(hashed_password, password_confirm)
except VerificationError:
print(">The entered passwords do not match.\n")
print_fields()
main.close_database(write_changes=False)
sys.exit(0)
prepared_password = main.encrypt(hashed_password)
main.execute_sql(f"UPDATE users SET password = '{prepared_password}' WHERE link_id = '{link_id}' AND username = '{username}'")
print(">Your password has been enabled.")
print()
print_fields()
else:
print(">Verification failed")
print()
print_fields()
def change_password():
if len(password) < 8:
print(">Your password must be at least 8 characters long.\n")
print_fields()
main.close_database(write_changes=False)
sys.exit(0)
elif len(password) > 512 or len(password_confirm) > 512 or len(password_current) > 512:
print(">Long passwords are good, but that is just too long.\n")
print_fields()
main.close_database()
sys.exit(0)
hasher = PasswordHasher()
hashed_password = main.decrypt(main.query_database(f"SELECT password FROM users WHERE username = '{username}'")[0][0])
try:
hasher.verify(hashed_password, password_current)
except VerificationError:
print(">You entered a wrong password.\n")
print_fields()
main.close_database(write_changes=False)
exit(0)
hasher = PasswordHasher()
hashed_password = hasher.hash(password)
try:
hasher.verify(hashed_password, password_confirm)
except VerificationError:
print(">The entered passwords do not match.\n")
print_fields()
main.close_database(write_changes=False)
sys.exit(0)
prepared_password = main.encrypt(hashed_password)
main.execute_sql(f"UPDATE users SET password = '{prepared_password}' WHERE link_id = '{link_id}' AND username = '{username}'")
print(">Your password has been changed.")
print()
print_fields()
def verify_connection():
if len(verification_code) == 7 and verification_code.replace("-", "").isnumeric() and verification_code[3] == "-" and value.isdigit():
query_result = main.query_database(f"SELECT remote_id FROM connections WHERE conn_id = {value}")
if len(query_result) == 1:
requested_remote_id = main.decrypt(query_result[0][0])
possible_remote_ids = main.query_database(f"SELECT code_id, code, use_id FROM verification_codes WHERE username = '{username}' AND use_type = 'add_lxmf' AND try_counter < 3")
for verified_remote_id in possible_remote_ids:
if main.decrypt(verified_remote_id[2]) == requested_remote_id:
if main.decrypt(verified_remote_id[1]) == verification_code:
main.execute_sql(f"UPDATE connections SET verified = 1 WHERE username = '{username}' AND conn_id = {value}")
main.execute_sql(f"DELETE FROM verification_codes WHERE code_id = '{verified_remote_id[0]}'")
print(">Connection verified\n")
else:
main.execute_sql(f"UPDATE verification_codes SET try_counter = (try_counter + 1) WHERE code_id = '{verified_remote_id[0]}'")
main.purge()
print(">Could not verify connection\n")
break
else:
print(">Could not verify connection\n")
else:
print(">Could not verify connection\n")
print_fields()
def add_connection():
if len(identity) == 32 and set(identity).issubset(set(string.hexdigits)):
query_result = main.query_database("SELECT remote_id FROM connections")
for data in query_result:
if identity == main.decrypt(data[0]):
print(">Could not add identity\n")
print_fields()
main.close_database()
exit(0)
main.generate_verification(username, "add_lxmf", main.encrypt(identity), identity)
main.execute_sql(f"INSERT INTO connections (username, remote_id, verified) VALUES ('{username}', '{main.encrypt(identity)}', 0)")
print(">A verification code has been sent. Please enter enter it to verify this identity.\n")
else:
print(">Could not add identity\n")
print_fields()
def save():
global display_name, about, default_styling
# set new online status
if show_online_status:
main.execute_sql(f"UPDATE users SET show_online = 1 WHERE username = '{username}'")
else:
main.execute_sql(f"UPDATE users SET show_online = 0 WHERE username = '{username}'")
# set new display name
if display_name == "" or len(display_name) > 256:
main.execute_sql(f"UPDATE users SET display_name = username WHERE username = '{username}'")
else:
display_name = main.prepare_display_name(display_name, username)
if display_name != main.query_database(f"SELECT display_name FROM users WHERE username = '{username}'")[0][0]:
if display_name == username or len(main.query_database(f"SELECT user_id FROM users WHERE username = '{main.remove_micron(display_name)}' OR display_name = '{display_name}'")) == 0:
main.execute_sql(f"UPDATE users SET display_name = '{display_name}' WHERE username = '{username}'")
else:
print(">This display name already exists\n")
# set new about page
if about == "" or len(about) >= 4096:
main.execute_sql(f"UPDATE users SET about = 'None' WHERE username = '{username}'")
else:
about = main.prepare_content(about)
main.execute_sql(f"UPDATE users SET about = '{about}' WHERE username = '{username}'")
# set new default styling
if default_styling == "" or len(default_styling) >= 256:
main.execute_sql(f"UPDATE users SET default_styling = 'None' WHERE username = '{username}'")
else:
default_styling = main.prepare_content(default_styling)
main.execute_sql(f"UPDATE users SET default_styling = '{default_styling}' WHERE username = '{username}'")
conn_ids = main.query_database(f"SELECT conn_id, verified FROM connections WHERE username = '{username}'")
for conn_id in conn_ids:
if conn_id[1] == 1:
current_id_allow_login = os.environ.get(f"field_connection_allow_login_{conn_id[0]}", default="no") == "yes"
current_id_send_notifications = os.environ.get(f"field_connection_notifications_{conn_id[0]}", default="no") == "yes"
current_id_public = os.environ.get(f"field_connection_public_{conn_id[0]}", default="no") == "yes"
if current_id_allow_login:
main.execute_sql(f"UPDATE connections SET allow_login = 1 WHERE conn_id = {conn_id[0]} AND allow_login = 0 AND username = '{username}' AND verified = 1")
else:
if compare_digest(main.decrypt(main.query_database(f"SELECT password FROM users WHERE username = '{username}'")[0][0]), "$nopassword$") and main.decrypt(main.query_database(f"SELECT remote_id FROM connections WHERE conn_id = {conn_id[0]}")[0][0]) == remote_identity:
print(">Deactivating autologin would lock you out of your account and is not permitted.")
print()
else:
main.execute_sql(f"UPDATE connections SET allow_login = 0 WHERE conn_id = {conn_id[0]} AND allow_login = 1 AND username = '{username}' AND verified = 1")
if main.notifications_enabled:
if current_id_send_notifications:
if 1 != main.query_database(f"SELECT send_notifications FROM connections WHERE conn_id = {conn_id[0]} AND username = '{username}'")[0][0]:
main.execute_sql(f"UPDATE connections SET send_notifications = 1 WHERE conn_id = {conn_id[0]} AND send_notifications = 0 AND username = '{username}' AND verified = 1")
identity = main.decrypt(main.query_database(f"SELECT remote_id FROM connections WHERE conn_id = {conn_id[0]}")[0][0])
try:
notify.add_notification([identity, "Notifications are now enabled on this address."])
except:
print(">Could not send notification.")
else:
main.execute_sql(f"UPDATE connections SET send_notifications = 0 WHERE conn_id = {conn_id[0]} AND send_notifications = 1 AND username = '{username}' AND verified = 1")
if current_id_public:
main.execute_sql(f"UPDATE connections SET public = 1 WHERE conn_id = {conn_id[0]} AND public = 0 AND username = '{username}' AND verified = 1")
else:
main.execute_sql(f"UPDATE connections SET public = 0 WHERE conn_id = {conn_id[0]} AND username = '{username}' AND verified = 1")
else:
print("something went wrong...")
main.close_database(write_changes=False)
sys.exit(0)
print(">Your settings have been saved.\n")
print_fields()
try:
link_id, remote_identity = main.handle_ids()
main.print_header(link_id, reload=True)
action = ""
value = ""
password = ""
password_confirm = ""
password_current = ""
display_name = ""
about = ""
verification_code = ""
identity = ""
default_styling = ""
confirmation_enable_password = ""
show_online_status = False
for env_variable in os.environ:
if env_variable == "var_action":
action = os.environ[env_variable]
elif env_variable == "var_value":
value = os.environ[env_variable]
elif env_variable == "field_password":
password = os.environ[env_variable]
elif env_variable == "field_password_confirm":
password_confirm = os.environ[env_variable]
elif env_variable == "field_password_current":
password_current = os.environ[env_variable]
elif env_variable == "field_display_name":
display_name = os.environ[env_variable]
elif env_variable == "field_about":
about = os.environ[env_variable]
elif env_variable == "field_verification_code":
verification_code = os.environ[env_variable]
elif env_variable == "field_identity":
identity = os.environ[env_variable]
elif env_variable == "field_default_styling":
default_styling = os.environ[env_variable]
elif env_variable == "field_confirmation_enable_password":
confirmation_enable_password = os.environ[env_variable]
elif env_variable == "field_show_online_status":
show_online_status = os.environ[env_variable] == "yes"
query_result = main.query_database(f"SELECT username FROM users WHERE link_id = '{link_id}'")
if len(query_result) != 1:
print(">You are not logged in.")
else:
if action != "":
main.verify_link_id()
username = query_result[0][0]
if action == "remove_connection":
remove_connection()
elif action == "disable_password":
disable_password()
elif action == "enable_password":
enable_password()
elif action == "change_password":
change_password()
elif action == "verify_connection":
verify_connection()
elif action == "add_identity":
add_connection()
elif action == "save":
save()
elif action == "add_current_id":
main.add_current_remote_id(username, remote_identity)
print_fields()
else:
print_fields()
main.close_database()
except Exception:
print("An error occured")