Files
HarborForge.Backend/app/main.py

903 lines
31 KiB
Python

from fastapi import FastAPI, Depends, HTTPException, status, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.orm import Session
from typing import List
from datetime import datetime, timedelta
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from app.core.config import get_db, settings
from app.models import models
from app.schemas import schemas
from app.services.webhook import fire_webhooks_sync
app = FastAPI(
title="HarborForge API",
description="Agent/人类协同任务管理平台 API",
version="0.1.0"
)
# CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Auth
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
user_id: int = None
def verify_password(plain_password: str, hashed_password: str) -> bool:
if not hashed_password:
return False
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
password = password[:72]
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: timedelta = None) -> str:
to_encode = data.copy()
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=30))
to_encode.update({"exp": expire})
return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
user_id = payload.get("sub")
if user_id is None:
raise credentials_exception
except JWTError:
raise credentials_exception
user = db.query(models.User).filter(models.User.id == user_id).first()
if user is None:
raise credentials_exception
return user
# Health check
@app.get("/health")
def health_check():
return {"status": "healthy"}
# ============ Auth API ============
@app.post("/auth/token", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
user = db.query(models.User).filter(models.User.username == form_data.username).first()
if not user or not verify_password(form_data.password, user.hashed_password or ""):
raise HTTPException(status_code=401, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"})
if not user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
access_token = create_access_token(data={"sub": str(user.id)}, expires_delta=timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES))
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/auth/me", response_model=schemas.UserResponse)
async def get_me(current_user: models.User = Depends(get_current_user)):
return current_user
# ============ Issues API ============
@app.post("/issues", response_model=schemas.IssueResponse, status_code=status.HTTP_201_CREATED)
def create_issue(issue: schemas.IssueCreate, bg: BackgroundTasks, db: Session = Depends(get_db)):
db_issue = models.Issue(**issue.model_dump())
db.add(db_issue)
db.commit()
db.refresh(db_issue)
event = "resolution.created" if db_issue.issue_type == "resolution" else "issue.created"
bg.add_task(fire_webhooks_sync, event, {"issue_id": db_issue.id, "title": db_issue.title, "type": db_issue.issue_type, "status": db_issue.status}, db_issue.project_id, db)
return db_issue
@app.get("/issues", response_model=List[schemas.IssueResponse])
def list_issues(
project_id: int = None,
issue_status: str = None,
issue_type: str = None,
skip: int = 0,
limit: int = 100,
db: Session = Depends(get_db)
):
query = db.query(models.Issue)
if project_id:
query = query.filter(models.Issue.project_id == project_id)
if issue_status:
query = query.filter(models.Issue.status == issue_status)
if issue_type:
query = query.filter(models.Issue.issue_type == issue_type)
issues = query.offset(skip).limit(limit).all()
return issues
@app.get("/issues/overdue", response_model=List[schemas.IssueResponse])
def list_overdue_issues(project_id: int = None, db: Session = Depends(get_db)):
"""List issues past their due date that aren't closed/resolved."""
query = db.query(models.Issue).filter(
models.Issue.due_date != None,
models.Issue.due_date < datetime.utcnow(),
models.Issue.status.notin_(["resolved", "closed"])
)
if project_id:
query = query.filter(models.Issue.project_id == project_id)
return query.order_by(models.Issue.due_date.asc()).all()
@app.get("/issues/{issue_id}", response_model=schemas.IssueResponse)
def get_issue(issue_id: int, db: Session = Depends(get_db)):
issue = db.query(models.Issue).filter(models.Issue.id == issue_id).first()
if not issue:
raise HTTPException(status_code=404, detail="Issue not found")
return issue
@app.patch("/issues/{issue_id}", response_model=schemas.IssueResponse)
def update_issue(issue_id: int, issue_update: schemas.IssueUpdate, db: Session = Depends(get_db)):
issue = db.query(models.Issue).filter(models.Issue.id == issue_id).first()
if not issue:
raise HTTPException(status_code=404, detail="Issue not found")
update_data = issue_update.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(issue, field, value)
db.commit()
db.refresh(issue)
return issue
@app.delete("/issues/{issue_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_issue(issue_id: int, db: Session = Depends(get_db)):
issue = db.query(models.Issue).filter(models.Issue.id == issue_id).first()
if not issue:
raise HTTPException(status_code=404, detail="Issue not found")
db.delete(issue)
db.commit()
return None
# ============ Comments API ============
@app.post("/comments", response_model=schemas.CommentResponse, status_code=status.HTTP_201_CREATED)
def create_comment(comment: schemas.CommentCreate, db: Session = Depends(get_db)):
db_comment = models.Comment(**comment.model_dump())
db.add(db_comment)
db.commit()
db.refresh(db_comment)
return db_comment
@app.get("/issues/{issue_id}/comments", response_model=List[schemas.CommentResponse])
def list_comments(issue_id: int, db: Session = Depends(get_db)):
comments = db.query(models.Comment).filter(models.Comment.issue_id == issue_id).all()
return comments
# ============ Projects API ============
@app.post("/projects", response_model=schemas.ProjectResponse, status_code=status.HTTP_201_CREATED)
def create_project(project: schemas.ProjectCreate, db: Session = Depends(get_db)):
db_project = models.Project(**project.model_dump())
db.add(db_project)
db.commit()
db.refresh(db_project)
return db_project
@app.get("/projects", response_model=List[schemas.ProjectResponse])
def list_projects(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
projects = db.query(models.Project).offset(skip).limit(limit).all()
return projects
@app.get("/projects/{project_id}", response_model=schemas.ProjectResponse)
def get_project(project_id: int, db: Session = Depends(get_db)):
project = db.query(models.Project).filter(models.Project.id == project_id).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
return project
# ============ Users API ============
@app.post("/users", response_model=schemas.UserResponse, status_code=status.HTTP_201_CREATED)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
existing = db.query(models.User).filter(
(models.User.username == user.username) | (models.User.email == user.email)
).first()
if existing:
raise HTTPException(status_code=400, detail="Username or email already exists")
hashed_password = get_password_hash(user.password) if user.password else None
db_user = models.User(
username=user.username,
email=user.email,
full_name=user.full_name,
hashed_password=hashed_password,
is_admin=user.is_admin
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
@app.get("/users", response_model=List[schemas.UserResponse])
def list_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
users = db.query(models.User).offset(skip).limit(limit).all()
return users
@app.get("/users/{user_id}", response_model=schemas.UserResponse)
def get_user(user_id: int, db: Session = Depends(get_db)):
user = db.query(models.User).filter(models.User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
# Run database migration on startup
@app.on_event("startup")
def startup():
from app.core.config import Base, engine
from app.models import webhook
from app.models import apikey
from app.models import activity
from app.models import milestone
Base.metadata.create_all(bind=engine)
# ============ Project Members API ============
@app.post("/projects/{project_id}/members", response_model=schemas.ProjectMemberResponse, status_code=status.HTTP_201_CREATED)
def add_project_member(project_id: int, member: schemas.ProjectMemberCreate, db: Session = Depends(get_db)):
project = db.query(models.Project).filter(models.Project.id == project_id).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
user = db.query(models.User).filter(models.User.id == member.user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
existing = db.query(models.ProjectMember).filter(
models.ProjectMember.project_id == project_id,
models.ProjectMember.user_id == member.user_id
).first()
if existing:
raise HTTPException(status_code=400, detail="User already a member")
db_member = models.ProjectMember(project_id=project_id, user_id=member.user_id, role=member.role)
db.add(db_member)
db.commit()
db.refresh(db_member)
return db_member
@app.get("/projects/{project_id}/members", response_model=List[schemas.ProjectMemberResponse])
def list_project_members(project_id: int, db: Session = Depends(get_db)):
members = db.query(models.ProjectMember).filter(models.ProjectMember.project_id == project_id).all()
return members
@app.delete("/projects/{project_id}/members/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
def remove_project_member(project_id: int, user_id: int, db: Session = Depends(get_db)):
member = db.query(models.ProjectMember).filter(
models.ProjectMember.project_id == project_id,
models.ProjectMember.user_id == user_id
).first()
if not member:
raise HTTPException(status_code=404, detail="Member not found")
db.delete(member)
db.commit()
return None
# ============ System API ============
@app.get("/version")
def version():
return {
"name": "HarborForge",
"version": "0.1.0",
"description": "Agent/人类协同任务管理平台"
}
# ============ Projects (update/delete) ============
@app.patch("/projects/{project_id}", response_model=schemas.ProjectResponse)
def update_project(project_id: int, project_update: schemas.ProjectUpdate, db: Session = Depends(get_db)):
project = db.query(models.Project).filter(models.Project.id == project_id).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
update_data = project_update.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(project, field, value)
db.commit()
db.refresh(project)
return project
@app.delete("/projects/{project_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_project(project_id: int, db: Session = Depends(get_db)):
project = db.query(models.Project).filter(models.Project.id == project_id).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
db.delete(project)
db.commit()
return None
# ============ Users (update/delete) ============
@app.patch("/users/{user_id}", response_model=schemas.UserResponse)
def update_user(user_id: int, db: Session = Depends(get_db), full_name: str = None, email: str = None):
user = db.query(models.User).filter(models.User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
if full_name is not None:
user.full_name = full_name
if email is not None:
user.email = email
db.commit()
db.refresh(user)
return user
# ============ Webhooks API ============
from app.models.webhook import Webhook, WebhookLog
from app.schemas.webhook import WebhookCreate, WebhookUpdate, WebhookResponse, WebhookLogResponse
@app.post("/webhooks", response_model=WebhookResponse, status_code=status.HTTP_201_CREATED)
def create_webhook(wh: WebhookCreate, db: Session = Depends(get_db)):
db_wh = Webhook(**wh.model_dump())
db.add(db_wh)
db.commit()
db.refresh(db_wh)
return db_wh
@app.get("/webhooks", response_model=List[WebhookResponse])
def list_webhooks(project_id: int = None, db: Session = Depends(get_db)):
query = db.query(Webhook)
if project_id is not None:
query = query.filter(Webhook.project_id == project_id)
return query.all()
@app.get("/webhooks/{webhook_id}", response_model=WebhookResponse)
def get_webhook(webhook_id: int, db: Session = Depends(get_db)):
wh = db.query(Webhook).filter(Webhook.id == webhook_id).first()
if not wh:
raise HTTPException(status_code=404, detail="Webhook not found")
return wh
@app.patch("/webhooks/{webhook_id}", response_model=WebhookResponse)
def update_webhook(webhook_id: int, wh_update: WebhookUpdate, db: Session = Depends(get_db)):
wh = db.query(Webhook).filter(Webhook.id == webhook_id).first()
if not wh:
raise HTTPException(status_code=404, detail="Webhook not found")
for field, value in wh_update.model_dump(exclude_unset=True).items():
setattr(wh, field, value)
db.commit()
db.refresh(wh)
return wh
@app.delete("/webhooks/{webhook_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_webhook(webhook_id: int, db: Session = Depends(get_db)):
wh = db.query(Webhook).filter(Webhook.id == webhook_id).first()
if not wh:
raise HTTPException(status_code=404, detail="Webhook not found")
db.delete(wh)
db.commit()
return None
@app.get("/webhooks/{webhook_id}/logs", response_model=List[WebhookLogResponse])
def list_webhook_logs(webhook_id: int, limit: int = 50, db: Session = Depends(get_db)):
logs = db.query(WebhookLog).filter(
WebhookLog.webhook_id == webhook_id
).order_by(WebhookLog.created_at.desc()).limit(limit).all()
return logs
# ============ Issue Status Transition ============
@app.post("/issues/{issue_id}/transition", response_model=schemas.IssueResponse)
def transition_issue(issue_id: int, new_status: str, bg: BackgroundTasks, db: Session = Depends(get_db)):
"""Transition issue status with validation and webhook."""
valid_statuses = ["open", "in_progress", "resolved", "closed", "blocked"]
if new_status not in valid_statuses:
raise HTTPException(status_code=400, detail=f"Invalid status. Must be one of: {valid_statuses}")
issue = db.query(models.Issue).filter(models.Issue.id == issue_id).first()
if not issue:
raise HTTPException(status_code=404, detail="Issue not found")
old_status = issue.status
issue.status = new_status
db.commit()
db.refresh(issue)
event = "issue.closed" if new_status == "closed" else "issue.updated"
bg.add_task(fire_webhooks_sync, event, {
"issue_id": issue.id,
"title": issue.title,
"old_status": old_status,
"new_status": new_status,
}, issue.project_id, db)
return issue
# ============ Search API ============
@app.get("/search/issues", response_model=List[schemas.IssueResponse])
def search_issues(
q: str,
project_id: int = None,
skip: int = 0,
limit: int = 50,
db: Session = Depends(get_db)
):
"""Search issues by title or description keyword."""
query = db.query(models.Issue).filter(
(models.Issue.title.contains(q)) | (models.Issue.description.contains(q))
)
if project_id:
query = query.filter(models.Issue.project_id == project_id)
return query.offset(skip).limit(limit).all()
# ============ Dashboard / Stats API ============
@app.get("/dashboard/stats")
def dashboard_stats(project_id: int = None, db: Session = Depends(get_db)):
"""Get issue statistics for dashboard."""
query = db.query(models.Issue)
if project_id:
query = query.filter(models.Issue.project_id == project_id)
total = query.count()
by_status = {}
for s in ["open", "in_progress", "resolved", "closed", "blocked"]:
by_status[s] = query.filter(models.Issue.status == s).count()
by_type = {}
for t in ["task", "story", "test", "resolution"]:
by_type[t] = query.filter(models.Issue.issue_type == t).count()
by_priority = {}
for p in ["low", "medium", "high", "critical"]:
by_priority[p] = query.filter(models.Issue.priority == p).count()
return {
"total": total,
"by_status": by_status,
"by_type": by_type,
"by_priority": by_priority,
}
# ============ Comments (update/delete) ============
@app.patch("/comments/{comment_id}", response_model=schemas.CommentResponse)
def update_comment(comment_id: int, comment_update: schemas.CommentUpdate, db: Session = Depends(get_db)):
comment = db.query(models.Comment).filter(models.Comment.id == comment_id).first()
if not comment:
raise HTTPException(status_code=404, detail="Comment not found")
for field, value in comment_update.model_dump(exclude_unset=True).items():
setattr(comment, field, value)
db.commit()
db.refresh(comment)
return comment
@app.delete("/comments/{comment_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_comment(comment_id: int, db: Session = Depends(get_db)):
comment = db.query(models.Comment).filter(models.Comment.id == comment_id).first()
if not comment:
raise HTTPException(status_code=404, detail="Comment not found")
db.delete(comment)
db.commit()
return None
# ============ API Key Auth ============
import secrets
from fastapi.security import APIKeyHeader
from app.models.apikey import APIKey
apikey_header = APIKeyHeader(name="X-API-Key", auto_error=False)
async def get_current_user_or_apikey(
token: str = Depends(oauth2_scheme),
api_key: str = Depends(apikey_header),
db: Session = Depends(get_db)
):
"""Authenticate via JWT token OR API key."""
# Try API key first
if api_key:
key_obj = db.query(APIKey).filter(APIKey.key == api_key, APIKey.is_active == True).first()
if key_obj:
key_obj.last_used_at = datetime.utcnow()
db.commit()
user = db.query(models.User).filter(models.User.id == key_obj.user_id).first()
if user:
return user
# Fall back to JWT
if token:
return await get_current_user(token=token, db=db)
raise HTTPException(status_code=401, detail="Not authenticated")
# ============ API Key Management ============
from pydantic import BaseModel as PydanticBaseModel
class APIKeyCreate(PydanticBaseModel):
name: str
user_id: int
class APIKeyResponse(PydanticBaseModel):
id: int
key: str
name: str
user_id: int
is_active: bool
created_at: datetime
last_used_at: datetime | None = None
class Config:
from_attributes = True
@app.post("/api-keys", response_model=APIKeyResponse, status_code=status.HTTP_201_CREATED)
def create_api_key(data: APIKeyCreate, db: Session = Depends(get_db)):
user = db.query(models.User).filter(models.User.id == data.user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
key = secrets.token_hex(32)
db_key = APIKey(key=key, name=data.name, user_id=data.user_id)
db.add(db_key)
db.commit()
db.refresh(db_key)
return db_key
@app.get("/api-keys", response_model=List[APIKeyResponse])
def list_api_keys(user_id: int = None, db: Session = Depends(get_db)):
query = db.query(APIKey)
if user_id:
query = query.filter(APIKey.user_id == user_id)
return query.all()
@app.delete("/api-keys/{key_id}", status_code=status.HTTP_204_NO_CONTENT)
def revoke_api_key(key_id: int, db: Session = Depends(get_db)):
key_obj = db.query(APIKey).filter(APIKey.id == key_id).first()
if not key_obj:
raise HTTPException(status_code=404, detail="API key not found")
key_obj.is_active = False
db.commit()
return None
# ============ Batch Operations ============
class BatchTransition(PydanticBaseModel):
issue_ids: List[int]
new_status: str
class BatchAssign(PydanticBaseModel):
issue_ids: List[int]
assignee_id: int
@app.post("/issues/batch/transition")
def batch_transition(data: BatchTransition, bg: BackgroundTasks, db: Session = Depends(get_db)):
valid_statuses = ["open", "in_progress", "resolved", "closed", "blocked"]
if data.new_status not in valid_statuses:
raise HTTPException(status_code=400, detail=f"Invalid status")
updated = []
for issue_id in data.issue_ids:
issue = db.query(models.Issue).filter(models.Issue.id == issue_id).first()
if issue:
old_status = issue.status
issue.status = data.new_status
updated.append({"id": issue.id, "title": issue.title, "old": old_status, "new": data.new_status})
db.commit()
for u in updated:
event = "issue.closed" if data.new_status == "closed" else "issue.updated"
bg.add_task(fire_webhooks_sync, event, u, None, db)
return {"updated": len(updated), "issues": updated}
@app.post("/issues/batch/assign")
def batch_assign(data: BatchAssign, db: Session = Depends(get_db)):
user = db.query(models.User).filter(models.User.id == data.assignee_id).first()
if not user:
raise HTTPException(status_code=404, detail="Assignee not found")
updated = []
for issue_id in data.issue_ids:
issue = db.query(models.Issue).filter(models.Issue.id == issue_id).first()
if issue:
issue.assignee_id = data.assignee_id
updated.append(issue_id)
db.commit()
return {"updated": len(updated), "issue_ids": updated, "assignee_id": data.assignee_id}
# ============ Activity Log ============
from app.models.activity import ActivityLog
class ActivityLogResponse(PydanticBaseModel):
id: int
action: str
entity_type: str
entity_id: int
user_id: int | None
details: str | None
created_at: datetime
class Config:
from_attributes = True
def log_activity(db: Session, action: str, entity_type: str, entity_id: int, user_id: int = None, details: str = None):
"""Helper to record an activity log entry."""
entry = ActivityLog(action=action, entity_type=entity_type, entity_id=entity_id, user_id=user_id, details=details)
db.add(entry)
db.commit()
@app.get("/activity", response_model=List[ActivityLogResponse])
def list_activity(
entity_type: str = None,
entity_id: int = None,
user_id: int = None,
limit: int = 50,
db: Session = Depends(get_db)
):
query = db.query(ActivityLog)
if entity_type:
query = query.filter(ActivityLog.entity_type == entity_type)
if entity_id:
query = query.filter(ActivityLog.entity_id == entity_id)
if user_id:
query = query.filter(ActivityLog.user_id == user_id)
return query.order_by(ActivityLog.created_at.desc()).limit(limit).all()
# ============ Issue Relations ============
class IssueRelation(PydanticBaseModel):
parent_id: int
child_id: int
@app.post("/issues/link")
def link_issues(rel: IssueRelation, db: Session = Depends(get_db)):
parent = db.query(models.Issue).filter(models.Issue.id == rel.parent_id).first()
child = db.query(models.Issue).filter(models.Issue.id == rel.child_id).first()
if not parent or not child:
raise HTTPException(status_code=404, detail="Issue not found")
if rel.parent_id == rel.child_id:
raise HTTPException(status_code=400, detail="Cannot link issue to itself")
child.depends_on_id = rel.parent_id
db.commit()
return {"parent_id": rel.parent_id, "child_id": rel.child_id, "status": "linked"}
@app.delete("/issues/link")
def unlink_issues(child_id: int, db: Session = Depends(get_db)):
child = db.query(models.Issue).filter(models.Issue.id == child_id).first()
if not child:
raise HTTPException(status_code=404, detail="Issue not found")
child.depends_on_id = None
db.commit()
return {"child_id": child_id, "status": "unlinked"}
@app.get("/issues/{issue_id}/children", response_model=List[schemas.IssueResponse])
def get_children(issue_id: int, db: Session = Depends(get_db)):
return db.query(models.Issue).filter(models.Issue.depends_on_id == issue_id).all()
# ============ Issue Tags ============
@app.post("/issues/{issue_id}/tags")
def add_tag(issue_id: int, tag: str, db: Session = Depends(get_db)):
issue = db.query(models.Issue).filter(models.Issue.id == issue_id).first()
if not issue:
raise HTTPException(status_code=404, detail="Issue not found")
current = set(issue.tags.split(",")) if issue.tags else set()
current.add(tag.strip())
current.discard("")
issue.tags = ",".join(sorted(current))
db.commit()
return {"issue_id": issue_id, "tags": list(current)}
@app.delete("/issues/{issue_id}/tags")
def remove_tag(issue_id: int, tag: str, db: Session = Depends(get_db)):
issue = db.query(models.Issue).filter(models.Issue.id == issue_id).first()
if not issue:
raise HTTPException(status_code=404, detail="Issue not found")
current = set(issue.tags.split(",")) if issue.tags else set()
current.discard(tag.strip())
current.discard("")
issue.tags = ",".join(sorted(current)) if current else None
db.commit()
return {"issue_id": issue_id, "tags": list(current)}
@app.get("/tags")
def list_all_tags(project_id: int = None, db: Session = Depends(get_db)):
"""Get all unique tags across issues."""
query = db.query(models.Issue.tags).filter(models.Issue.tags != None)
if project_id:
query = query.filter(models.Issue.project_id == project_id)
all_tags = set()
for (tags,) in query.all():
for t in tags.split(","):
t = t.strip()
if t:
all_tags.add(t)
return {"tags": sorted(all_tags)}
# ============ Milestones API ============
from app.models.milestone import Milestone as MilestoneModel, MilestoneStatus
from app.schemas.schemas import MilestoneCreate, MilestoneUpdate, MilestoneResponse
@app.post("/milestones", response_model=MilestoneResponse, status_code=status.HTTP_201_CREATED)
def create_milestone(ms: MilestoneCreate, db: Session = Depends(get_db)):
db_ms = MilestoneModel(**ms.model_dump())
db.add(db_ms)
db.commit()
db.refresh(db_ms)
return db_ms
@app.get("/milestones", response_model=List[MilestoneResponse])
def list_milestones(project_id: int = None, status_filter: str = None, db: Session = Depends(get_db)):
query = db.query(MilestoneModel)
if project_id:
query = query.filter(MilestoneModel.project_id == project_id)
if status_filter:
query = query.filter(MilestoneModel.status == status_filter)
return query.order_by(MilestoneModel.due_date.is_(None), MilestoneModel.due_date.asc()).all()
@app.get("/milestones/{milestone_id}", response_model=MilestoneResponse)
def get_milestone(milestone_id: int, db: Session = Depends(get_db)):
ms = db.query(MilestoneModel).filter(MilestoneModel.id == milestone_id).first()
if not ms:
raise HTTPException(status_code=404, detail="Milestone not found")
return ms
@app.patch("/milestones/{milestone_id}", response_model=MilestoneResponse)
def update_milestone(milestone_id: int, ms_update: MilestoneUpdate, db: Session = Depends(get_db)):
ms = db.query(MilestoneModel).filter(MilestoneModel.id == milestone_id).first()
if not ms:
raise HTTPException(status_code=404, detail="Milestone not found")
for field, value in ms_update.model_dump(exclude_unset=True).items():
setattr(ms, field, value)
db.commit()
db.refresh(ms)
return ms
@app.delete("/milestones/{milestone_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_milestone(milestone_id: int, db: Session = Depends(get_db)):
ms = db.query(MilestoneModel).filter(MilestoneModel.id == milestone_id).first()
if not ms:
raise HTTPException(status_code=404, detail="Milestone not found")
db.delete(ms)
db.commit()
return None
@app.get("/milestones/{milestone_id}/issues", response_model=List[schemas.IssueResponse])
def list_milestone_issues(milestone_id: int, db: Session = Depends(get_db)):
"""List all issues in a milestone."""
return db.query(models.Issue).filter(models.Issue.milestone_id == milestone_id).all()
@app.get("/milestones/{milestone_id}/progress")
def milestone_progress(milestone_id: int, db: Session = Depends(get_db)):
"""Get milestone completion progress."""
ms = db.query(MilestoneModel).filter(MilestoneModel.id == milestone_id).first()
if not ms:
raise HTTPException(status_code=404, detail="Milestone not found")
issues = db.query(models.Issue).filter(models.Issue.milestone_id == milestone_id).all()
total = len(issues)
done = sum(1 for i in issues if i.status in ("resolved", "closed"))
return {
"milestone_id": milestone_id,
"title": ms.title,
"total_issues": total,
"completed": done,
"progress_pct": round(done / total * 100, 1) if total else 0,
}
# ============ Export API ============
import csv
import io
from fastapi.responses import StreamingResponse
@app.get("/export/issues")
def export_issues_csv(project_id: int = None, db: Session = Depends(get_db)):
"""Export issues as CSV."""
query = db.query(models.Issue)
if project_id:
query = query.filter(models.Issue.project_id == project_id)
issues = query.all()
output = io.StringIO()
writer = csv.writer(output)
writer.writerow(["id", "title", "type", "status", "priority", "project_id",
"reporter_id", "assignee_id", "milestone_id", "due_date",
"tags", "created_at", "updated_at"])
for i in issues:
writer.writerow([
i.id, i.title, i.issue_type, i.status, i.priority,
i.project_id, i.reporter_id, i.assignee_id, i.milestone_id,
i.due_date, i.tags, i.created_at, i.updated_at
])
output.seek(0)
return StreamingResponse(
iter([output.getvalue()]),
media_type="text/csv",
headers={"Content-Disposition": "attachment; filename=issues.csv"}
)