Compare commits
2 Commits
214a9b109d
...
9e14df921e
| Author | SHA1 | Date | |
|---|---|---|---|
| 9e14df921e | |||
| f16bdd9725 |
118
app/api/rbac.py
118
app/api/rbac.py
@@ -3,7 +3,8 @@ from fastapi import HTTPException, status
|
||||
from sqlalchemy.orm import Session
|
||||
from app.models import models
|
||||
from app.models.role_permission import Role, Permission, RolePermission
|
||||
from app.models import models
|
||||
from app.models.milestone import Milestone
|
||||
from app.models.task import Task
|
||||
|
||||
|
||||
def get_user_role(db: Session, user_id: int, project_id: int) -> Role | None:
|
||||
@@ -12,36 +13,36 @@ def get_user_role(db: Session, user_id: int, project_id: int) -> Role | None:
|
||||
models.ProjectMember.user_id == user_id,
|
||||
models.ProjectMember.project_id == project_id,
|
||||
).first()
|
||||
|
||||
|
||||
if member and member.role_id:
|
||||
return db.query(Role).filter(Role.id == member.role_id).first()
|
||||
|
||||
|
||||
# Check global admin
|
||||
user = db.query(models.User).filter(models.User.id == user_id).first()
|
||||
if user and user.is_admin:
|
||||
# Return global admin role
|
||||
return db.query(Role).filter(Role.is_global == True, Role.name == "superadmin").first()
|
||||
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def has_permission(db: Session, user_id: int, project_id: int, permission: str) -> bool:
|
||||
"""Check if user has a specific permission in a project."""
|
||||
role = get_user_role(db, user_id, project_id)
|
||||
|
||||
|
||||
if not role:
|
||||
return False
|
||||
|
||||
|
||||
# Check if role has the permission
|
||||
perm = db.query(Permission).filter(Permission.name == permission).first()
|
||||
if not perm:
|
||||
return False
|
||||
|
||||
|
||||
role_perm = db.query(RolePermission).filter(
|
||||
RolePermission.role_id == role.id,
|
||||
RolePermission.permission_id == perm.id
|
||||
).first()
|
||||
|
||||
|
||||
return role_perm is not None
|
||||
|
||||
|
||||
@@ -58,41 +59,104 @@ def check_permission(db: Session, user_id: int, project_id: int, permission: str
|
||||
|
||||
def check_project_role(db: Session, user_id: int, project_id: int, min_role: str = "member"):
|
||||
"""Check if user has at least the specified role in a project."""
|
||||
# Check if user is global admin
|
||||
user = db.query(models.User).filter(models.User.id == user_id).first()
|
||||
if user and user.is_admin:
|
||||
return True
|
||||
|
||||
# Get user's role in project
|
||||
|
||||
member = db.query(models.ProjectMember).filter(
|
||||
models.ProjectMember.user_id == user_id,
|
||||
models.ProjectMember.project_id == project_id,
|
||||
).first()
|
||||
|
||||
|
||||
if not member or not member.role_id:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail=f"You are not a member of this project"
|
||||
detail="You are not a member of this project"
|
||||
)
|
||||
|
||||
|
||||
role = db.query(Role).filter(Role.id == member.role_id).first()
|
||||
if not role:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail=f"Role not found"
|
||||
detail="Role not found"
|
||||
)
|
||||
|
||||
# Role hierarchy: admin > member > guest
|
||||
role_hierarchy = {"admin": 3, "member": 2, "guest": 1}
|
||||
user_role_level = role_hierarchy.get(role.name, 0)
|
||||
required_level = role_hierarchy.get(min_role, 0)
|
||||
|
||||
if user_role_level < required_level:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail=f"Role '{min_role}' or higher required. Your role: {role.name}"
|
||||
)
|
||||
|
||||
|
||||
# Legacy compatibility: most current routes use non-hierarchical names like dev/mgr.
|
||||
# For now, any valid membership passes those broad checks; strict edit rules are handled
|
||||
# by the explicit can_edit_* helpers below.
|
||||
if min_role in {"dev", "mgr", "viewer", "member", "guest", "admin"}:
|
||||
return True
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def get_project_role_name(db: Session, user_id: int, project_id: int) -> str | None:
|
||||
if is_global_admin(db, user_id):
|
||||
return "admin"
|
||||
member = db.query(models.ProjectMember).filter(
|
||||
models.ProjectMember.user_id == user_id,
|
||||
models.ProjectMember.project_id == project_id,
|
||||
).first()
|
||||
if not member or not member.role_id:
|
||||
return None
|
||||
role = db.query(Role).filter(Role.id == member.role_id).first()
|
||||
return role.name if role else None
|
||||
|
||||
|
||||
def is_global_admin(db: Session, user_id: int) -> bool:
|
||||
user = db.query(models.User).filter(models.User.id == user_id).first()
|
||||
return bool(user and user.is_admin)
|
||||
|
||||
|
||||
def has_project_admin_role(db: Session, user_id: int, project_id: int) -> bool:
|
||||
return get_project_role_name(db, user_id, project_id) == "admin"
|
||||
|
||||
|
||||
def can_edit_project(db: Session, user_id: int, project: models.Project) -> bool:
|
||||
return (
|
||||
is_global_admin(db, user_id)
|
||||
or project.owner_id == user_id
|
||||
or has_project_admin_role(db, user_id, project.id)
|
||||
)
|
||||
|
||||
|
||||
def ensure_can_edit_project(db: Session, user_id: int, project: models.Project):
|
||||
if not can_edit_project(db, user_id, project):
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Project edit permission denied")
|
||||
|
||||
|
||||
def can_edit_milestone(db: Session, user_id: int, milestone: Milestone) -> bool:
|
||||
project = db.query(models.Project).filter(models.Project.id == milestone.project_id).first()
|
||||
if not project:
|
||||
return False
|
||||
return (
|
||||
is_global_admin(db, user_id)
|
||||
or project.owner_id == user_id
|
||||
or milestone.created_by_id == user_id
|
||||
or has_project_admin_role(db, user_id, milestone.project_id)
|
||||
)
|
||||
|
||||
|
||||
def ensure_can_edit_milestone(db: Session, user_id: int, milestone: Milestone):
|
||||
if not can_edit_milestone(db, user_id, milestone):
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Milestone edit permission denied")
|
||||
|
||||
|
||||
def can_edit_task(db: Session, user_id: int, task: Task) -> bool:
|
||||
project = db.query(models.Project).filter(models.Project.id == task.project_id).first()
|
||||
milestone = db.query(Milestone).filter(Milestone.id == task.milestone_id).first()
|
||||
if not project:
|
||||
return False
|
||||
return (
|
||||
is_global_admin(db, user_id)
|
||||
or project.owner_id == user_id
|
||||
or task.created_by_id == user_id
|
||||
or (milestone is not None and milestone.created_by_id == user_id)
|
||||
or has_project_admin_role(db, user_id, task.project_id)
|
||||
)
|
||||
|
||||
|
||||
def ensure_can_edit_task(db: Session, user_id: int, task: Task):
|
||||
if not can_edit_task(db, user_id, task):
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Task edit permission denied")
|
||||
|
||||
|
||||
@@ -7,7 +7,7 @@ from typing import List
|
||||
|
||||
from app.core.config import get_db
|
||||
from app.api.deps import get_current_user_or_apikey
|
||||
from app.api.rbac import check_project_role
|
||||
from app.api.rbac import check_project_role, ensure_can_edit_milestone
|
||||
from app.models import models
|
||||
from app.models.milestone import Milestone
|
||||
from app.models.task import Task, TaskStatus, TaskPriority
|
||||
@@ -30,6 +30,7 @@ def _serialize_milestone(milestone):
|
||||
"depend_on_milestones": json.loads(milestone.depend_on_milestones) if milestone.depend_on_milestones else [],
|
||||
"depend_on_tasks": json.loads(milestone.depend_on_tasks) if milestone.depend_on_tasks else [],
|
||||
"project_id": milestone.project_id,
|
||||
"created_by_id": milestone.created_by_id,
|
||||
"created_at": milestone.created_at,
|
||||
"updated_at": milestone.updated_at,
|
||||
}
|
||||
@@ -58,7 +59,7 @@ def create_milestone(project_id: int, milestone: schemas.MilestoneCreate, db: Se
|
||||
data["depend_on_milestones"] = json.dumps(data["depend_on_milestones"])
|
||||
if data.get("depend_on_tasks"):
|
||||
data["depend_on_tasks"] = json.dumps(data["depend_on_tasks"])
|
||||
db_milestone = Milestone(project_id=project_id, milestone_code=milestone_code, **data)
|
||||
db_milestone = Milestone(project_id=project_id, milestone_code=milestone_code, created_by_id=current_user.id, **data)
|
||||
db.add(db_milestone)
|
||||
db.commit()
|
||||
db.refresh(db_milestone)
|
||||
@@ -76,10 +77,10 @@ def get_milestone(project_id: int, milestone_id: int, db: Session = Depends(get_
|
||||
|
||||
@router.patch("/{milestone_id}", response_model=schemas.MilestoneResponse)
|
||||
def update_milestone(project_id: int, milestone_id: int, milestone: schemas.MilestoneUpdate, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)):
|
||||
check_project_role(db, current_user.id, project_id, min_role="mgr")
|
||||
db_milestone = db.query(Milestone).filter(Milestone.id == milestone_id, Milestone.project_id == project_id).first()
|
||||
if not db_milestone:
|
||||
raise HTTPException(status_code=404, detail="Milestone not found")
|
||||
ensure_can_edit_milestone(db, current_user.id, db_milestone)
|
||||
data = milestone.model_dump(exclude_unset=True)
|
||||
if "depend_on_milestones" in data:
|
||||
data["depend_on_milestones"] = json.dumps(data["depend_on_milestones"]) if data["depend_on_milestones"] else None
|
||||
|
||||
@@ -13,6 +13,7 @@ from pydantic import BaseModel
|
||||
|
||||
from app.core.config import get_db
|
||||
from app.api.deps import get_current_user_or_apikey
|
||||
from app.api.rbac import ensure_can_edit_milestone
|
||||
from app.models import models
|
||||
from app.models.apikey import APIKey
|
||||
from app.models.activity import ActivityLog
|
||||
@@ -126,6 +127,7 @@ def create_milestone(ms: schemas.MilestoneCreate, db: Session = Depends(get_db),
|
||||
data["depend_on_tasks"] = None
|
||||
|
||||
db_ms = MilestoneModel(**data)
|
||||
db_ms.created_by_id = current_user.id
|
||||
db_ms.milestone_code = milestone_code
|
||||
db.add(db_ms)
|
||||
db.commit()
|
||||
@@ -152,10 +154,11 @@ def get_milestone(milestone_id: int, db: Session = Depends(get_db)):
|
||||
|
||||
|
||||
@router.patch("/milestones/{milestone_id}", response_model=schemas.MilestoneResponse, tags=["Milestones"])
|
||||
def update_milestone(milestone_id: int, ms_update: schemas.MilestoneUpdate, db: Session = Depends(get_db)):
|
||||
def update_milestone(milestone_id: int, ms_update: schemas.MilestoneUpdate, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)):
|
||||
ms = db.query(MilestoneModel).filter(MilestoneModel.id == milestone_id).first()
|
||||
if not ms:
|
||||
raise HTTPException(status_code=404, detail="Milestone not found")
|
||||
ensure_can_edit_milestone(db, current_user.id, ms)
|
||||
for field, value in ms_update.model_dump(exclude_unset=True).items():
|
||||
setattr(ms, field, value)
|
||||
db.commit()
|
||||
|
||||
@@ -10,7 +10,7 @@ from app.models import models
|
||||
from app.models.role_permission import Role
|
||||
from app.schemas import schemas
|
||||
from app.api.deps import get_current_user_or_apikey
|
||||
from app.api.rbac import check_project_role, check_permission
|
||||
from app.api.rbac import check_project_role, check_permission, ensure_can_edit_project
|
||||
|
||||
router = APIRouter(prefix="/projects", tags=["Projects"])
|
||||
|
||||
@@ -197,10 +197,10 @@ def update_project(
|
||||
db: Session = Depends(get_db),
|
||||
current_user: models.User = Depends(get_current_user_or_apikey),
|
||||
):
|
||||
check_project_role(db, current_user.id, project_id, min_role="mgr")
|
||||
project = db.query(models.Project).filter(models.Project.id == project_id).first()
|
||||
if not project:
|
||||
raise HTTPException(status_code=404, detail="Project not found")
|
||||
ensure_can_edit_project(db, current_user.id, project)
|
||||
update_data = project_update.model_dump(exclude_unset=True)
|
||||
update_data.pop("name", None)
|
||||
if "sub_projects" in update_data and update_data["sub_projects"]:
|
||||
|
||||
@@ -14,7 +14,7 @@ from app.schemas import schemas
|
||||
from app.services.webhook import fire_webhooks_sync
|
||||
from app.models.notification import Notification as NotificationModel
|
||||
from app.api.deps import get_current_user_or_apikey
|
||||
from app.api.rbac import check_project_role
|
||||
from app.api.rbac import check_project_role, ensure_can_edit_task
|
||||
from app.services.activity import log_activity
|
||||
|
||||
router = APIRouter(tags=["Tasks"])
|
||||
@@ -162,7 +162,7 @@ def update_task(task_id: int, task_update: schemas.TaskUpdate, db: Session = Dep
|
||||
task = db.query(Task).filter(Task.id == task_id).first()
|
||||
if not task:
|
||||
raise HTTPException(status_code=404, detail="Task not found")
|
||||
check_project_role(db, current_user.id, task.project_id, min_role="dev")
|
||||
ensure_can_edit_task(db, current_user.id, task)
|
||||
|
||||
update_data = task_update.model_dump(exclude_unset=True)
|
||||
if "status" in update_data:
|
||||
|
||||
10
app/main.py
10
app/main.py
@@ -137,6 +137,16 @@ def _migrate_schema():
|
||||
db.execute(text("ALTER TABLE tasks ADD COLUMN resolution_summary TEXT NULL"))
|
||||
db.execute(text("ALTER TABLE tasks ADD COLUMN positions TEXT NULL"))
|
||||
db.execute(text("ALTER TABLE tasks ADD COLUMN pending_matters TEXT NULL"))
|
||||
result = db.execute(text("SHOW COLUMNS FROM tasks LIKE 'created_by_id'"))
|
||||
if not result.fetchone():
|
||||
db.execute(text("ALTER TABLE tasks ADD COLUMN created_by_id INTEGER NULL"))
|
||||
_ensure_fk(db, "tasks", "created_by_id", "users", "id", "fk_tasks_created_by_id")
|
||||
|
||||
# milestones creator field
|
||||
result = db.execute(text("SHOW COLUMNS FROM milestones LIKE 'created_by_id'"))
|
||||
if not result.fetchone():
|
||||
db.execute(text("ALTER TABLE milestones ADD COLUMN created_by_id INTEGER NULL"))
|
||||
_ensure_fk(db, "milestones", "created_by_id", "users", "id", "fk_milestones_created_by_id")
|
||||
|
||||
# comments: issue_id -> task_id
|
||||
if _has_table(db, "comments"):
|
||||
|
||||
@@ -24,6 +24,7 @@ class Milestone(Base):
|
||||
depend_on_milestones = Column(Text, nullable=True)
|
||||
depend_on_tasks = Column(Text, nullable=True)
|
||||
project_id = Column(Integer, ForeignKey("projects.id"), nullable=False)
|
||||
created_by_id = Column(Integer, ForeignKey("users.id"), nullable=True)
|
||||
created_at = Column(DateTime(timezone=True), server_default=func.now())
|
||||
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
from pydantic import BaseModel
|
||||
from typing import Optional, List
|
||||
from datetime import datetime
|
||||
from datetime import datetime, time
|
||||
from enum import Enum
|
||||
|
||||
|
||||
@@ -76,6 +76,8 @@ class TaskResponse(TaskBase):
|
||||
milestone_id: int
|
||||
reporter_id: int
|
||||
assignee_id: Optional[int] = None
|
||||
created_by_id: Optional[int] = None
|
||||
estimated_working_time: Optional[time] = None
|
||||
resolution_summary: Optional[str] = None
|
||||
positions: Optional[str] = None
|
||||
pending_matters: Optional[str] = None
|
||||
@@ -116,6 +118,7 @@ class ProjectBase(BaseModel):
|
||||
name: str
|
||||
owner_name: Optional[str] = None
|
||||
description: Optional[str] = None
|
||||
repo: Optional[str] = None
|
||||
sub_projects: Optional[list[str]] = None
|
||||
related_projects: Optional[list[str]] = None
|
||||
|
||||
@@ -127,6 +130,7 @@ class ProjectCreate(ProjectBase):
|
||||
class ProjectUpdate(BaseModel):
|
||||
description: Optional[str] = None
|
||||
owner_name: Optional[str] = None
|
||||
repo: Optional[str] = None
|
||||
sub_projects: Optional[list[str]] = None
|
||||
related_projects: Optional[list[str]] = None
|
||||
|
||||
@@ -137,11 +141,15 @@ class ProjectResponse(BaseModel):
|
||||
owner_name: Optional[str] = None
|
||||
project_code: Optional[str] = None
|
||||
description: Optional[str] = None
|
||||
repo: Optional[str] = None
|
||||
sub_projects: Optional[list[str]] = None
|
||||
related_projects: Optional[list[str]] = None
|
||||
owner_id: int
|
||||
created_at: datetime
|
||||
|
||||
class Config:
|
||||
from_attributes = True
|
||||
|
||||
|
||||
# User schemas
|
||||
class UserBase(BaseModel):
|
||||
@@ -214,6 +222,7 @@ class MilestoneUpdate(BaseModel):
|
||||
class MilestoneResponse(MilestoneBase):
|
||||
id: int
|
||||
project_id: int
|
||||
created_by_id: Optional[int] = None
|
||||
created_at: datetime
|
||||
updated_at: Optional[datetime] = None
|
||||
|
||||
|
||||
Reference in New Issue
Block a user