auth
The login and registrations system are planned to change. This current solution was used because the goal was just to make it work. Now, we improve.
NOTE: We plan to migrate to Super Tokens authentication solution.
auth.py
The auth.py is used for login and registration, scoreboard fetch, fetch aura points
# auth.py — Flask blueprint for user registration, login, flag submission, and progress
from flask import Blueprint, request, jsonify
import sqlite3
import bcrypt
import jwt
import datetime
# Flask blueprint — handles all /api/auth routes
auth_bp = Blueprint('auth', __name__, url_prefix='/api/auth')
# --- Configuration ---
DB_PATH = "<DB_PATH>" # Path to SQLite database
JWT_SECRET = "<JWT_SECRET>" # Replace with a secure secret
JWT_ALGORITHM = "HS256"
JWT_EXP_DELTA_SECONDS = 3600 * 24 # Token valid for 1 day
# --- Helper function ---
def get_db():
"""Returns a SQLite connection with row access by column name."""
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
# --- Route: User Registration ---
@auth_bp.route("/register", methods=["POST"])
def register():
"""
Registers a new user with username, email, and password.
Passwords are hashed using bcrypt.
"""
data = request.json
username = data.get("username")
email = data.get("email")
password = data.get("password")
if not username or not email or not password:
return jsonify({"success": False, "message": "Username, email and password required"}), 400
hashed_pw = bcrypt.hashpw(password.encode(), bcrypt.gensalt())
conn = get_db()
cur = conn.cursor()
try:
# Prevent duplicate usernames or emails
cur.execute("SELECT 1 FROM users WHERE username = ? OR email = ?", (username, email))
if cur.fetchone():
return jsonify({"success": False, "message": "Username or email already exists"}), 409
# Insert new user
cur.execute("INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)",
(username, email, hashed_pw))
conn.commit()
except sqlite3.IntegrityError:
return jsonify({"success": False, "message": "Username or email already exists"}), 409
finally:
conn.close()
return jsonify({"success": True, "message": "User registered successfully"}), 201
# --- Route: User Login ---
@auth_bp.route("/login", methods=["POST"])
def login():
"""
Authenticates a user and returns a JWT token for subsequent requests.
"""
data = request.json
username = data.get("username")
password = data.get("password")
if not username or not password:
return jsonify({"success": False, "message": "Username and password required"}), 400
conn = get_db()
cur = conn.cursor()
cur.execute("SELECT password_hash FROM users WHERE username = ?", (username,))
row = cur.fetchone()
conn.close()
if not row:
return jsonify({"success": False, "message": "Invalid username or password"}), 401
password_hash = row["password_hash"]
if not bcrypt.checkpw(password.encode(), password_hash):
return jsonify({"success": False, "message": "Invalid username or password"}), 401
# Generate JWT token
payload = {
"username": username,
"exp": datetime.datetime.utcnow() + datetime.timedelta(seconds=JWT_EXP_DELTA_SECONDS)
}
token = jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
return jsonify({"success": True, "token": token}), 200
# --- Route: Submit Flag ---
@auth_bp.route("/submit", methods=["POST"])
def submit_flag():
"""
Authenticated route for submitting flags.
Updates user progress and awards aura points.
"""
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Bearer "):
return jsonify({"success": False, "message": "Missing or invalid token"}), 401
try:
token = auth_header.split(" ")[1]
payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
username = payload["username"]
except jwt.ExpiredSignatureError:
return jsonify({"success": False, "message": "Token expired"}), 401
except jwt.InvalidTokenError:
return jsonify({"success": False, "message": "Invalid token"}), 401
# Validate input
data = request.json
game = data.get("game")
level = data.get("level")
submitted_flag = data.get("flag")
if not game or not level or not submitted_flag:
return jsonify({"success": False, "message": "Missing game, level or flag"}), 400
conn = get_db()
cur = conn.cursor()
try:
# Fetch level info
cur.execute("SELECT flag, aura_points FROM levels WHERE game = ? AND level = ?", (game, level))
level_data = cur.fetchone()
if not level_data:
return jsonify({"success": False, "message": "Level not found"}), 404
correct_flag = level_data["flag"]
if submitted_flag.strip() != correct_flag.strip():
return jsonify({"success": False, "message": "Incorrect flag"}), 403
# Check if already completed
cur.execute("SELECT 1 FROM progress WHERE username = ? AND game = ? AND level = ?",
(username, game, level))
if cur.fetchone():
return jsonify({
"success": True,
"message": "Flag already submitted",
"aura_points": 0,
"game": game,
"level": level
})
# Insert new completion
aura_points = level_data["aura_points"] or 10
cur.execute("INSERT INTO progress (username, game, level, aura_points) VALUES (?, ?, ?, ?)",
(username, game, level, aura_points))
conn.commit()
return jsonify({
"success": True,
"message": "Correct flag!",
"aura_points": aura_points,
"game": game,
"level": level
})
except Exception as e:
conn.rollback()
return jsonify({"success": False, "message": str(e)}), 500
finally:
conn.close()
# --- Route: Get User Progress ---
@auth_bp.route("/myprogress", methods=["GET"])
def my_progress():
"""
Returns all progress for the authenticated user.
"""
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Bearer "):
return jsonify({"success": False, "message": "Missing or invalid token"}), 401
try:
token = auth_header.split(" ")[1]
payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
username = payload["username"]
conn = get_db()
cur = conn.cursor()
cur.execute("SELECT game, level FROM progress WHERE username = ? ORDER BY game, level", (username,))
progress = [{"series": row["game"], "level": row["level"].replace("level", "")} for row in cur.fetchall()]
return jsonify({"success": True, "progress": progress}), 200
except jwt.ExpiredSignatureError:
return jsonify({"success": False, "message": "Token expired"}), 401
except jwt.InvalidTokenError:
return jsonify({"success": False, "message": "Invalid token"}), 401
except Exception as e:
return jsonify({"success": False, "message": str(e)}), 500
finally:
conn.close()
# --- Route: Get Total Aura Points ---
@auth_bp.route("/aura_points", methods=["GET"])
def get_aura_points():
"""
Returns total aura points from challenges and academy tasks.
"""
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Bearer "):
return jsonify({"success": False, "message": "Missing or invalid token"}), 401
try:
token = auth_header.split(" ")[1]
payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
username = payload["username"]
conn = get_db()
cur = conn.cursor()
# Aura from challenges
cur.execute("SELECT COALESCE(SUM(aura_points), 0) as challenge_aura FROM progress WHERE username = ?", (username,))
challenge_aura = cur.fetchone()["challenge_aura"]
# Aura from academy tasks
cur.execute("""
SELECT COALESCE(SUM(at.aura_points), 0) as academy_aura
FROM academy_user_progress ap
JOIN users u ON ap.user_id = u.id
JOIN academy_tasks at ON ap.task_id = at.id
WHERE u.username = ? AND ap.completed = 1
""", (username,))
academy_aura = cur.fetchone()["academy_aura"]
total = challenge_aura + academy_aura
return jsonify({"success": True, "total_aura": total}), 200
except Exception as e:
return jsonify({"success": False, "message": str(e)}), 500
finally:
conn.close()
# --- Route: Scoreboard ---
@auth_bp.route("/scoreboard", methods=["GET"])
def get_scoreboard():
"""
Returns a leaderboard of users sorted by total aura points.
"""
try:
conn = get_db()
cur = conn.cursor()
cur.execute("SELECT username, COALESCE(SUM(aura_points),0) AS aura_points FROM progress GROUP BY username ORDER BY aura_points DESC")
rows = cur.fetchall()
scoreboard = [{"username": row["username"], "aura_points": row["aura_points"] or 0} for row in rows]
return jsonify(scoreboard), 200
except Exception as e:
return jsonify({"success": False, "message": str(e)}), 500
finally:
conn.close()