Files
s2p_banking_system/server/manager.py
2024-06-21 00:36:51 +02:00

558 lines
28 KiB
Python

# Lucas Mathews - Fontys Student ID: 5023572
# Banking System Manager File
from class_client import Client
from class_account import Account
from class_transaction import Transaction
from emailer import EmailSendingError # Import the EmailSendingError class to handle email sending errors
from functools import wraps # For decorators / user login
from database import * # Importing the database connection
from emailer import send_email # Importing the emailer function
from logger import event_logger # Importing the event_logger function
from flask import session as flask_session
from flask import request
from flask import jsonify # Imports the Flask modules
from database import session
import hashlib # For password hashing
import datetime # For timestamps
import uuid # For unique identifiers
import random # For OTP generation
import time # For OTP generation
import re # For password hash validation
otps = {} # Temporary dictionary to store OTPs and their creation time
##############
### System ###
##############
def timestamp():
"""Returns the current timestamp in the format 'YYYY-MM-DD HH:MM:SS'."""
return (datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
def hash_password(password:str):
"""Hashes a password using the SHA-512 algorithm and returns the hexadecimal representation of the hash."""
return hashlib.sha512(password.encode()).hexdigest()
def generate_uuid():
"""Generates a unique identifier using the UUID4 algorithm and returns it as a string."""
return str(uuid.uuid4())
def generate_uuid_short():
"""Generates a short unique identifier using the UUID4 algorithm and returns the first 8 characters as a string."""
return str(uuid.uuid4())[:8]
def get_email(client_id:str):
"""Returns the email of a client given their client_id. If the client is not found, returns None."""
client = session.query(Client).filter_by(client_id=client_id).one_or_none()
return client.email if client else None
def format_response(success: bool, message: str = '', data: dict = None):
"""Formats the response for the API so that it is standardised across all functions."""
response = {
'success': success,
'message': message,
}
if data is not None:
response['data'] = data
return jsonify(response)
def get_current_client():
"""Returns the client_id and administrator status of the currently logged in client. If no client is logged in, returns None, None."""
if 'client_id' not in flask_session:
return None, None
client = flask_session['client_id']
client_obj = session.query(Client).filter_by(client_id=client).one_or_none()
if client_obj is None:
return None, None
return client_obj.client_id, client_obj.administrator
def verify_otp(client_id:str, otp:int):
"""Verifies a one time password for a client. Returns True if the OTP is correct and False otherwise."""
if CONFIG["smtp"]["enabled"] == "False":
return True
if client_id in otps:
stored_otp, creation_time = otps[client_id]
if stored_otp == otp and time.time() - creation_time <= 300: # Check if OTP is within 5 minutes
return True
return False
def delete_otp(client_id:str):
"""Deletes the OTP for a client."""
if client_id in otps:
del otps[client_id]
def clean_expired_otps():
"""Checks for expired OTPs and deletes them. An OTP is considered expired if it is older than 5 minutes."""
current_time = time.time()
expired_otps = [client_id for client_id, (otp, creation_time) in otps.items() if current_time - creation_time > 300] # Find OTPs older than 5 minutes
otps_removed = 0
for client_id in expired_otps:
delete_otp(client_id)
otps_removed += 1
event_logger(f"Cleaned {otps_removed} expired OTPs.")
######################
### Authentication ###
######################
def login():
"""Logs in a client using their client_id and password hash. Returns a success message if the login is successful and an error message otherwise."""
data = request.get_json()
client_id = data.get('client_id')
client_hash = data.get('client_hash')
client = session.query(Client).filter_by(client_id=client_id).first()
if client and client.hash == client_hash:
flask_session['client_id'] = client_id
event_logger(f"{client_id} logged in successfully.")
return format_response(True, f"{flask_session['client_id']} logged in successfully."), 200
return format_response(False, "Invalid client_id or password."), 401
def logout():
"""Logs out a client. Returns a success message if the logout is successful and an error message otherwise."""
if 'client_id' in flask_session:
flask_session.pop('client_id', None)
return format_response(True, "Logged out successfully."), 200
return format_response(False, "Not logged in."), 400
def status():
"""Returns the current status of the client."""
if 'client_id' in flask_session:
return format_response(True, f"Logged in as {flask_session['client_id']}"), 200
else:
return format_response(False, "Not logged in."), 400
def login_required(f):
"""Decorator function to check if a client is logged in before accessing a route."""
@wraps(f)
def decorated_function(*args, **kwargs):
if 'client_id' not in flask_session:
return format_response(False, "Not logged in."), 401
return f(*args, **kwargs)
return decorated_function
def admin_required(f):
"""Decorator function to check if a client is an administrator before accessing a route."""
@wraps(f)
def decorated_function(*args, **kwargs):
if 'client_id' not in flask_session:
return format_response(False, "Not logged in."), 401
for client in session.query(Client).all():
if client.client_id == flask_session['client_id']:
if client.administrator == 1:
return f(*args, **kwargs)
return format_response(False, "Not authorised."), 403
return decorated_function
@login_required
def generate_otp(client_id: str):
"""Generates a one-time password for a client and sends it to their email address. Returns a success message if the OTP is generated and an error message otherwise."""
if CONFIG["smtp"]["enabled"] == "False":
return format_response(True, "OTP generation disabled as SMTP is not enabled."), 200
current_client_id, is_admin = get_current_client()
if not is_admin and client_id != current_client_id:
return format_response(False, "You can only generate OTPs for your own client account."), 403
email = get_email(client_id)
if email:
password = int(random.randint(100000, 999999)) # Generate a 6-digit OTP
try:
send_email(email, "Luxbank One Time Password", f"Your one-time password is: {password}")
otps[client_id] = (password, time.time()) # Store the OTP and the current time
event_logger(f"OTP Code {password} emailed to {email}")
print(f"OTP Code {password} emailed to {email}")
return format_response(True, "OTP generated and sent successfully."), 200
except EmailSendingError as e:
event_logger(f"Error sending email: {str(e)}")
error_message = "Error sending email. Please try again later."
if e.original_error:
error_message += f" Original error: {str(e.original_error)}"
return format_response(False, error_message), 500
else:
return format_response(False, "Email address not found for the client."), 404
##############
### Client ###
##############
@login_required
def get_client(client_id: str):
"""Returns a specific client in the database. If the client is not found, returns an error message."""
current_client_id, is_admin = get_current_client()
if not is_admin and client_id != current_client_id:
return format_response(False, "You can only view your own client information."), 403
client = session.query(Client).filter_by(client_id=client_id).one_or_none()
if client:
return format_response(True, "", client.to_dict()), 200
return format_response(False, "Client not found."), 404
@login_required
def update_client(client_id:str, otp_code:int, **kwargs):
"""Updates a client in the database. If the client is not found, returns an error message."""
current_client_id, is_admin = get_current_client()
if not verify_otp(current_client_id, otp_code):
return format_response(False, "Invalid OTP."), 400 # Changed to 400 Bad Request
if not is_admin and client_id != current_client_id:
return format_response(False, "You can only view your own client information."), 403
client = session.query(Client).filter_by(client_id=client_id).first()
if client:
for field in ['name', 'birthdate', 'address', 'phone_number', 'email', 'notes']:
if field in kwargs and kwargs[field] is not None:
setattr(client, field, kwargs[field])
session.commit()
return format_response(True, f"Client ID: {client_id} has been updated."), 200
return format_response(False, "Client not found."), 404
def change_password():
"""Changes the password for a client in the database. If the client is not found, returns an error message."""
data = request.get_json()
client_id = data.get('client_id')
hash_old_password = data.get('hash_old_password')
hash_new_password = data.get('hash_new_password')
otp_code = data.get('otp_code')
current_client_id, is_admin = get_current_client()
otp_verified = verify_otp(client_id, otp_code) # Verify if the OTP is correct
if not is_admin and client_id != current_client_id: # Check if the client is authorized to change the password
return format_response(False, "You can only update your own client information."), 401
if not otp_verified: # Recheck OTP verification after authorisation check
return format_response(False, "Invalid OTP."), 400
hash_format = r'^[0-9a-f]{128}$' # Validate new password format
if not re.match(hash_format, hash_new_password):
return format_response(False, "Invalid new password format (must be provided as a hash)."), 400
client = session.query(Client).filter_by(client_id=client_id).first() # Check if the old password hash matches and update to the new password hash
if client:
if client.hash == hash_old_password:
client.hash = hash_new_password
session.commit()
delete_otp(current_client_id)
event_logger(f"Password for client_id {client_id} has been updated by {current_client_id}.")
return format_response(True, f"Password for client_id {client_id} has been updated."), 200
else:
return format_response(False, "Invalid old password."), 400
else:
return format_response(False, "Client not found."), 404
@login_required
def get_accounts(client_id: str):
"""Returns all accounts for a specific client in the database. If the client is not found, returns an error message."""
current_client_id, is_admin = get_current_client()
if current_client_id is None:
raise Exception("No current client found")
if not is_admin and client_id != current_client_id:
return format_response(False, "You can only view your own client information."), 403
accounts = session.query(Account).filter(Account.client_id == client_id)
return format_response(True, "", [account.to_dict() for account in accounts]), 200
###############
### Account ###
###############
@login_required
def get_account(account_id: str):
"""Returns a specific account in the database. If the account is not found, returns an error message."""
current_client_id, is_admin = get_current_client()
account = session.query(Account).filter_by(account_id=account_id).one_or_none()
if account is None:
return format_response(False, "Account not found."), 404
account_owner = account.client_id
if not is_admin and account_owner != current_client_id:
return format_response(False, "You can only view your own client information."), 403
return format_response(True, "", account.to_dict()), 200
@login_required
def add_account(client_id:str, description:str, account_type:str, **kwargs):
"""Adds a new account to the database. If the client is not found, returns an error message."""
current_client_id, is_admin = get_current_client()
if not is_admin and client_id != current_client_id:
return format_response(False, "You can only view your own client information."), 403
account_id = generate_uuid_short()
notes = kwargs.get("notes", None)
client_found = None
for client in session.query(Client).all(): # Find the client
if client.client_id == client_id:
client_found = client
break
if client_found is None: # If the client is not found, return an error
return format_response(False, "Client not found."), 404
new_account = Account(account_id, client_id, description, timestamp(), account_type, 0, 1, notes, None) # Create the new account
session.add(new_account)
session.commit()
return format_response(True, f"New account has been added: account_id: {account_id}"), 200
@login_required
def update_account(account_id: str, otp_code: str, **kwargs):
"""Updates an account in the database. If the account is not found, returns an error message."""
current_client_id, is_admin = get_current_client()
# Verify OTP
if not verify_otp(current_client_id, otp_code):
return format_response(False, "Invalid OTP."), 400
# Query the account once
account = session.query(Account).filter_by(account_id=account_id).one_or_none()
if account is None:
return format_response(False, "Account not found."), 404
account_owner = account.client_id
# Check permissions
if not is_admin and account_owner != current_client_id:
return format_response(False, "You can only update your own account information."), 403
# Update the account with provided kwargs
description = kwargs.get("description")
account_type = kwargs.get("account_type")
balance = kwargs.get("balance")
enabled = kwargs.get("enabled")
notes = kwargs.get("notes")
if description is not None:
account.description = description
if account_type is not None:
account.account_type = account_type
if balance is not None:
account.balance = balance
if enabled is not None:
account.enabled = enabled
if notes is not None:
account.notes = notes
# Commit the changes
session.commit()
return format_response(True, f"account_id: {account_id} has been updated."), 200
###################
### Transaction ###
###################
@login_required
def get_transaction(transaction_id:int):
"""Returns a specific transaction in the database. If the transaction is not found, returns an error message."""
current_client_id, is_admin = get_current_client()
transaction = session.query(Transaction).filter_by(transaction_id=transaction_id).one_or_none()
if not transaction:
return format_response(False, "Transaction not found."), 404
account = session.query(Account).filter_by(account_id=transaction.account_id).one_or_none()
recipient_account = session.query(Account).filter_by(account_id=transaction.recipient_account_id).one_or_none()
if not is_admin and (account.client_id != current_client_id and recipient_account.client_id != current_client_id):
return format_response(False, "You can only view your own client information."), 403
return format_response(True, "", transaction.to_dict()), 200
@login_required
def add_transaction(amount: float, account_id: str, recipient_account_id: str, otp_code: int, description: str):
"""Adds a new transaction to the database. If the account is not found, returns an error message."""
print(f"Adding transaction: amount: {amount}, account_id: {account_id}, recipient_account_id: {recipient_account_id}, otp_code: {otp_code}, description: {description}")
current_client_id, is_admin = get_current_client()
if not is_admin and account_id != current_client_id:
return format_response(False, "You can only view your own client information."), 403
if not is_admin:
otp_verified = verify_otp(current_client_id, otp_code)
if not otp_verified:
return format_response(False, "Invalid OTP."), 400
transaction_id = generate_uuid()
account_from = session.query(Account).filter_by(account_id=account_id).one_or_none()
account_dest = session.query(Account).filter_by(account_id=recipient_account_id).one_or_none()
if account_from is None or account_dest is None:
return format_response(False, "Account not found."), 404
if account_from.balance < amount:
return format_response(False, "Insufficient funds."), 400
delete_otp(current_client_id)
# Perform the transaction
account_from.balance -= amount
account_dest.balance += amount
transaction_type = "transfer"
session.commit()
# Create the transaction record
new_transaction = Transaction(transaction_id, transaction_type, amount, timestamp(), description, account_id, recipient_account_id)
session.add(new_transaction)
session.commit()
return format_response(True, f"New transaction has been added: transaction_id: {transaction_id}"), 200
@login_required
def transaction_history(account_id:int):
"""Returns all transactions for a specific account in the database. If the account is not found, returns an error message."""
current_client_id, is_admin = get_current_client()
account = session.query(Account).filter_by(account_id=account_id).one_or_none()
if not account:
return format_response(False, "Account not found."), 404
if not is_admin and account.client_id != current_client_id:
return format_response(False, "You can only view your own client information."), 403
result = session.query(Transaction).filter(Transaction.account_id == account_id)
return format_response(True, "", [transaction.to_dict() for transaction in result]), 200
#####################
### Administrator ###
#####################
@admin_required
def delete_client(client_id:str):
"""Deletes a client from the database. If the client is not found, returns an error message."""
if client_id == flask_session['client_id']:
return format_response(False, "You cannot delete your own account."), 400
for client in session.query(Client).all():
if client.client_id == client_id:
if client.accounts == None:
session.delete(client)
session.commit()
event_logger(f"Client ID: {client_id} has been removed by {flask_session['client_id']}.")
return format_response(True, f"client_id: {client_id} has been removed."), 200
else:
return format_response(False, "Client has accounts and can not be removed."), 400
return format_response(False, "Client not found."), 404
@admin_required
def delete_account(account_id:str):
"""Deletes an account from the database. If the account is not found, returns an error message."""
for account in session.query(Account).all():
if account.account_id == account_id:
if account.balance == 0:
session.delete(account)
session.commit()
event_logger(f"Account ID: {account_id} has been removed by {flask_session['client_id']}.")
return format_response(True, f"account_id: {account_id} has been removed."), 200
else:
return format_response(False, "Account has a balance and can not be removed."), 400
return format_response(False, "Account not found."), 404
@admin_required
def get_all_clients():
"""Returns all clients in the database."""
clients = session.query(Client).all()
event_logger(f"All clients have been retrieved by {flask_session['client_id']}.")
return format_response(True, "", [client.to_dict() for client in clients]), 200
@admin_required
def get_all_accounts():
"""Returns all accounts in the database."""
accounts = session.query(Account).all()
event_logger(f"All accounts have been retrieved by {flask_session['client_id']}.")
return format_response(True, "", [account.to_dict() for account in accounts]), 200
@admin_required
def get_all_transactions():
"""Returns all transactions in the database."""
transactions = session.query(Transaction).all()
event_logger(f"All transactions have been retrieved by {flask_session['client_id']}.")
return format_response(True, "", [transaction.to_dict() for transaction in transactions]), 200
@admin_required
def apply_interest(account_id:int, interest_rate:float):
"""Applies interest to an account based on the interest rate. If the account is not found, returns an error message."""
for account in session.query(Account).filter(Account.account_id == account_id):
if account.account_id == account_id:
interest = account.balance * interest_rate
account.balance += interest
session.commit()
event_logger(f"Interest of €{interest} has been applied to Account ID: {account_id} by {flask_session['client_id']}.")
return format_response(True, f"{interest} in interest has been applied to Account ID: {account_id}."), 200
return format_response(False, "Account not found."), 404
@admin_required
def apply_fee(account_id:int, fee:float):
"""Applies a fee to an account based on the fee amount. If the account is not found, returns an error message."""
for account in session.query(Account).all():
if account.account_id == account_id:
account.balance -= fee
session.commit()
event_logger(f"Fee of €{fee} has been applied to Account ID: {account_id} by {flask_session['client_id']}.")
return format_response(True, f"{fee} in fees has been applied to Account ID: {account_id}."), 200
return format_response(False, "Account not found."), 404
@admin_required
def delete_transaction(transaction_id:int):
"""Deletes a transaction from the database. If the transaction is not found, returns an error message."""
for transaction in session.query(Transaction).all():
if transaction.transaction_id == transaction_id:
session.delete(transaction)
session.commit()
event_logger(f"Transaction ID: {transaction_id} has been removed by {flask_session['client_id']}.")
return format_response(True, f"Transaction ID: {transaction_id} has been removed."), 200
return format_response(False, "Transaction not found."), 404
@admin_required
def modify_balance(transaction_id:int, amount:int):
"""Modifies the amount of a transaction in the database. If the transaction is not found, returns an error message."""
for transaction in session.query(Transaction).all():
if transaction.transaction_id == transaction_id:
transaction.amount = amount
session.commit()
event_logger(f"Transaction ID: {transaction_id} has been modified by {flask_session['client_id']}.")
return format_response(True, f"Transaction ID: {transaction_id} has been modified."), 200
return format_response(False, "Transaction not found."), 404
@admin_required
def test_account_balances():
"""Checks all account balances in the database and returns a list of discrepancies."""
event_logger(f"Account balances have been checked by {flask_session['client_id']}.")
all_transactions = session.query(Transaction).all()# Get all transactions from the database
calculated_balances = {} # Initialize a dictionary to store the calculated balance for each account
for transaction in all_transactions: # Go through each transaction
if transaction.account_id not in calculated_balances: # If the account ID of the transaction is not in the dictionary, add it with a balance of 0
calculated_balances[transaction.account_id] = 0
if transaction.transaction_type == 'Deposit': # Update the calculated balance for the account
calculated_balances[transaction.account_id] += transaction.amount
elif transaction.transaction_type == 'Withdrawal':
calculated_balances[transaction.account_id] -= transaction.amount
all_accounts = session.query(Account).all() # Get all accounts from the database
discrepancies = [] # Initialize a list to store the discrepancies
for account in all_accounts: # Go through each account
if calculated_balances.get(account.account_id, 0) != account.balance: # If the calculated balance doesn't match the stored balance, add the discrepancy to the list
discrepancies.append({"error": f"Alert: Account {account.account_id} has a balance discrepancy. Stored balance is {account.balance}, but calculated balance is {calculated_balances.get(account.account_id, 0)}."})
return format_response(True, "", discrepancies), 200 # Return the list of discrepancies
@admin_required
def add_client(name:str, birthdate:str, address:str, phone_number:str, email:str, password:str, **kwargs):
"""Adds a new client to the database."""
client_id = generate_uuid_short()
notes = kwargs.get("notes", None)
new_client = Client(client_id, name, birthdate, timestamp(), address, phone_number, email, hash_password(password), notes, 1, 0, None)
session.add(new_client)
session.commit()
event_logger(f"New client has been added: client_id: {client_id} by {flask_session['client_id']}.")
return format_response(True, f"New client has been added: client_id: {client_id}"), 200
def initialise_database(password:str, email:str):
"""Initialises the database with an administrator client if no clients exist."""
existing_clients = session.query(Client).all() # Check if any clients exist in the database
if not existing_clients: # If no clients exist, create an administrator client
new_client = Client(generate_uuid_short(), "ADMIN", "ADMIN", timestamp(), "ADMIN", "ADMIN", email, hash_password(password), None, 1, 0, None)
session.add(new_client)
session.commit()
admin_client = session.query(Client).filter_by(name='ADMIN').one() # Retrieve the administrator client
admin_client.administrator = 1 # Set the new client as an administrator
session.commit()
event_logger(f"Database initialised with administrator account with client_id {admin_client.client_id}.")
return format_response(True, f"Database initialised with administrator account with client_id {admin_client.client_id}"), 200
return format_response(False, "Database not empty."), 400
@admin_required
def promote_to_admin(client_id:str):
"""Promotes a client to administrator status. If the client is not found, returns an error message."""
for client in session.query(Client).all():
if client.client_id == client_id:
client.administrator = 1
session.commit()
event_logger(f"Client ID: {client_id} has been promoted to administrator by {flask_session['client_id']}.")
return format_response(True, f"client_id: {client_id} has been promoted to administrator."), 200
return format_response(False, f"client_id: {client_id} is not found."), 404
@admin_required
def demote_from_admin(client_id:str):
"""Demotes a client from administrator status. If the client is not found, returns an error message."""
for client in session.query(Client).all():
if client.client_id == client_id:
client.administrator = 0
session.commit()
event_logger(f"Client ID: {client_id} has been demoted from administrator by {flask_session['client_id']}.")
return format_response(True, f"client_id: {client_id} has been demoted from administrator."), 200
return format_response(False, f"client_id: {client_id} is not found."), 404