"""HarborForge API: HTTP endpoints for issues, comments, projects and users."""
from functools import lru_cache
from typing import List, Optional

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.orm import Session

from app.core.config import get_db, settings
from app.models import models
from app.schemas import schemas

app = FastAPI(
    title="HarborForge API",
    description="Agent/人类协同任务管理平台 API",
    version="0.1.0"
)

# CORS
# NOTE(review): wildcard origins together with allow_credentials=True is a very
# permissive policy; FastAPI's CORS documentation says origins, methods and
# headers should be listed explicitly when credentials are allowed. Left
# unchanged here because the set of legitimate client origins is not visible
# from this file -- confirm and tighten.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


def _get_or_404(db: Session, model, object_id: int, detail: str):
    """Return the row of ``model`` whose ``id`` equals ``object_id``.

    Args:
        db: Active database session.
        model: Mapped class to query; must expose an ``id`` column.
        object_id: Primary-key value to look up.
        detail: Message placed in the 404 response body.

    Raises:
        HTTPException: With status 404 when no matching row exists.
    """
    instance = db.query(model).filter(model.id == object_id).first()
    if instance is None:
        raise HTTPException(status_code=404, detail=detail)
    return instance


@lru_cache(maxsize=1)
def _password_context():
    """Build the bcrypt ``CryptContext`` once and reuse it for every request."""
    # Imported lazily so passlib is only required once a password is hashed.
    from passlib.context import CryptContext

    return CryptContext(schemes=["bcrypt"], deprecated="auto")


# Health check
@app.get("/health")
def health_check():
    """Report that the service process is up."""
    return {"status": "healthy"}


# ============ Issues API ============

@app.post("/issues", response_model=schemas.IssueResponse, status_code=status.HTTP_201_CREATED)
def create_issue(issue: schemas.IssueCreate, db: Session = Depends(get_db)):
    """Persist a new issue built from the request payload and return it."""
    db_issue = models.Issue(**issue.model_dump())
    db.add(db_issue)
    db.commit()
    # Reload so database-populated values are present in the response.
    db.refresh(db_issue)
    return db_issue


@app.get("/issues", response_model=List[schemas.IssueResponse])
def list_issues(
    project_id: Optional[int] = None,
    status: Optional[str] = None,
    issue_type: Optional[str] = None,
    skip: int = 0,
    limit: int = 100,
    db: Session = Depends(get_db)
):
    """List issues, optionally filtered by project, status and issue type.

    Args:
        project_id: Only return issues of this project when given.
        status: Only return issues with this status when non-empty. The name
            shadows the imported ``status`` module inside this function; it is
            kept because it is the public query-parameter name.
        issue_type: Only return issues of this type when non-empty.
        skip: Number of rows to skip.
        limit: Maximum number of rows to return.
        db: Database session injected by FastAPI.
    """
    query = db.query(models.Issue)
    # Compare against None so that an explicit project_id=0 still filters
    # instead of silently returning every issue.
    if project_id is not None:
        query = query.filter(models.Issue.project_id == project_id)
    if status:
        query = query.filter(models.Issue.status == status)
    if issue_type:
        query = query.filter(models.Issue.issue_type == issue_type)
    # A fixed ordering keeps offset/limit pages stable between requests.
    return query.order_by(models.Issue.id).offset(skip).limit(limit).all()


@app.get("/issues/{issue_id}", response_model=schemas.IssueResponse)
def get_issue(issue_id: int, db: Session = Depends(get_db)):
    """Return a single issue by id, or respond with 404 if it does not exist."""
    return _get_or_404(db, models.Issue, issue_id, "Issue not found")


@app.patch("/issues/{issue_id}", response_model=schemas.IssueResponse)
def update_issue(issue_id: int, issue_update: schemas.IssueUpdate, db: Session = Depends(get_db)):
    """Apply a partial update to an issue and return the updated row.

    Raises:
        HTTPException: With status 404 when the issue does not exist.
    """
    issue = _get_or_404(db, models.Issue, issue_id, "Issue not found")
    # exclude_unset: only fields the client actually sent are written.
    update_data = issue_update.model_dump(exclude_unset=True)
    for field, value in update_data.items():
        setattr(issue, field, value)
    db.commit()
    db.refresh(issue)
    return issue


