feat: API key auth for agents (create/list/revoke) + dual auth (JWT or API key)

This commit is contained in:
Zhi
2026-02-22 09:05:05 +00:00
parent 1d5d8add3d
commit 1e9c6fd2f8
2 changed files with 98 additions and 0 deletions

View File

@@ -252,6 +252,7 @@ def get_user(user_id: int, db: Session = Depends(get_db)):
def startup():
from app.core.config import Base, engine
from app.models import webhook
from app.models import apikey
Base.metadata.create_all(bind=engine)
@@ -515,3 +516,85 @@ def delete_comment(comment_id: int, db: Session = Depends(get_db)):
db.delete(comment)
db.commit()
return None
# ============ API Key Auth ============
import secrets
from fastapi.security import APIKeyHeader
from app.models.apikey import APIKey
apikey_header = APIKeyHeader(name="X-API-Key", auto_error=False)
async def get_current_user_or_apikey(
token: str = Depends(oauth2_scheme),
api_key: str = Depends(apikey_header),
db: Session = Depends(get_db)
):
"""Authenticate via JWT token OR API key."""
# Try API key first
if api_key:
key_obj = db.query(APIKey).filter(APIKey.key == api_key, APIKey.is_active == True).first()
if key_obj:
key_obj.last_used_at = datetime.utcnow()
db.commit()
user = db.query(models.User).filter(models.User.id == key_obj.user_id).first()
if user:
return user
# Fall back to JWT
if token:
return await get_current_user(token=token, db=db)
raise HTTPException(status_code=401, detail="Not authenticated")
# ============ API Key Management ============
from pydantic import BaseModel as PydanticBaseModel
class APIKeyCreate(PydanticBaseModel):
name: str
user_id: int
class APIKeyResponse(PydanticBaseModel):
id: int
key: str
name: str
user_id: int
is_active: bool
created_at: datetime
last_used_at: datetime | None = None
class Config:
from_attributes = True
@app.post("/api-keys", response_model=APIKeyResponse, status_code=status.HTTP_201_CREATED)
def create_api_key(data: APIKeyCreate, db: Session = Depends(get_db)):
user = db.query(models.User).filter(models.User.id == data.user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
key = secrets.token_hex(32)
db_key = APIKey(key=key, name=data.name, user_id=data.user_id)
db.add(db_key)
db.commit()
db.refresh(db_key)
return db_key
@app.get("/api-keys", response_model=List[APIKeyResponse])
def list_api_keys(user_id: int = None, db: Session = Depends(get_db)):
query = db.query(APIKey)
if user_id:
query = query.filter(APIKey.user_id == user_id)
return query.all()
@app.delete("/api-keys/{key_id}", status_code=status.HTTP_204_NO_CONTENT)
def revoke_api_key(key_id: int, db: Session = Depends(get_db)):
key_obj = db.query(APIKey).filter(APIKey.id == key_id).first()
if not key_obj:
raise HTTPException(status_code=404, detail="API key not found")
key_obj.is_active = False
db.commit()
return None

15
app/models/apikey.py Normal file
View File

@@ -0,0 +1,15 @@
from sqlalchemy import Column, Integer, String, DateTime, Boolean, ForeignKey
from sqlalchemy.sql import func
from app.core.config import Base
class APIKey(Base):
__tablename__ = "api_keys"
id = Column(Integer, primary_key=True, index=True)
key = Column(String(64), unique=True, nullable=False, index=True)
name = Column(String(100), nullable=False) # e.g. "agent-zhi", "agent-lyn"
user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
is_active = Column(Boolean, default=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
last_used_at = Column(DateTime(timezone=True), nullable=True)