formatted responses, add OTP

This commit is contained in:
Lucas Mathews
2024-05-25 18:56:33 +02:00
parent 9072103f75
commit fc1922f9c4
6 changed files with 185 additions and 154 deletions

View File

@@ -1,5 +1,5 @@
[database] [database]
name=bank.db name=test_database.db
[api_file] [api_file]
name=api.yml name=api.yml

View File

@@ -30,4 +30,17 @@ class Account(Base):
self.transactions = transactions if transactions is not None else [] self.transactions = transactions if transactions is not None else []
def __repr__(self): def __repr__(self):
return f"Account ID: {self.account_id}, Description: {self.description}, Open Timestamp: {self.open_timestamp}, Account Type: {self.account_type}, Balance: {self.balance}, Enabled: {self.enabled}, Notes: {self.notes}, Transactions: {self.transactions}" return f"Account: {self.account_id}, {self.client_id}, {self.description}, {self.open_timestamp}, {self.account_type}, {self.balance}, {self.enabled}, {self.notes}, {self.transactions}"
def to_dict(self):
return {
"account_id": self.account_id,
"client_id": self.client_id,
"description": self.description,
"open_timestamp": self.open_timestamp,
"account_type": self.account_type,
"balance": self.balance,
"enabled": self.enabled,
"notes": self.notes,
"transactions": [transaction.to_dict() for transaction in self.transactions]
}

View File

@@ -36,4 +36,21 @@ class Client(Base):
self.accounts = accounts if accounts is not None else [] self.accounts = accounts if accounts is not None else []
def __repr__(self): def __repr__(self):
return f"Client ID: {self.client_id}, Name: {self.name}, Birthdate: {self.birthdate}, Address: {self.address}, Phone Number: {self.phone_number}, Email: {self.email}, Enabled: {self.enabled}, Accounts: {self.accounts}" return f"Client: {self.client_id}, {self.name}, {self.birthdate}, {self.opening_timestamp}, {self.address}, {self.phone_number}, {self.email}, {self.hash}, {self.notes}, {self.enabled}, {self.administrator}, {self.accounts}"
def to_dict(self):
return {
"client_id": self.client_id,
"name": self.name,
"birthdate": self.birthdate,
"opening_timestamp": self.opening_timestamp,
"address": self.address,
"phone_number": self.phone_number,
"email": self.email,
"hash": self.hash,
"notes": self.notes,
"enabled": self.enabled,
"administrator": self.administrator,
"accounts": [account.to_dict() for account in self.accounts]
}

View File

@@ -25,4 +25,15 @@ class Transaction(Base):
self.recipient_account_id = recipient_account_id self.recipient_account_id = recipient_account_id
def __repr__(self): def __repr__(self):
return f"Transaction ID: {self.transaction_id}, Transaction Type: {self.transaction_type}, Amount: {self.amount}, Timestamp: {self.timestamp}, Description: {self.description} From Account Number: {self.account_id}, Recipient Account Number: {self.recipient_account_id}" return f"Transaction: {self.transaction_id}, {self.transaction_type}, {self.amount}, {self.timestamp}, {self.description}, {self.account_id}, {self.recipient_account_id}"
def to_dict(self):
return {
"transaction_id": self.transaction_id,
"transaction_type": self.transaction_type,
"amount": self.amount,
"timestamp": self.timestamp,
"description": self.description,
"account_id": self.account_id,
"recipient_account_id": self.recipient_account_id
}

View File

