refactor: move all files to root (no nested backend/)

This commit is contained in:
root
2026-02-21 08:25:37 +00:00
parent a557ab6f4a
commit fd980c0344
21 changed files with 0 additions and 91 deletions

190
app/main.py Normal file
View File

@@ -0,0 +1,190 @@
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.orm import Session
from typing import List
from app.core.config import get_db, settings
from app.models import models
from app.schemas import schemas
app = FastAPI(
title="HarborForge API",
description="Agent/人类协同任务管理平台 API",
version="0.1.0"
)
# CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Health check
@app.get("/health")
def health_check():
return {"status": "healthy"}
# ============ Issues API ============
@app.post("/issues", response_model=schemas.IssueResponse, status_code=status.HTTP_201_CREATED)
def create_issue(issue: schemas.IssueCreate, db: Session = Depends(get_db)):
db_issue = models.Issue(**issue.model_dump())
db.add(db_issue)
db.commit()
db.refresh(db_issue)
return db_issue
@app.get("/issues", response_model=List[schemas.IssueResponse])
def list_issues(
project_id: int = None,
status: str = None,
issue_type: str = None,
skip: int = 0,
limit: int = 100,
db: Session = Depends(get_db)
):
query = db.query(models.Issue)
if project_id:
query = query.filter(models.Issue.project_id == project_id)
if status:
query = query.filter(models.Issue.status == status)
if issue_type:
query = query.filter(models.Issue.issue_type == issue_type)
issues = query.offset(skip).limit(limit).all()
return issues
@app.get("/issues/{issue_id}", response_model=schemas.IssueResponse)
def get_issue(issue_id: int, db: Session = Depends(get_db)):
issue = db.query(models.Issue).filter(models.Issue.id == issue_id).first()
if not issue:
raise HTTPException(status_code=404, detail="Issue not found")
return issue
@app.patch("/issues/{issue_id}", response_model=schemas.IssueResponse)
def update_issue(issue_id: int, issue_update: schemas.IssueUpdate, db: Session = Depends(get_db)):
issue = db.query(models.Issue).filter(models.Issue.id == issue_id).first()
if not issue:
raise HTTPException(status_code=404, detail="Issue not found")
update_data = issue_update.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(issue, field, value)
db.commit()
db.refresh(issue)
return issue
@app.delete("/issues/{issue_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_issue(issue_id: int, db: Session = Depends(get_db)):
issue = db.query(models.Issue).filter(models.Issue.id == issue_id).first()
if not issue:
raise HTTPException(status_code=404, detail="Issue not found")
db.delete(issue)
db.commit()
return None
# ============ Comments API ============
@app.post("/comments", response_model=schemas.CommentResponse, status_code=status.HTTP_201_CREATED)
def create_comment(comment: schemas.CommentCreate, db: Session = Depends(get_db)):
db_comment = models.Comment(**comment.model_dump())
db.add(db_comment)
db.commit()
db.refresh(db_comment)
return db_comment
@app.get("/issues/{issue_id}/comments", response_model=List[schemas.CommentResponse])
def list_comments(issue_id: int, db: Session = Depends(get_db)):
comments = db.query(models.Comment).filter(models.Comment.issue_id == issue_id).all()
return comments
# ============ Projects API ============
@app.post("/projects", response_model=schemas.ProjectResponse, status_code=status.HTTP_201_CREATED)
def create_project(project: schemas.ProjectCreate, db: Session = Depends(get_db)):
db_project = models.Project(**project.model_dump())
db.add(db_project)
db.commit()
db.refresh(db_project)
return db_project
@app.get("/projects", response_model=List[schemas.ProjectResponse])
def list_projects(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
projects = db.query(models.Project).offset(skip).limit(limit).all()
return projects
@app.get("/projects/{project_id}", response_model=schemas.ProjectResponse)
def get_project(project_id: int, db: Session = Depends(get_db)):
project = db.query(models.Project).filter(models.Project.id == project_id).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
return project
# ============ Users API ============
@app.post("/users", response_model=schemas.UserResponse, status_code=status.HTTP_201_CREATED)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
# Check if username or email exists
existing = db.query(models.User).filter(
(models.User.username == user.username) | (models.User.email == user.email)
).first()
if existing:
raise HTTPException(status_code=400, detail="Username or email already exists")
# Hash password if provided
hashed_password = None
if user.password:
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
hashed_password = pwd_context.hash(user.password)
db_user = models.User(
username=user.username,
email=user.email,
full_name=user.full_name,
hashed_password=hashed_password,
is_admin=user.is_admin
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
@app.get("/users", response_model=List[schemas.UserResponse])
def list_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
users = db.query(models.User).offset(skip).limit(limit).all()
return users
@app.get("/users/{user_id}", response_model=schemas.UserResponse)
def get_user(user_id: int, db: Session = Depends(get_db)):
user = db.query(models.User).filter(models.User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
# Run database migration on startup
@app.on_event("startup")
def startup():
from app.core.config import Base, engine
Base.metadata.create_all(bind=engine)