client side hashing, shared via JSON, password reset working

This commit is contained in:
Lucas Mathews
2024-05-29 19:06:02 +02:00
parent b70ba6ae2e
commit 6c7883657d
11 changed files with 290 additions and 107 deletions

View File

@@ -6,16 +6,18 @@ from class_account import Account
from class_transaction import Transaction
from emailer import EmailSendingError # Import the EmailSendingError class to handle email sending errors
from flask import jsonify, session as flask_session # Imports the Flask modules
from functools import wraps # For decorators / user login
from database import * # Importing the database connection
from emailer import send_email # Importing the emailer function
from flask import session as flask_session
from flask import request
from database import session
import hashlib # For password hashing
import datetime # For timestamps
import uuid # For unique identifiers
import random # For OTP generation
import time # For OTP generation
from functools import wraps # For decorators / user login
from database import * # Importing the database connection
from emailer import send_email # Importing the emailer function
from flask import session as flask_session
from database import session
import re # For password hash validation
otps = {} # Temporary dictionary to store OTPs and their creation time
@@ -41,10 +43,8 @@ def generate_uuid_short():
def get_email(client_id:str):
"""Returns the email of a client given their client_id. If the client is not found, returns None."""
for client in session.query(Client).all():
if client.client_id == client_id:
return client.email
return None
client = session.query(Client).filter_by(client_id=client_id).one_or_none()
return client.email if client else None
def format_response(success: bool, message: str = '', data: dict = None):
"""Formats the response for the API so that it is standardised across all functions."""
@@ -68,10 +68,12 @@ def get_current_client():
def verify_otp(client_id:str, otp:int):
"""Verifies a one time password for a client. Returns True if the OTP is correct and False otherwise."""
if client_id in otps and otps[client_id][0] == otp:
return True
if client_id in otps:
stored_otp, creation_time = otps[client_id]
if stored_otp == otp and time.time() - creation_time <= 300: # Check if OTP is within 5 minutes
return True
return False
def delete_otp(client_id:str):
"""Deletes the OTP for a client."""
if client_id in otps:
@@ -88,14 +90,18 @@ def check_expired_otps():
### Authentication ###
######################
def login(client_id:str, password:str):
"""Logs in a client using their client_id and password. Returns a success message if the login is successful and an error message otherwise."""
password_hash = hash_password(password)
for client in session.query(Client).all():
if client.client_id == client_id and client.hash == password_hash:
flask_session['client_id'] = client_id
return format_response(True, f"{flask_session['client_id']} logged in succsessfully."), 200
return format_response(False, "Invalid client_id or password."), 400
def login():
"""Logs in a client using their client_id and password hash. Returns a success message if the login is successful and an error message otherwise."""
data = request.get_json()
client_id = data.get('client_id')
client_hash = data.get('client_hash')
client = session.query(Client).filter_by(client_id=client_id).first()
if client and client.hash == client_hash:
flask_session['client_id'] = client_id
return format_response(True, f"{flask_session['client_id']} logged in successfully."), 200
return format_response(False, "Invalid client_id or password."), 401
def logout():
"""Logs out a client. Returns a success message if the logout is successful and an error message otherwise."""
@@ -162,14 +168,16 @@ def generate_otp(client_id: str):
##############
@login_required
def get_client(client_id:str):
def get_client(client_id: str):
"""Returns a specific client in the database. If the client is not found, returns an error message."""
current_client_id, is_admin = get_current_client()
if not is_admin and client_id != current_client_id:
return format_response(False, "You can only view your own client information."), 403
for client in session.query(Client).all():
if client.client_id == client_id:
return format_response(True, "", client.to_dict()), 200
client = session.query(Client).filter_by(client_id=client_id).one_or_none()
if client:
return format_response(True, "", client.to_dict()), 200
return format_response(False, "Client not found."), 404
@login_required
@@ -192,25 +200,44 @@ def update_client(client_id:str, otp_code:int, **kwargs):
return format_response(False, "Client not found."), 404
@login_required
def change_password(client_id:str, password:str, new_password:str, otp:int):
def change_password():
"""Changes the password for a client in the database. If the client is not found, returns an error message."""
data = request.get_json()
client_id = data.get('client_id')
hash_old_password = data.get('hash_old_password')
hash_new_password = data.get('hash_new_password')
otp_code = data.get('otp_code')
current_client_id, is_admin = get_current_client()
# Verify if the OTP is correct
otp_verified = verify_otp(client_id, otp_code)
# Check if the client is authorized to change the password
if not is_admin and client_id != current_client_id:
return format_response(False, "You can only update your own client information."), 403
if not verify_otp(client_id, otp):
return format_response(False, "Invalid OTP."), 400
old_hash = hash_password(password)
new_hash = hash_password(new_password)
for client in session.query(Client).all():
if client.client_id == client_id:
if client.hash == old_hash:
client.hash = new_hash
session.commit()
delete_otp(client_id)
return format_response(True, f"Password for client_id: {client_id} has been updated."), 200
return format_response(False, "Invalid password."), 400
return format_response(False, "Client not found."), 404
return format_response(False, "You can only update your own client information."), 401
# Recheck OTP verification after authorisation check
if not otp_verified:
return format_response(False, "Invalid OTP."), 400
# Validate new password format
hash_format = r'^[0-9a-f]{128}$'
if not re.match(hash_format, hash_new_password):
return format_response(False, "Invalid new password format (must be provided as a hash)."), 400
# Check if the old password hash matches and update to the new password hash
client = session.query(Client).filter_by(client_id=client_id).first()
if client:
if client.hash == hash_old_password:
client.hash = hash_new_password
session.commit()
delete_otp(client_id)
return format_response(True, f"Password for client_id: {client_id} has been updated."), 200
else:
return format_response(False, "Invalid old password."), 400
else:
return format_response(False, "Client not found."), 404
@login_required
def get_accounts(client_id: str):