"""Users router.""" from datetime import datetime from typing import List from fastapi import APIRouter, Depends, HTTPException, status from pydantic import BaseModel from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import Session from app.api.deps import get_current_user, get_password_hash from app.core.config import get_db from app.models import models from app.models.agent import Agent from app.models.role_permission import Permission, Role, RolePermission from app.models.worklog import WorkLog from app.schemas import schemas router = APIRouter(prefix="/users", tags=["Users"]) def _user_response(user: models.User) -> dict: """Build a UserResponse-compatible dict that includes the agent_id when present.""" data = { "id": user.id, "username": user.username, "email": user.email, "full_name": user.full_name, "is_active": user.is_active, "is_admin": user.is_admin, "role_id": user.role_id, "role_name": user.role_name, "agent_id": user.agent.agent_id if user.agent else None, "created_at": user.created_at, } return data def require_admin(current_user: models.User = Depends(get_current_user)): if not current_user.is_admin: raise HTTPException(status_code=403, detail="Admin required") return current_user def _has_global_permission(db: Session, user: models.User, permission_name: str) -> bool: if user.is_admin: return True if not user.role_id: return False perm = db.query(Permission).filter(Permission.name == permission_name).first() if not perm: return False return db.query(RolePermission).filter( RolePermission.role_id == user.role_id, RolePermission.permission_id == perm.id, ).first() is not None def require_account_creator( db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user), ): if current_user.is_admin or _has_global_permission(db, current_user, "account.create"): return current_user raise HTTPException(status_code=403, detail="Account creation permission required") def _resolve_user_role(db: Session, role_id: int | None) -> Role: if role_id is None: role = db.query(Role).filter(Role.name == "guest").first() if not role: raise HTTPException(status_code=500, detail="Default guest role is missing") return role role = db.query(Role).filter(Role.id == role_id).first() if not role: raise HTTPException(status_code=400, detail="Role not found") if role.name == "admin": raise HTTPException(status_code=400, detail="Admin role cannot be assigned via user management") if role.is_global is False: raise HTTPException(status_code=400, detail="Only global roles can be assigned to accounts") return role @router.post("", response_model=schemas.UserResponse, status_code=status.HTTP_201_CREATED) def create_user( user: schemas.UserCreate, db: Session = Depends(get_db), _: models.User = Depends(require_account_creator), ): # Validate agent_id / claw_identifier: both or neither has_agent_id = bool(user.agent_id) has_claw = bool(user.claw_identifier) if has_agent_id != has_claw: raise HTTPException( status_code=400, detail="agent_id and claw_identifier must both be provided or both omitted", ) existing = db.query(models.User).filter( (models.User.username == user.username) | (models.User.email == user.email) ).first() if existing: raise HTTPException(status_code=400, detail="Username or email already exists") # Check agent_id uniqueness if has_agent_id: existing_agent = db.query(Agent).filter(Agent.agent_id == user.agent_id).first() if existing_agent: raise HTTPException(status_code=400, detail="agent_id already in use") assigned_role = _resolve_user_role(db, user.role_id) hashed_password = get_password_hash(user.password) if user.password else None db_user = models.User( username=user.username, email=user.email, full_name=user.full_name, hashed_password=hashed_password, is_admin=False, is_active=True, role_id=assigned_role.id, ) db.add(db_user) db.flush() # get db_user.id # Create Agent record if agent binding is requested (BE-CAL-003) if has_agent_id: db_agent = Agent( user_id=db_user.id, agent_id=user.agent_id, claw_identifier=user.claw_identifier, ) db.add(db_agent) db.commit() db.refresh(db_user) return _user_response(db_user) @router.get("", response_model=List[schemas.UserResponse]) def list_users( skip: int = 0, limit: int = 100, db: Session = Depends(get_db), _: models.User = Depends(require_admin), ): users = db.query(models.User).order_by(models.User.created_at.desc()).offset(skip).limit(limit).all() return [_user_response(u) for u in users] def _find_user_by_id_or_username(db: Session, identifier: str) -> models.User | None: """Resolve a user by numeric id or username string.""" try: uid = int(identifier) return db.query(models.User).filter(models.User.id == uid).first() except ValueError: return db.query(models.User).filter(models.User.username == identifier).first() @router.get("/{identifier}", response_model=schemas.UserResponse) def get_user( identifier: str, db: Session = Depends(get_db), _: models.User = Depends(require_admin), ): user = _find_user_by_id_or_username(db, identifier) if not user: raise HTTPException(status_code=404, detail="User not found") return _user_response(user) @router.patch("/{identifier}", response_model=schemas.UserResponse) def update_user( identifier: str, payload: schemas.UserUpdate, db: Session = Depends(get_db), current_user: models.User = Depends(require_admin), ): user = _find_user_by_id_or_username(db, identifier) if not user: raise HTTPException(status_code=404, detail="User not found") if payload.email is not None and payload.email != user.email: existing = db.query(models.User).filter(models.User.email == payload.email, models.User.id != user.id).first() if existing: raise HTTPException(status_code=400, detail="Email already exists") user.email = payload.email if payload.full_name is not None: user.full_name = payload.full_name if payload.password is not None and payload.password.strip(): user.hashed_password = get_password_hash(payload.password) if payload.role_id is not None: if user.is_admin: raise HTTPException(status_code=400, detail="Admin accounts cannot be reassigned via user management") assigned_role = _resolve_user_role(db, payload.role_id) user.role_id = assigned_role.id if payload.is_active is not None: if current_user.id == user.id and payload.is_active is False: raise HTTPException(status_code=400, detail="You cannot deactivate your own account") user.is_active = payload.is_active db.commit() db.refresh(user) return _user_response(user) @router.delete("/{identifier}", status_code=status.HTTP_204_NO_CONTENT) def delete_user( identifier: str, db: Session = Depends(get_db), current_user: models.User = Depends(require_admin), ): user = _find_user_by_id_or_username(db, identifier) if not user: raise HTTPException(status_code=404, detail="User not found") if current_user.id == user.id: raise HTTPException(status_code=400, detail="You cannot delete your own account") # Protect built-in accounts from deletion if user.is_admin: raise HTTPException(status_code=400, detail="Admin accounts cannot be deleted") if user.username == "acc-mgr": raise HTTPException(status_code=400, detail="The acc-mgr account is a built-in account and cannot be deleted") try: db.delete(user) db.commit() except IntegrityError: db.rollback() raise HTTPException(status_code=400, detail="User has related records. Deactivate the account instead.") return None @router.post("/{identifier}/reset-apikey") def reset_user_apikey( identifier: str, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user), ): """Reset (regenerate) a user's API key. Permission rules: - user.reset-apikey: can reset any user's API key - user.reset-self-apikey: can reset only own API key - admin: can reset any user's API key """ import secrets from app.models.apikey import APIKey target_user = _find_user_by_id_or_username(db, identifier) if not target_user: raise HTTPException(status_code=404, detail="User not found") is_self = current_user.id == target_user.id can_reset_any = _has_global_permission(db, current_user, "user.reset-apikey") can_reset_self = _has_global_permission(db, current_user, "user.reset-self-apikey") if not (can_reset_any or (is_self and can_reset_self)): raise HTTPException(status_code=403, detail="API key reset permission required") # Find existing active API key for target user, or create one existing_key = db.query(APIKey).filter( APIKey.user_id == target_user.id, APIKey.is_active == True, ).first() new_key_value = secrets.token_hex(32) if existing_key: # Deactivate old key existing_key.is_active = False db.flush() # Create new key new_key = APIKey( key=new_key_value, name=f"{target_user.username}-key", user_id=target_user.id, is_active=True, ) db.add(new_key) db.commit() db.refresh(new_key) return { "user_id": target_user.id, "username": target_user.username, "api_key": new_key_value, "message": "API key has been reset. Please save this key — it will not be shown again.", } class WorkLogResponse(BaseModel): id: int task_id: int user_id: int hours: float description: str | None = None logged_date: datetime created_at: datetime class Config: from_attributes = True @router.get("/{identifier}/worklogs", response_model=List[WorkLogResponse]) def list_user_worklogs( identifier: str, limit: int = 50, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user), ): user = _find_user_by_id_or_username(db, identifier) if not user: raise HTTPException(status_code=404, detail="User not found") if current_user.id != user.id and not current_user.is_admin: raise HTTPException(status_code=403, detail="Forbidden") return db.query(WorkLog).filter(WorkLog.user_id == user.id).order_by(WorkLog.logged_date.desc()).limit(limit).all()