@app.delete("/issues/{issue_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_issue(issue_id: int, db: Session = Depends(get_db)):
    """Delete an issue by id; responds with 404 if it does not exist."""
    issue = _get_or_404(db, models.Issue, issue_id, "Issue not found")
    db.delete(issue)
    db.commit()
    return None


# ============ Comments API ============

@app.post("/comments", response_model=schemas.CommentResponse, status_code=status.HTTP_201_CREATED)
def create_comment(comment: schemas.CommentCreate, db: Session = Depends(get_db)):
    """Persist a new comment built from the request payload and return it."""
    db_comment = models.Comment(**comment.model_dump())
    db.add(db_comment)
    db.commit()
    db.refresh(db_comment)
    return db_comment


@app.get("/issues/{issue_id}/comments", response_model=List[schemas.CommentResponse])
def list_comments(issue_id: int, db: Session = Depends(get_db)):
    """Return every comment whose ``issue_id`` matches the path parameter.

    The issue itself is not looked up, so an unknown ``issue_id`` yields an
    empty list rather than a 404.
    """
    return db.query(models.Comment).filter(models.Comment.issue_id == issue_id).all()


# ============ Projects API ============

@app.post("/projects", response_model=schemas.ProjectResponse, status_code=status.HTTP_201_CREATED)
def create_project(project: schemas.ProjectCreate, db: Session = Depends(get_db)):
    """Persist a new project built from the request payload and return it."""
    db_project = models.Project(**project.model_dump())
    db.add(db_project)
    db.commit()
    db.refresh(db_project)
    return db_project


@app.get("/projects", response_model=List[schemas.ProjectResponse])
def list_projects(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """Return one page of projects, ordered by id."""
    # A fixed ordering keeps offset/limit pages stable between requests.
    return db.query(models.Project).order_by(models.Project.id).offset(skip).limit(limit).all()


@app.get("/projects/{project_id}", response_model=schemas.ProjectResponse)
def get_project(project_id: int, db: Session = Depends(get_db)):
    """Return a single project by id, or respond with 404 if it does not exist."""
    return _get_or_404(db, models.Project, project_id, "Project not found")


# ============ Users API ============

@app.post("/users", response_model=schemas.UserResponse, status_code=status.HTTP_201_CREATED)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    """Create a user, rejecting a username or email that is already taken.

    Raises:
        HTTPException: With status 400 when a user with the same username or
            email already exists.
    """
    # Check if username or email exists.
    # NOTE(review): this check and the insert below are not atomic; two
    # concurrent requests can both pass it. Presumably the table has unique
    # constraints as the real guard -- confirm against the User model.
    existing = db.query(models.User).filter(
        (models.User.username == user.username) | (models.User.email == user.email)
    ).first()
    if existing:
        raise HTTPException(status_code=400, detail="Username or email already exists")

    # Hash password if provided; users without one are stored with no hash.
    hashed_password = None
    if user.password:
        hashed_password = _password_context().hash(user.password)

    db_user = models.User(
        username=user.username,
        email=user.email,
        full_name=user.full_name,
        hashed_password=hashed_password,
        is_admin=user.is_admin
    )
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user


@app.get("/users", response_model=List[schemas.UserResponse])
def list_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """Return one page of users, ordered by id."""
    # A fixed ordering keeps offset/limit pages stable between requests.
    return db.query(models.User).order_by(models.User.id).offset(skip).limit(limit).all()


@app.get("/users/{user_id}", response_model=schemas.UserResponse)
def get_user(user_id: int, db: Session = Depends(get_db)):
    """Return a single user by id, or respond with 404 if it does not exist."""
    return _get_or_404(db, models.User, user_id, "User not found")


# Create missing tables on startup. This is table creation via create_all,
# not a schema migration.
# NOTE(review): recent FastAPI releases deprecate on_event in favour of
# lifespan handlers -- consider migrating once the minimum version is confirmed.
@app.on_event("startup")
def startup():
    """Create the tables declared on ``Base`` using the configured engine."""
    from app.core.config import Base, engine

    Base.metadata.create_all(bind=engine)