@@ -14,6 +14,8 @@ from functools import wraps # For decorators / user login
from database import * # Importing the database connection from database import * # Importing the database connection
from emailer import send_email # Importing the emailer function from emailer import send_email # Importing the emailer function
otps = {} # Temporary dictionary to store OTPs and their creation time
############## ##############
### System ### ### System ###
############## ##############
@@ -30,74 +32,25 @@ def generate_uuid(): # Generates a unique identifier for transactions
def generate_uuid_short(): # Generates a short uuid def generate_uuid_short(): # Generates a short uuid
return str(uuid.uuid4())[:8] return str(uuid.uuid4())[:8]
def get_email(client_id:str): def get_email(client_id:str): # Returns the email of a client
for client in session.query(Client).all(): for client in session.query(Client).all():
if client.client_id == client_id: if client.client_id == client_id:
return client.email return client.email
return None return None
###################### def format_response(success: bool, message: str = '', data: dict = None): # Formats the response for the API so that it is standardised across all functions
### Authentication ### response = {
###################### 'success': success,
'message': message,
}
if data is not None:
response['data'] = data
return jsonify(response)
def login(client_id:str, password:str): # Logs in a user def get_current_client(): # Returns the current client and if they are an administrator
password_hash = hash_password(password)
for client in session.query(Client).all():
if client.client_id == client_id and client.hash == password_hash:
flask_session['client_id'] = client_id
return jsonify({"message": f"{flask_session['client_id']} logged in succsessfully."}), 200
return "Invalid client_id or password.", 401
def logout():
if 'client_id' in flask_session:
flask_session.pop('client_id', None)
return jsonify({"message": "Logged out"}), 200
return jsonify({"message": "Not logged in"}), 404
def status():
if 'client_id' in flask_session:
return jsonify({"message": f"Logged in as {flask_session['client_id']}"}), 200
else:
return jsonify({"message": "Not logged in"}), 400
def login_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if 'client_id' not in flask_session:
return jsonify({"error": "Not logged in"}), 401
return f(*args, **kwargs)
return decorated_function
def admin_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if 'client_id' not in flask_session:
return jsonify({"error": "Not logged in"}), 401
for client in session.query(Client).all():
if client.client_id == flask_session['client_id']:
if client.administrator == 1:
return f(*args, **kwargs)
return jsonify({"error": "Not authorised"}), 403
return decorated_function
def get_current_client():
client = flask_session['client_id'] client = flask_session['client_id']
is_admin = session.query(Client).filter_by(client_id=client).one_or_none().administrator is_admin = session.query(Client).filter_by(client_id=client).one_or_none().administrator
return client, is_admin return client, is_admin
otps = {} # Dictionary to store OTPs and their creation time
@login_required
def generate_otp(client_id:str): # Generates a one time password for a client
current_client_id, is_admin = get_current_client()
if not is_admin and client_id != current_client_id:
return jsonify({"error": "You can only view your own client information."}), 403
email = get_email(client_id)
if email:
password = int(random.randint(100000, 999999)) # Generate a 6-digit OTP
send_email(email, "Luxbank One Time Password", f"Your one time password is: {password}"), 200
otps[client_id] = (password, time.time()) # Store the OTP and the current time
return jsonify({"error": "Client not found"}), 404
def verify_otp(client_id:str, otp:int): # Verifies a one time password for a client def verify_otp(client_id:str, otp:int): # Verifies a one time password for a client
if client_id in otps and otps[client_id][0] == otp: if client_id in otps and otps[client_id][0] == otp:
@@ -114,6 +67,62 @@ def check_expired_otps(): # Checks and deletes expired OTPs
for client_id in expired_otps: for client_id in expired_otps:
delete_otp(client_id) delete_otp(client_id)
######################
### Authentication ###
######################
def login(client_id:str, password:str): # Logs in a user
password_hash = hash_password(password)
for client in session.query(Client).all():
if client.client_id == client_id and client.hash == password_hash:
flask_session['client_id'] = client_id
return format_response(True, f"{flask_session['client_id']} logged in succsessfully."), 200
return format_response(False, "Invalid client_id or password."), 400
def logout():
if 'client_id' in flask_session:
flask_session.pop('client_id', None)
return format_response(True, "Logged out successfully."), 200
return format_response(False, "Not logged in."), 400
def status():
if 'client_id' in flask_session:
return format_response(True, f"Logged in as {flask_session['client_id']}"), 200
else:
return format_response(False, "Not logged in."), 400
def login_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if 'client_id' not in flask_session:
return format_response(False, "Not logged in."), 401
return f(*args, **kwargs)
return decorated_function
def admin_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if 'client_id' not in flask_session:
return format_response(False, "Not logged in."), 401
for client in session.query(Client).all():
if client.client_id == flask_session['client_id']:
if client.administrator == 1:
return f(*args, **kwargs)
return format_response(False, "Not authorised."), 403
return decorated_function
@login_required
def generate_otp(client_id:str): # Generates a one time password for a client
current_client_id, is_admin = get_current_client()
if not is_admin and client_id != current_client_id:
return format_response(False, "You can only generate OTPs for your own client account."), 403
email = get_email(client_id)
if email:
password = int(random.randint(100000, 999999)) # Generate a 6-digit OTP
send_email(email, "Luxbank One Time Password", f"Your one time password is: {password}"), 200
otps[client_id] = (password, time.time()) # Store the OTP and the current time
return format_response(True, "Client not found."), 404
############## ##############
### Client ### ### Client ###
@@ -123,17 +132,17 @@ def check_expired_otps(): # Checks and deletes expired OTPs
def get_client(client_id:str): # Returns a specific client in the database def get_client(client_id:str): # Returns a specific client in the database
current_client_id, is_admin = get_current_client() current_client_id, is_admin = get_current_client()
if not is_admin and client_id != current_client_id: if not is_admin and client_id != current_client_id:
return jsonify({"error": "You can only view your own client information."}), 403 return format_response(False, "You can only view your own client information."), 403
for client in session.query(Client).all(): for client in session.query(Client).all():
if client.client_id == client_id: if client.client_id == client_id:
return jsonify({"name": client.name, "birthdate": client.birthdate, "opening_timestamp": client.opening_timestamp, "address": client.address, "phone_number": client.phone_number, "email": client.email}), 200 return format_response(True, "", client.to_dict()), 200
return jsonify({"error": "Client not found"}), 404 return format_response(False, "Client not found."), 404
@login_required @login_required
def update_client(client_id:str, **kwargs): # Updates a client in the database def update_client(client_id:str, **kwargs): # Updates a client in the database
current_client_id, is_admin = get_current_client() current_client_id, is_admin = get_current_client()
if not is_admin and client_id != current_client_id: if not is_admin and client_id != current_client_id:
return jsonify({"error": "You can only update your own client information."}), 403 return format_response(False, "You can only view your own client information."), 403
for client in session.query(Client).all(): for client in session.query(Client).all():
if client.client_id == client_id: if client.client_id == client_id:
name = kwargs.get("name", None) name = kwargs.get("name", None)
@@ -155,16 +164,16 @@ def update_client(client_id:str, **kwargs): # Updates a client in the database
if notes: if notes:
client.notes = notes client.notes = notes
session.commit() session.commit()
return f"client_id: {client_id} has been updated.", 299 return format_response(True, f"client_id: {client_id} has been updated."), 200
return f"Client ID: {client_id} is not found." , 400 return format_response(False, "Client not found."), 404
@login_required @login_required
def change_password(client_id:str, password:str, new_password:str, otp:int): # Changes the password of a client def change_password(client_id:str, password:str, new_password:str, otp:int): # Changes the password of a client
current_client_id, is_admin = get_current_client() current_client_id, is_admin = get_current_client()
if not is_admin and client_id != current_client_id: if not is_admin and client_id != current_client_id:
return jsonify({"error": "You can only update your own password."}), 403 return format_response(False, "You can only update your own client information."), 403
if not verify_otp(client_id, otp): if not verify_otp(client_id, otp):
return jsonify({"error": "Invalid OTP"}), 400 return format_response(False, "Invalid OTP."), 400
old_hash = hash_password(password) old_hash = hash_password(password)
new_hash = hash_password(new_password) new_hash = hash_password(new_password)
for client in session.query(Client).all(): for client in session.query(Client).all():
@@ -173,9 +182,9 @@ def change_password(client_id:str, password:str, new_password:str, otp:int): # C
client.hash = new_hash client.hash = new_hash
session.commit() session.commit()
delete_otp(client_id) delete_otp(client_id)
return "Password changed successfully.", 200 return format_response(True, f"Password for client_id: {client_id} has been updated."), 200
return "Incorrect old password.", 400 return format_response(False, "Invalid password."), 400
return f"client_id: {client_id} is not found.", 404 return format_response(False, "Client not found."), 404
############### ###############
### Account ### ### Account ###
@@ -186,41 +195,38 @@ def get_account(account_id:str): # Returns a specific account in the database
current_client_id, is_admin = get_current_client() current_client_id, is_admin = get_current_client()
account_owner = session.query(Account).filter_by(account_id=account_id).one_or_none().client_id account_owner = session.query(Account).filter_by(account_id=account_id).one_or_none().client_id
if not is_admin and account_owner != current_client_id: if not is_admin and account_owner != current_client_id:
return jsonify({"error": "You can only view your own account information."}), 403 return format_response(False, "You can only view your own client information."), 403
account = session.query(Account).filter_by(account_id=account_id).one_or_none() account = session.query(Account).filter_by(account_id=account_id).one_or_none()
for account in session.query(Account).all(): for account in session.query(Account).all():
if account.account_id == account_id: if account.account_id == account_id:
return jsonify({"client_id": account.client_id, "description": account.description, "account_type": account.account_type, "balance": account.balance, "enabled": account.enabled, "notes": account.notes}), 200 return format_response(True, "", account.to_dict()), 200
return jsonify({"error": "Account not found"}), 404 return format_response(False, "Account not found."), 404
@login_required @login_required
def add_account(client_id:str, description:str, account_type:str, **kwargs): # Adds a new account to the database def add_account(client_id:str, description:str, account_type:str, **kwargs): # Adds a new account to the database
current_client_id, is_admin = get_current_client() current_client_id, is_admin = get_current_client()
if not is_admin and client_id != current_client_id: if not is_admin and client_id != current_client_id:
return jsonify({"error": "You can only add accounts your own client account."}), 403 return format_response(False, "You can only view your own client information."), 403
account_id = generate_uuid_short() account_id = generate_uuid_short()
notes = kwargs.get("notes", None) notes = kwargs.get("notes", None)
client_found = None client_found = None
# Find the client for client in session.query(Client).all(): # Find the client
for client in session.query(Client).all():
if client.client_id == client_id: if client.client_id == client_id:
client_found = client client_found = client
break break
# Check if client was found if client_found is None: # If the client is not found, return an error
if client_found is None: return format_response(False, "Client not found."), 404
return f"client_id: {client_id} is not found.", 422 new_account = Account(account_id, client_id, description, timestamp(), account_type, 0, 1, notes, None) # Create the new account
# Add the new account
new_account = Account(account_id, client_id, description, timestamp(), account_type, 0, 1, notes, None)
session.add(new_account) session.add(new_account)
session.commit() session.commit()
return f"New account has been added: description: {description}, uuid: {account_id} ", 200 return format_response(True, f"New account has been added: account_id: {account_id}"), 200
@login_required @login_required
def update_account(account_id:str, **kwargs): # Updates an account in the database def update_account(account_id:str, **kwargs): # Updates an account in the database
current_client_id, is_admin = get_current_client() current_client_id, is_admin = get_current_client()
account_owner = session.query(Account).filter_by(account_id=account_id).one_or_none().client_id account_owner = session.query(Account).filter_by(account_id=account_id).one_or_none().client_id
if not is_admin and account_owner != current_client_id: if not is_admin and account_owner != current_client_id:
return jsonify({"error": "You can only view your own account information."}), 403 return format_response(False, "You can only view your own client information."), 403
for account in session.query(Account).all(): for account in session.query(Account).all():
if account.account_id == account_id: if account.account_id == account_id:
description = kwargs.get("description", None) description = kwargs.get("description", None)
@@ -239,8 +245,8 @@ def update_account(account_id:str, **kwargs): # Updates an account in the databa
if notes: if notes:
account.notes = notes account.notes = notes
session.commit() session.commit()
return f"account_id: {account_id} has been updated.", 200 return format_response(True, f"account_id: {account_id} has been updated."), 200
return f"account_id: {account_id} is not found.", 400 return format_response(False, "Account not found."), 404
################### ###################
### Transaction ### ### Transaction ###
@@ -251,18 +257,18 @@ def get_transaction(transaction_id:int): # Returns a specific transaction in the
current_client_id, is_admin = get_current_client() current_client_id, is_admin = get_current_client()
transaction = session.query(Transaction).filter_by(transaction_id=transaction_id).one_or_none() transaction = session.query(Transaction).filter_by(transaction_id=transaction_id).one_or_none()
if not transaction: if not transaction:
return jsonify({"error": "Transaction not found"}), 404 return format_response(False, "Transaction not found."), 404
account = session.query(Account).filter_by(account_id=transaction.account_id).one_or_none() account = session.query(Account).filter_by(account_id=transaction.account_id).one_or_none()
recipient_account = session.query(Account).filter_by(account_id=transaction.recipient_account_id).one_or_none() recipient_account = session.query(Account).filter_by(account_id=transaction.recipient_account_id).one_or_none()
if not is_admin and (account.client_id != current_client_id and recipient_account.client_id != current_client_id): if not is_admin and (account.client_id != current_client_id and recipient_account.client_id != current_client_id):
return jsonify({"error": "You can only view your own transaction information."}), 403 return format_response(False, "You can only view your own client information."), 403
return jsonify({"transaction_type": transaction.transaction_type, "amount": transaction.amount, "timestamp": transaction.timestamp, "description": transaction.description, "account_id": transaction.account_id, "recipient_account_id": transaction.recipient_account_id}), 200 return format_response(True, "", transaction.to_dict()), 200
@login_required @login_required
def add_transaction(amount:int, account_id, recipient_account_id, **kwargs): # Adds a new transaction to the database def add_transaction(amount:int, account_id, recipient_account_id, **kwargs): # Adds a new transaction to the database
current_client_id, is_admin = get_current_client() current_client_id, is_admin = get_current_client()
if not is_admin and account_id != current_client_id: if not is_admin and account_id != current_client_id:
return jsonify({"error": "You can only add transactions to your own account."}), 403 return format_response(False, "You can only view your own client information."), 403
transaction_id = generate_uuid() transaction_id = generate_uuid()
for account in session.query(Account).all(): for account in session.query(Account).all():
if account.account_id == account_id: if account.account_id == account_id:
@@ -271,7 +277,7 @@ def add_transaction(amount:int, account_id, recipient_account_id, **kwargs): # A
account_dest = account account_dest = account
# Check if account has enough funds # Check if account has enough funds
if account_from.balance < amount: if account_from.balance < amount:
return f"Account ID: {account_id} does not have enough funds to transfer {amount}.", 401 return format_response(False, "Insufficient funds."), 400
# Perform the transaction # Perform the transaction
account_from.balance -= amount account_from.balance -= amount
account_dest.balance += amount account_dest.balance += amount
@@ -282,18 +288,18 @@ def add_transaction(amount:int, account_id, recipient_account_id, **kwargs): # A
new_transaction = Transaction(transaction_id, transaction_type, amount, timestamp(), description, account_id, recipient_account_id) new_transaction = Transaction(transaction_id, transaction_type, amount, timestamp(), description, account_id, recipient_account_id)
session.add(new_transaction) session.add(new_transaction)
session.commit() session.commit()
return f"New transaction has been added: description: {description}, uuid: {transaction_id} ", 200 return format_response(True, f"New transaction has been added: transaction_id: {transaction_id}"), 200
@login_required @login_required
def transaction_history(account_id:int): # Returns all transactions for a specific account def transaction_history(account_id:int): # Returns all transactions for a specific account
current_client_id, is_admin = get_current_client() current_client_id, is_admin = get_current_client()
account = session.query(Account).filter_by(account_id=account_id).one_or_none() account = session.query(Account).filter_by(account_id=account_id).one_or_none()
if not account: if not account:
return jsonify({"error": "Account not found."}), 404 return format_response(False, "Account not found."), 404
if not is_admin and account.client_id != current_client_id: if not is_admin and account.client_id != current_client_id:
return jsonify({"error": "You can only view your own transaction history."}), 403 return format_response(False, "You can only view your own client information."), 403
result = session.query(Transaction).filter(Transaction.account_id == account_id) result = session.query(Transaction).filter(Transaction.account_id == account_id)
return jsonify([{"transaction_id": transaction.transaction_id, "transaction_type": transaction.transaction_type, "amount": transaction.amount, "timestamp": transaction.timestamp, "description": transaction.description, "account_number": transaction.account_id, "recipient_account_number": transaction.recipient_account_id} for transaction in result]), 200 return format_response(True, "", [transaction.to_dict() for transaction in result]), 200
##################### #####################
### Administrator ### ### Administrator ###
@@ -302,17 +308,17 @@ def transaction_history(account_id:int): # Returns all transactions for a specif
@admin_required @admin_required
def delete_client(client_id:str): # Deletes a client from the database def delete_client(client_id:str): # Deletes a client from the database
if client_id == flask_session['client_id']: if client_id == flask_session['client_id']:
return "You can not delete yourself.", 400 return format_response(False, "You cannot delete your own account."), 400
for client in session.query(Client).all(): for client in session.query(Client).all():
if client.client_id == client_id: if client.client_id == client_id:
if client.accounts == None: if client.accounts == None:
session.delete(client) session.delete(client)
session.commit() session.commit()
return f"client_id: {client_id} has been removed.", 200 return format_response(True, f"client_id: {client_id} has been removed."), 200
else: else:
return f"client_id: {client_id} has active accounts and can not be removed.", 400 return format_response(False, "Client has accounts and can not be removed."), 400
return f"client_id: {client_id} is not found.", 404 return format_response(False, "Client not found."), 404
@admin_required @admin_required
def delete_account(account_id:str): # Deletes an account from the database def delete_account(account_id:str): # Deletes an account from the database
@@ -321,35 +327,35 @@ def delete_account(account_id:str): # Deletes an account from the database
if account.balance == 0: if account.balance == 0:
session.delete(account) session.delete(account)
session.commit() session.commit()
return f"account_id: {account_id} has been removed.", 200 return format_response(True, f"account_id: {account_id} has been removed."), 200
else: else:
return f"account_id: {account_id} has a balance and can not be removed.", 400 return format_response(False, "Account has a balance and can not be removed."), 400
return f"account_id: {account_id} is not found.", 404 return format_response(False, "Account not found."), 404
@admin_required @admin_required
def get_all_clients(): # Returns all clients in the database def get_all_clients(): # Returns all clients in the database
clients = session.query(Client).all() clients = session.query(Client).all()
return jsonify([{"client_id": client.client_id, "name": client.name, "birthdate": client.birthdate, "opening_timestamp": client.opening_timestamp, "address": client.address, "phone_number": client.phone_number, "email": client.email} for client in clients]) return format_response(True, "", [client.to_dict() for client in clients]), 200
@admin_required @admin_required
def get_all_accounts(): # Returns all accounts in the database def get_all_accounts(): # Returns all accounts in the database
accounts = session.query(Account).all() accounts = session.query(Account).all()
return jsonify([{"account_id": account.account_id, "client_id": account.client_id, "description": account.description, "open_timestamp": account.open_timestamp, "account_type": account.account_type, "balance": account.balance, "enabled": account.enabled, "notes": account.notes} for account in accounts]) return format_response(True, "", [account.to_dict() for account in accounts]), 200
@admin_required @admin_required
def get_all_transactions(): # Returns all transactions in the database def get_all_transactions(): # Returns all transactions in the database
transactions = session.query(Transaction).all() transactions = session.query(Transaction).all()
return jsonify([{"transaction_id": transaction.transaction_id, "transaction_type": transaction.transaction_type, "amount": transaction.amount, "timestamp": transaction.timestamp, "description": transaction.description, "account_id": transaction.account_id, "recipient_account_id": transaction.recipient_account_id} for transaction in transactions]) return format_response(True, "", [transaction.to_dict() for transaction in transactions]), 200
@admin_required @admin_required
def apply_interest(account_id:int, interest_rate:float): def apply_interest(account_id:int, interest_rate:float):
for account in session.query(Account).filter(Account.account_id == account_id): for account in session.query(Account).filter(Account.account_id == account_id):
if account.account_id == account_id: if account.account_id == account_id:
account.balance += account.balance * interest_rate interest = account.balance * interest_rate
account.balance += interest
session.commit() session.commit()
return f"Interest has been applied to Account ID: {account_id}." return format_response(True, f"{interest} in interest has been applied to Account ID: {account_id}."), 200
return f"Account ID: {account_id} is not found." return format_response(False, "Account not found."), 404
@admin_required @admin_required
def apply_fee(account_id:int, fee:float): def apply_fee(account_id:int, fee:float):
@@ -357,8 +363,8 @@ def apply_fee(account_id:int, fee:float):
if account.account_id == account_id: if account.account_id == account_id:
account.balance -= fee account.balance -= fee
session.commit() session.commit()
return f"Fee has been applied to Account ID: {account_id}." return format_response(True, f"{fee} in fees has been applied to Account ID: {account_id}."), 200
return f"Account ID: {account_id} is not found." return format_response(False, "Account not found."), 404
@admin_required @admin_required
def delete_transaction(transaction_id:int): def delete_transaction(transaction_id:int):
@@ -366,8 +372,8 @@ def delete_transaction(transaction_id:int):
if transaction.transaction_id == transaction_id: if transaction.transaction_id == transaction_id:
session.delete(transaction) session.delete(transaction)
session.commit() session.commit()
return f"Transaction ID: {transaction_id} has been removed.", 400 return format_response(True, f"Transaction ID: {transaction_id} has been removed."), 200
return f"Transaction ID: {transaction_id} is not found.", 404 return format_response(False, "Transaction not found."), 404
@admin_required @admin_required
def modify_balance(transaction_id:int, amount:int): def modify_balance(transaction_id:int, amount:int):
@@ -375,44 +381,26 @@ def modify_balance(transaction_id:int, amount:int):
if transaction.transaction_id == transaction_id: if transaction.transaction_id == transaction_id:
transaction.amount = amount transaction.amount = amount
session.commit() session.commit()
return f"Transaction ID: {transaction_id} has been modified.", 200 return format_response(True, f"Transaction ID: {transaction_id} has been modified."), 200
return f"Transaction ID: {transaction_id} is not found.", 404 return format_response(False, "Transaction not found."), 404
@admin_required @admin_required
def test_account_balances(): def test_account_balances():
# Get all transactions from the database all_transactions = session.query(Transaction).all()# Get all transactions from the database
all_transactions = session.query(Transaction).all() calculated_balances = {} # Initialize a dictionary to store the calculated balance for each account
for transaction in all_transactions: # Go through each transaction
# Initialize a dictionary to store the calculated balance for each account if transaction.account_id not in calculated_balances: # If the account ID of the transaction is not in the dictionary, add it with a balance of 0
calculated_balances = {}
# Go through each transaction
for transaction in all_transactions:
# If the account ID of the transaction is not in the dictionary, add it with a balance of 0
if transaction.account_id not in calculated_balances:
calculated_balances[transaction.account_id] = 0 calculated_balances[transaction.account_id] = 0
if transaction.transaction_type == 'Deposit': # Update the calculated balance for the account
# Update the calculated balance for the account
if transaction.transaction_type == 'Deposit':
calculated_balances[transaction.account_id] += transaction.amount calculated_balances[transaction.account_id] += transaction.amount
elif transaction.transaction_type == 'Withdrawal': elif transaction.transaction_type == 'Withdrawal':
calculated_balances[transaction.account_id] -= transaction.amount calculated_balances[transaction.account_id] -= transaction.amount
all_accounts = session.query(Account).all() # Get all accounts from the database
# Get all accounts from the database discrepancies = [] # Initialize a list to store the discrepancies
all_accounts = session.query(Account).all() for account in all_accounts: # Go through each account
if calculated_balances.get(account.account_id, 0) != account.balance: # If the calculated balance doesn't match the stored balance, add the discrepancy to the list
# Initialize a list to store the discrepancies
discrepancies = []
# Go through each account
for account in all_accounts:
# If the calculated balance doesn't match the stored balance, add the discrepancy to the list
if calculated_balances.get(account.account_id, 0) != account.balance:
discrepancies.append({"error": f"Alert: Account {account.account_id} has a balance discrepancy. Stored balance is {account.balance}, but calculated balance is {calculated_balances.get(account.account_id, 0)}."}) discrepancies.append({"error": f"Alert: Account {account.account_id} has a balance discrepancy. Stored balance is {account.balance}, but calculated balance is {calculated_balances.get(account.account_id, 0)}."})
return format_response(True, "", discrepancies), 200 # Return the list of discrepancies
# Return the list of discrepancies
return jsonify(discrepancies), 200
@admin_required @admin_required
def add_client(name:str, birthdate:str, address:str, phone_number:str, email:str, password:str, **kwargs): # Adds a new client to the database def add_client(name:str, birthdate:str, address:str, phone_number:str, email:str, password:str, **kwargs): # Adds a new client to the database
@@ -421,7 +409,7 @@ def add_client(name:str, birthdate:str, address:str, phone_number:str, email:str
new_client = Client(client_id, name, birthdate, timestamp(), address, phone_number, email, hash_password(password), notes, 1, 0, None) new_client = Client(client_id, name, birthdate, timestamp(), address, phone_number, email, hash_password(password), notes, 1, 0, None)
session.add(new_client) session.add(new_client)
session.commit() session.commit()
return client_id, 200 return format_response(True, f"New client has been added: client_id: {client_id}"), 200
def initialise_database(password:str, email:str): def initialise_database(password:str, email:str):
existing_clients = session.query(Client).all() # Check if any clients exist in the database existing_clients = session.query(Client).all() # Check if any clients exist in the database
@@ -431,8 +419,8 @@ def initialise_database(password:str, email:str):
admin_client = session.query(Client).filter_by(name='ADMINISTRATOR').one() # Retrieve the administrator client admin_client = session.query(Client).filter_by(name='ADMINISTRATOR').one() # Retrieve the administrator client
admin_client.administrator = 1 # Set the new client as an administrator admin_client.administrator = 1 # Set the new client as an administrator
session.commit() session.commit()
return jsonify(f"Database initialised with administrator account with client_id {admin_client.client_id}"), 200 return format_response(True, f"Database initialised with administrator account with client_id {admin_client.client_id}"), 200
return jsonify("Database not empty, this function cannot be used."), 400 return format_response(False, "Database not empty."), 400
@admin_required @admin_required
def promote_to_admin(client_id:str): def promote_to_admin(client_id:str):
@@ -440,8 +428,8 @@ def promote_to_admin(client_id:str):
if client.client_id == client_id: if client.client_id == client_id:
client.administrator = 1 client.administrator = 1
session.commit() session.commit()
return f"client_id: {client_id} has been promoted to administrator.", 200 return format_response(True, f"client_id: {client_id} has been promoted to administrator."), 200
return f"client_id: {client_id} is not found.", 404 return format_response(False, f"client_id: {client_id} is not found."), 404
@admin_required @admin_required
def demote_from_admin(client_id:str): def demote_from_admin(client_id:str):
@@ -449,5 +437,6 @@ def demote_from_admin(client_id:str):
if client.client_id == client_id: if client.client_id == client_id:
client.administrator = 0 client.administrator = 0
session.commit() session.commit()
return f"client_id: {client_id} has been demoted from administrator.", 200 return format_response(True, f"client_id: {client_id} has been demoted from administrator."), 200
return f"client_id: {client_id} is not found.", 404 return format_response(False, f"client_id: {client_id} is not found."), 404

View File

@@ -3,4 +3,5 @@ connexion[swagger-ui]==2.14.2
requests requests
sqlalchemy sqlalchemy
flask-session flask-session
faker faker
customtkinter