9 Commits

Author SHA1 Message Date
0bdc432215 Accept Tessera (Keycloak-compatible) OIDC tokens as API bearer
Adds an additive bearer-verification path: verify RS256 access tokens against
Tessera's JWKS (iss/aud/exp), map sub/preferred_username/email + roles
(realm_access.roles, resource_access.<audience>.roles) to the app's identity.
Existing auth (API keys / app JWTs / sessions) is unchanged. Issuer + audience
are env-configurable. Validated end-to-end against the local sim.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-02 15:11:31 +01:00
1a5a3ed1b1 Merge fix/security-audit: RBAC/API-key-hash/cookie hardening 2026-06-01 09:23:35 +01:00
16199c9280 Merge feat/knowledge-base: KnowledgeBase feature 2026-06-01 09:23:35 +01:00
3f5f813c65 fix(security): RBAC on legacy create endpoints, hashed API keys, hardening
Addresses findings from the security audit:
- H1: add check_project_role to the legacy misc.py create endpoints
  (milestones=mgr, tasks/supports/meetings=dev) that previously required
  only authentication — closing a cross-project write bypass available to
  any logged-in user or agent API key.
- M2: comments are always attributed to the authenticated caller; the
  client-supplied author_id is dropped (no author spoofing).
- M3: API keys are stored as SHA-256 hashes (key_hash) plus a short
  key_prefix for display — never plaintext. Lookup hashes the presented
  key; listings never expose the secret. Includes an idempotent migration
  for existing deployments.
- M5: the OIDC session cookie's Secure flag is env-driven via
  SESSION_COOKIE_SECURE (default True; set false for plain-HTTP dev).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 20:16:11 +01:00
9feff8e008 feat(knowledge-base): KnowledgeBase feature — models, CRUD API, RBAC
New entities mirroring the Project shape:
- knowledge_bases (human code, title, description, created_by, timestamps)
- knowledge_topics (UNIQUE(topic, knowledge_base_id))
- knowledge_categories (self-referential parent; UNIQUE(topic_id, parent, name),
  with an app-level check for the NULL-parent case MySQL can't enforce)
- knowledge_facts (category_id NULL → fact lives directly on the topic)
- project_knowledge_bases (M2M project ↔ knowledge base)

Adds full CRUD for KB/topic/category/fact, a nested /tree aggregate,
project link/unlink/list, KB-code generation (same algorithm as project
codes), and category cycle-prevention. Four global permissions
(knowledge-base.create/read/update/delete) seeded in init_bootstrap and
granted to admin/mgr/dev/general-agent/guest as appropriate. New tables
auto-create via Base.metadata.create_all; router wired in main.py.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 15:03:14 +01:00
h z
88779d2db0 Merge pull request 'fix(users): admin-gated /users routes accept api-key auth' (#23) from fix/users-require-admin-accept-apikey into main 2026-05-29 07:55:45 +00:00
cacb1d2652 fix(users): admin-gated /users routes accept api-key auth
Local `require_admin` in users.py depended on `get_current_user`, which
is OAuth2 JWT only. That made every admin-gated /users route (list, get,
patch update, bind-agent, etc.) reject api-key clients with 401 even when
the api-key resolves to an is_admin=True user.

Switch to `get_current_user_or_apikey` (the one in deps.py) so X-API-Key
and Bearer-as-apikey fallback both work. The admin gate itself still
reads User.is_admin — only the auth carrier broadens. Matches the auth
pattern schedule_type.py and other admin routes already use.

Surfaced when sherlock (agent-resource-director) tried `hf user list` for
the recruitment workflow Step 3 verify and got 401 "Could not validate
credentials" despite a valid provisioned api-key.
2026-05-29 08:55:28 +01:00
d2b83ad58d fix(projects): perm-gate create + apikey-via-Bearer + introspect with apikey
Three coupled fixes so non-admin agents (e.g. nav, role=mgr) can
actually create projects through hf-cli with their API key:

1. POST /projects no longer hardcodes is_admin. It checks the global
   `project.create` perm via role_permissions (admin still wins via
   is_admin short-circuit). Permission-denied 403 message names the
   exact perm.

2. /auth/me/permissions now uses get_current_user_or_apikey (was
   get_current_user JWT-only). This is what hf-cli hits to populate
   its local permission cache that drives the "not permitted" gate;
   previously every API-key-authed agent saw all commands as gated.

3. get_current_user_or_apikey now also accepts an API key delivered
   via Authorization: Bearer (in addition to X-API-Key). hf-cli only
   knows Bearer; trying to JWT-decode an API key string would fail —
   so on decode failure, fall through to the API key lookup. Keeps
   X-API-Key behavior unchanged.

4. init_bootstrap: add `project.create` to DEFAULT_PERMISSIONS and to
   _MGR_PERMISSIONS so admin (auto-all) + mgr both get it on seed.

Bug came to light when manager-agent reported `hf project list`/`create`
returned `not permitted`. Root cause: hf-cli calls /auth/me/permissions
with the API key via Bearer header → 401 → state.Known=false → every
command in the surface is gated false locally. Even after the local
gate, POST /projects would still 403 due to the hardcoded admin check.
All four steps above are required end-to-end.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-24 22:09:34 +01:00
01f6b562e1 fix(bootstrap): seed 3 perms used in code but missing from catalog
Audit cross-referenced every check_permission / _has_permission /
_has_global_permission / _require_calendar_permission call against
init_bootstrap.DEFAULT_PERMISSIONS. Three were enforced in code but
never seeded, so the Role Editor couldn't expose them:

  - member.remove        (projects.py:357 — remove project member)
  - schedule_type.read   (schedule_type.py + schedule_type_special_slot.py)
  - schedule_type.manage (schedule_type.py + schedule_type_special_slot.py)

Seed only — default roles are NOT modified (admin still gets everything
via the "None = all perms" rule). Operators can grant via Role Editor.

Other audit notes (not fixed in this commit, separate decisions):
- GET /projects + GET /projects/{id}/members are completely unauthed
  (no Depends(get_current_user_or_apikey)). Anonymous can list all
  projects. Investigate whether this is deliberate (e.g. for monitor
  external scrape) or an oversight.
- create_project hardcodes `if not current_user.is_admin: 403 "Only
  admins can create projects"` — doesn't consult permissions at all.
  Means manager-role users can't create projects even if they have
  project.write or hypothetical project.create. Consider switching
  to a perm-based gate.
- Several catalog perms (project.*, task.create/read/write/delete,
  milestone.*) are seeded but never checked in code; basic CRUD on
  task/project/milestone/comment is gated via the parallel
  check_project_role (viewer/member/dev/mgr ladder) instead.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-24 21:18:30 +01:00
16 changed files with 1559 additions and 40 deletions

View File

@@ -1,4 +1,6 @@
"""Shared auth dependencies."""
import hashlib
import logging
from datetime import datetime, timedelta
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, APIKeyHeader
@@ -11,6 +13,8 @@ from app.core.config import get_db, settings
from app.models import models
from app.models.apikey import APIKey
logger = logging.getLogger("harborforge.deps")
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token", auto_error=False)
apikey_header = APIKeyHeader(name="X-API-Key", auto_error=False)
@@ -59,22 +63,69 @@ async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = De
return user
def hash_api_key(raw: str) -> str:
"""SHA-256 of a raw API key. Keys are high-entropy random tokens, so a
fast hash (not bcrypt) is appropriate and allows O(1) lookup by hash."""
return hashlib.sha256(raw.encode()).hexdigest()
def _lookup_api_key(db: Session, key: str) -> models.User | None:
"""Resolve an API key string to a User; mark last_used_at on hit."""
if not key:
return None
key_obj = db.query(APIKey).filter(APIKey.key_hash == hash_api_key(key), APIKey.is_active == True).first() # noqa: E712
if not key_obj:
return None
key_obj.last_used_at = datetime.utcnow()
db.commit()
return db.query(models.User).filter(models.User.id == key_obj.user_id).first()
async def get_current_user_or_apikey(
token: str = Depends(oauth2_scheme),
api_key: str = Depends(apikey_header),
db: Session = Depends(get_db)
):
"""Authenticate via JWT token OR API key."""
"""Authenticate via JWT token (Authorization: Bearer <jwt>) OR API key
(X-API-Key: <key>, OR — as a convenience for CLI clients that only know
Bearer — Authorization: Bearer <api-key>; falls back when JWT decode fails).
Bearer tokens are tried in order: local HS256 JWT → external Tessera
(OIDC) RS256 access token → API key. The Tessera path is purely additive
and never affects local-JWT/API-key callers.
"""
# Native X-API-Key header
if api_key:
key_obj = db.query(APIKey).filter(APIKey.key == api_key, APIKey.is_active == True).first()
if key_obj:
key_obj.last_used_at = datetime.utcnow()
db.commit()
user = db.query(models.User).filter(models.User.id == key_obj.user_id).first()
if user:
return user
user = _lookup_api_key(db, api_key)
if user:
return user
# Bearer header — local JWT first, then Tessera, then API key.
if token:
return await get_current_user(token=token, db=db)
try:
return await get_current_user(token=token, db=db)
except HTTPException:
pass
# External Tessera (OIDC) RS256 access token.
try:
from app.api.tessera import authenticate_tessera
return authenticate_tessera(db, token)
except HTTPException:
pass
except Exception: # JWKS fetch / unexpected verifier error → don't 500
logger.warning("Tessera token verification error", exc_info=True)
# Bearer-carried API key (CLI convenience).
user = _lookup_api_key(db, token)
if user:
return user
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
raise HTTPException(status_code=401, detail="Not authenticated")

View File

@@ -11,7 +11,7 @@ from app.core.config import get_db, settings
from app.models import models
from app.models.role_permission import Permission, Role, RolePermission
from app.schemas import schemas
from app.api.deps import Token, verify_password, create_access_token, get_current_user
from app.api.deps import Token, verify_password, create_access_token, get_current_user, get_current_user_or_apikey
router = APIRouter(prefix="/auth", tags=["Auth"])
@@ -80,7 +80,7 @@ class PermissionIntrospectionResponse(BaseModel):
@router.get("/me/permissions", response_model=PermissionIntrospectionResponse)
async def get_my_permissions(
current_user: models.User = Depends(get_current_user),
current_user: models.User = Depends(get_current_user_or_apikey),
db: Session = Depends(get_db),
):
"""Return the current user's effective permissions for CLI help introspection."""

View File

@@ -32,8 +32,12 @@ def create_comment(comment: schemas.CommentCreate, db: Session = Depends(get_db)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
check_project_role(db, current_user.id, task.project_id, min_role="viewer")
db_comment = models.Comment(**comment.model_dump())
# Always attribute the comment to the authenticated caller — never trust
# a client-supplied author_id (prevents author spoofing).
data = comment.model_dump()
data.pop("author_id", None)
db_comment = models.Comment(**data, author_id=current_user.id)
db.add(db_comment)
db.commit()
db.refresh(db_comment)

View File

@@ -0,0 +1,735 @@
"""Knowledge Base router with global-permission RBAC.
Permissions (global, granted via the Role Editor; admins auto-pass):
knowledge-base.create create a knowledge base
knowledge-base.read read any knowledge base / topic / category / fact
knowledge-base.update edit a KB and its topic/category/fact structure,
and link/unlink knowledge bases to projects
knowledge-base.delete delete a knowledge base
There is no per-KB membership model (unlike projects) — access is purely by
the four global permissions above.
"""
import re
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app.core.config import get_db
from app.models import models
from app.models import knowledge as kb_models
from app.schemas import knowledge as kb_schemas
from app.api.deps import get_current_user_or_apikey
router = APIRouter(tags=["KnowledgeBase"])
PERM_CREATE = "knowledge-base.create"
PERM_READ = "knowledge-base.read"
PERM_UPDATE = "knowledge-base.update"
PERM_DELETE = "knowledge-base.delete"
# ---------------------------------------------------------------------------
# Permission helper (global perms only)
# ---------------------------------------------------------------------------
def _require_perm(db: Session, user: models.User, perm_name: str) -> None:
if user.is_admin:
return
from app.models.role_permission import Permission, RolePermission
has = (
db.query(Permission.id)
.join(RolePermission, RolePermission.permission_id == Permission.id)
.filter(
RolePermission.role_id == user.role_id,
Permission.name == perm_name,
)
.first()
if user.role_id
else None
)
if not has:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Permission denied: {perm_name} required",
)
# ---------------------------------------------------------------------------
# Knowledge-base code generation (same rules as project_code)
# ---------------------------------------------------------------------------
WORD_SEGMENT_RE = re.compile(r"[A-Za-z]+")
CAMEL_RE = re.compile(r"[A-Z]+(?=[A-Z][a-z])|[A-Z]?[a-z]+|[A-Z]+")
def _split_words(name: str):
segments = WORD_SEGMENT_RE.findall(name or "")
words = []
for seg in segments:
for part in CAMEL_RE.findall(seg):
if part.isupper() and len(part) > 1:
words.extend(list(part))
else:
words.append(part)
return words
def _code_exists(db: Session, code: str) -> bool:
return (
db.query(kb_models.KnowledgeBase)
.filter(kb_models.KnowledgeBase.knowledge_base_code == code)
.first()
is not None
)
def _next_counter(db: Session, prefix: str, width: int) -> str:
if width <= 0:
return ""
counter = (
db.query(kb_models.KnowledgeBaseCodeCounter)
.filter(kb_models.KnowledgeBaseCodeCounter.prefix == prefix)
.first()
)
if not counter:
counter = kb_models.KnowledgeBaseCodeCounter(prefix=prefix, next_value=0)
db.add(counter)
db.flush()
value = counter.next_value
counter.next_value += 1
db.flush()
return format(value, "x").upper().zfill(width)
def _generate_with_counter(db: Session, prefix: str, width: int) -> str:
while True:
suffix = _next_counter(db, prefix, width)
code = (prefix + suffix).upper()
if not _code_exists(db, code):
return code
def _generate_kb_code(db: Session, title: str) -> str:
words = _split_words(title)
if not words:
return _generate_with_counter(db, "UN", 4)
if len(words) == 1:
letters = "".join(c for c in words[0] if c.isalpha()).upper()
if not letters:
return _generate_with_counter(db, "UN", 4)
if len(letters) >= 6:
code = letters[:6]
if _code_exists(db, code):
return _generate_with_counter(db, letters[:2], 4)
return code
prefix = letters
return _generate_with_counter(db, prefix, 6 - len(prefix))
total_letters = sum(len(w) for w in words)
if len(words) > 6:
code = "".join(w[0] for w in words[:6]).upper()
if _code_exists(db, code):
return _generate_with_counter(db, code[:2], 4)
return code
if total_letters < 6:
prefix = "".join(words).upper()
return _generate_with_counter(db, prefix, 6 - len(prefix))
if total_letters == 6:
code = "".join(words).upper()
if _code_exists(db, code):
return _generate_with_counter(db, code[:2], 4)
return code
# total_letters > 6: initials, then fill from a counter on collision
code = "".join(w[0] for w in words).upper()[:6]
if not _code_exists(db, code):
return code
return _generate_with_counter(db, code[:2], 4)
# ---------------------------------------------------------------------------
# Resolvers
# ---------------------------------------------------------------------------
def _resolve_kb(db: Session, identifier: str) -> kb_models.KnowledgeBase:
kb = None
try:
kb = db.query(kb_models.KnowledgeBase).filter(kb_models.KnowledgeBase.id == int(identifier)).first()
except (ValueError, TypeError):
kb = (
db.query(kb_models.KnowledgeBase)
.filter(kb_models.KnowledgeBase.knowledge_base_code == str(identifier))
.first()
)
if not kb:
raise HTTPException(status_code=404, detail="Knowledge base not found")
return kb
def _resolve_project(db: Session, identifier: str) -> models.Project:
project = None
try:
project = db.query(models.Project).filter(models.Project.id == int(identifier)).first()
except (ValueError, TypeError):
project = db.query(models.Project).filter(models.Project.project_code == str(identifier)).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
return project
def _get_topic(db: Session, topic_id: int) -> kb_models.KnowledgeTopic:
topic = db.query(kb_models.KnowledgeTopic).filter(kb_models.KnowledgeTopic.id == topic_id).first()
if not topic:
raise HTTPException(status_code=404, detail="Topic not found")
return topic
def _get_category(db: Session, category_id: int) -> kb_models.KnowledgeCategory:
cat = db.query(kb_models.KnowledgeCategory).filter(kb_models.KnowledgeCategory.id == category_id).first()
if not cat:
raise HTTPException(status_code=404, detail="Category not found")
return cat
def _descendant_category_ids(db: Session, category_id: int) -> List[int]:
"""Return [category_id, ...all nested descendants] (deepest last)."""
collected = [category_id]
frontier = [category_id]
while frontier:
children = (
db.query(kb_models.KnowledgeCategory.id)
.filter(kb_models.KnowledgeCategory.parent.in_(frontier))
.all()
)
child_ids = [c.id for c in children]
if not child_ids:
break
collected.extend(child_ids)
frontier = child_ids
return collected
# ===========================================================================
# Knowledge Base CRUD
# ===========================================================================
@router.post("/knowledge-bases", response_model=kb_schemas.KnowledgeBaseResponse, status_code=status.HTTP_201_CREATED)
def create_knowledge_base(
payload: kb_schemas.KnowledgeBaseCreate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
_require_perm(db, current_user, PERM_CREATE)
kb = kb_models.KnowledgeBase(
title=payload.title,
description=payload.description,
created_by=current_user.id,
knowledge_base_code=_generate_kb_code(db, payload.title),
)
db.add(kb)
db.commit()
db.refresh(kb)
return kb
@router.get("/knowledge-bases", response_model=List[kb_schemas.KnowledgeBaseResponse])
def list_knowledge_bases(
skip: int = 0,
limit: int = 100,
project: Optional[str] = None,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
_require_perm(db, current_user, PERM_READ)
q = db.query(kb_models.KnowledgeBase)
if project is not None:
proj = _resolve_project(db, project)
linked_ids = [
row.knowledge_base_id
for row in db.query(kb_models.ProjectKnowledgeBase.knowledge_base_id)
.filter(kb_models.ProjectKnowledgeBase.project_id == proj.id)
.all()
]
if not linked_ids:
return []
q = q.filter(kb_models.KnowledgeBase.id.in_(linked_ids))
return q.order_by(kb_models.KnowledgeBase.id).offset(skip).limit(limit).all()
@router.get("/knowledge-bases/{kb_id}", response_model=kb_schemas.KnowledgeBaseResponse)
def get_knowledge_base(
kb_id: str,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
_require_perm(db, current_user, PERM_READ)
return _resolve_kb(db, kb_id)
@router.patch("/knowledge-bases/{kb_id}", response_model=kb_schemas.KnowledgeBaseResponse)
def update_knowledge_base(
kb_id: str,
payload: kb_schemas.KnowledgeBaseUpdate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
_require_perm(db, current_user, PERM_UPDATE)
kb = _resolve_kb(db, kb_id)
data = payload.model_dump(exclude_unset=True)
for field, value in data.items():
setattr(kb, field, value)
db.commit()
db.refresh(kb)
return kb
@router.delete("/knowledge-bases/{kb_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_knowledge_base(
kb_id: str,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
_require_perm(db, current_user, PERM_DELETE)
kb = _resolve_kb(db, kb_id)
topic_ids = [
t.id
for t in db.query(kb_models.KnowledgeTopic.id)
.filter(kb_models.KnowledgeTopic.knowledge_base_id == kb.id)
.all()
]
if topic_ids:
db.query(kb_models.KnowledgeFact).filter(kb_models.KnowledgeFact.topic_id.in_(topic_ids)).delete(synchronize_session=False)
db.query(kb_models.KnowledgeCategory).filter(kb_models.KnowledgeCategory.topic_id.in_(topic_ids)).delete(synchronize_session=False)
db.query(kb_models.KnowledgeTopic).filter(kb_models.KnowledgeTopic.id.in_(topic_ids)).delete(synchronize_session=False)
db.query(kb_models.ProjectKnowledgeBase).filter(kb_models.ProjectKnowledgeBase.knowledge_base_id == kb.id).delete(synchronize_session=False)
db.delete(kb)
db.commit()
return None
# ===========================================================================
# Tree (read-only aggregate)
# ===========================================================================
@router.get("/knowledge-bases/{kb_id}/tree", response_model=kb_schemas.KnowledgeBaseTree)
def get_knowledge_base_tree(
kb_id: str,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
_require_perm(db, current_user, PERM_READ)
kb = _resolve_kb(db, kb_id)
topics = db.query(kb_models.KnowledgeTopic).filter(kb_models.KnowledgeTopic.knowledge_base_id == kb.id).all()
topic_ids = [t.id for t in topics]
cats = (
db.query(kb_models.KnowledgeCategory).filter(kb_models.KnowledgeCategory.topic_id.in_(topic_ids)).all()
if topic_ids else []
)
facts = (
db.query(kb_models.KnowledgeFact).filter(kb_models.KnowledgeFact.topic_id.in_(topic_ids)).all()
if topic_ids else []
)
# Index facts by (topic_id, category_id) and categories by (topic_id, parent)
facts_by_cat: dict = {}
facts_topic_direct: dict = {}
for f in facts:
fr = kb_schemas.KnowledgeFactResponse.model_validate(f)
if f.category_id is None:
facts_topic_direct.setdefault(f.topic_id, []).append(fr)
else:
facts_by_cat.setdefault(f.category_id, []).append(fr)
cats_by_parent: dict = {}
for c in cats:
cats_by_parent.setdefault((c.topic_id, c.parent), []).append(c)
def build_category(cat) -> kb_schemas.CategoryTreeNode:
children = cats_by_parent.get((cat.topic_id, cat.id), [])
return kb_schemas.CategoryTreeNode(
id=cat.id,
name=cat.name,
parent=cat.parent,
topic_id=cat.topic_id,
description=cat.description,
categories=[build_category(ch) for ch in children],
facts=facts_by_cat.get(cat.id, []),
)
topic_nodes = []
for t in topics:
top_level_cats = cats_by_parent.get((t.id, None), [])
topic_nodes.append(
kb_schemas.TopicTreeNode(
id=t.id,
topic=t.topic,
knowledge_base_id=t.knowledge_base_id,
description=t.description,
categories=[build_category(c) for c in top_level_cats],
facts=facts_topic_direct.get(t.id, []),
)
)
return kb_schemas.KnowledgeBaseTree(
id=kb.id,
knowledge_base_code=kb.knowledge_base_code,
title=kb.title,
description=kb.description,
topics=topic_nodes,
)
# ===========================================================================
# Topics
# ===========================================================================
@router.get("/knowledge-bases/{kb_id}/topics", response_model=List[kb_schemas.KnowledgeTopicResponse])
def list_topics(
kb_id: str,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
_require_perm(db, current_user, PERM_READ)
kb = _resolve_kb(db, kb_id)
return db.query(kb_models.KnowledgeTopic).filter(kb_models.KnowledgeTopic.knowledge_base_id == kb.id).all()
@router.post("/knowledge-bases/{kb_id}/topics", response_model=kb_schemas.KnowledgeTopicResponse, status_code=status.HTTP_201_CREATED)
def create_topic(
kb_id: str,
payload: kb_schemas.KnowledgeTopicCreate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
_require_perm(db, current_user, PERM_UPDATE)
kb = _resolve_kb(db, kb_id)
existing = (
db.query(kb_models.KnowledgeTopic)
.filter(
kb_models.KnowledgeTopic.knowledge_base_id == kb.id,
kb_models.KnowledgeTopic.topic == payload.topic,
)
.first()
)
if existing:
raise HTTPException(status_code=400, detail="A topic with this name already exists in this knowledge base")
topic = kb_models.KnowledgeTopic(
topic=payload.topic,
description=payload.description,
knowledge_base_id=kb.id,
created_by=current_user.id,
)
db.add(topic)
db.commit()
db.refresh(topic)
return topic
@router.get("/knowledge-topics/{topic_id}", response_model=kb_schemas.KnowledgeTopicResponse)
def get_topic(
topic_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
_require_perm(db, current_user, PERM_READ)
return _get_topic(db, topic_id)
@router.patch("/knowledge-topics/{topic_id}", response_model=kb_schemas.KnowledgeTopicResponse)
def update_topic(
topic_id: int,
payload: kb_schemas.KnowledgeTopicUpdate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
_require_perm(db, current_user, PERM_UPDATE)
topic = _get_topic(db, topic_id)
data = payload.model_dump(exclude_unset=True)
if "topic" in data and data["topic"] and data["topic"] != topic.topic:
clash = (
db.query(kb_models.KnowledgeTopic)
.filter(
kb_models.KnowledgeTopic.knowledge_base_id == topic.knowledge_base_id,
kb_models.KnowledgeTopic.topic == data["topic"],
kb_models.KnowledgeTopic.id != topic.id,
)
.first()
)
if clash:
raise HTTPException(status_code=400, detail="A topic with this name already exists in this knowledge base")
for field, value in data.items():
setattr(topic, field, value)
db.commit()
db.refresh(topic)
return topic
@router.delete("/knowledge-topics/{topic_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_topic(
topic_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
_require_perm(db, current_user, PERM_UPDATE)
topic = _get_topic(db, topic_id)
db.query(kb_models.KnowledgeFact).filter(kb_models.KnowledgeFact.topic_id == topic.id).delete(synchronize_session=False)
db.query(kb_models.KnowledgeCategory).filter(kb_models.KnowledgeCategory.topic_id == topic.id).delete(synchronize_session=False)
db.delete(topic)
db.commit()
return None
# ===========================================================================
# Categories
# ===========================================================================
def _check_category_unique(db: Session, topic_id: int, parent: Optional[int], name: str, exclude_id: Optional[int] = None):
q = db.query(kb_models.KnowledgeCategory).filter(
kb_models.KnowledgeCategory.topic_id == topic_id,
kb_models.KnowledgeCategory.name == name,
)
if parent is None:
q = q.filter(kb_models.KnowledgeCategory.parent.is_(None))
else:
q = q.filter(kb_models.KnowledgeCategory.parent == parent)
if exclude_id is not None:
q = q.filter(kb_models.KnowledgeCategory.id != exclude_id)
if q.first():
raise HTTPException(status_code=400, detail="A category with this name already exists under the same parent")
@router.get("/knowledge-topics/{topic_id}/categories", response_model=List[kb_schemas.KnowledgeCategoryResponse])
def list_categories(
topic_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
_require_perm(db, current_user, PERM_READ)
_get_topic(db, topic_id)
return db.query(kb_models.KnowledgeCategory).filter(kb_models.KnowledgeCategory.topic_id == topic_id).all()
@router.post("/knowledge-categories", response_model=kb_schemas.KnowledgeCategoryResponse, status_code=status.HTTP_201_CREATED)
def create_category(
payload: kb_schemas.KnowledgeCategoryCreate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
_require_perm(db, current_user, PERM_UPDATE)
_get_topic(db, payload.topic_id)
if payload.parent is not None:
parent_cat = _get_category(db, payload.parent)
if parent_cat.topic_id != payload.topic_id:
raise HTTPException(status_code=400, detail="Parent category belongs to a different topic")
_check_category_unique(db, payload.topic_id, payload.parent, payload.name)
cat = kb_models.KnowledgeCategory(
name=payload.name,
description=payload.description,
parent=payload.parent,
topic_id=payload.topic_id,
created_by=current_user.id,
last_updated_by=current_user.id,
)
db.add(cat)
db.commit()
db.refresh(cat)
return cat
@router.get("/knowledge-categories/{category_id}", response_model=kb_schemas.KnowledgeCategoryResponse)
def get_category(
category_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
_require_perm(db, current_user, PERM_READ)
return _get_category(db, category_id)
@router.patch("/knowledge-categories/{category_id}", response_model=kb_schemas.KnowledgeCategoryResponse)
def update_category(
category_id: int,
payload: kb_schemas.KnowledgeCategoryUpdate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
_require_perm(db, current_user, PERM_UPDATE)
cat = _get_category(db, category_id)
data = payload.model_dump(exclude_unset=True)
new_parent = data.get("parent", cat.parent) if "parent" in data else cat.parent
if "parent" in data and data["parent"] is not None:
if data["parent"] == cat.id:
raise HTTPException(status_code=400, detail="A category cannot be its own parent")
parent_cat = _get_category(db, data["parent"])
if parent_cat.topic_id != cat.topic_id:
raise HTTPException(status_code=400, detail="Parent category belongs to a different topic")
# Prevent cycles: new parent must not be a descendant of this category
if data["parent"] in _descendant_category_ids(db, cat.id):
raise HTTPException(status_code=400, detail="Cannot move a category under one of its own descendants")
new_name = data.get("name", cat.name)
if ("name" in data and data["name"] != cat.name) or ("parent" in data and new_parent != cat.parent):
_check_category_unique(db, cat.topic_id, new_parent, new_name, exclude_id=cat.id)
for field, value in data.items():
setattr(cat, field, value)
cat.last_updated_by = current_user.id
db.commit()
db.refresh(cat)
return cat
@router.delete("/knowledge-categories/{category_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_category(
category_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
_require_perm(db, current_user, PERM_UPDATE)
cat = _get_category(db, category_id)
ids = _descendant_category_ids(db, cat.id)
db.query(kb_models.KnowledgeFact).filter(kb_models.KnowledgeFact.category_id.in_(ids)).delete(synchronize_session=False)
db.query(kb_models.KnowledgeCategory).filter(kb_models.KnowledgeCategory.id.in_(ids)).delete(synchronize_session=False)
db.commit()
return None
# ===========================================================================
# Facts
# ===========================================================================
@router.post("/knowledge-facts", response_model=kb_schemas.KnowledgeFactResponse, status_code=status.HTTP_201_CREATED)
def create_fact(
payload: kb_schemas.KnowledgeFactCreate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
_require_perm(db, current_user, PERM_UPDATE)
_get_topic(db, payload.topic_id)
if payload.category_id is not None:
cat = _get_category(db, payload.category_id)
if cat.topic_id != payload.topic_id:
raise HTTPException(status_code=400, detail="Category belongs to a different topic")
fact = kb_models.KnowledgeFact(
fact=payload.fact,
topic_id=payload.topic_id,
category_id=payload.category_id,
)
db.add(fact)
db.commit()
db.refresh(fact)
return fact
@router.get("/knowledge-facts/{fact_id}", response_model=kb_schemas.KnowledgeFactResponse)
def get_fact(
fact_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
_require_perm(db, current_user, PERM_READ)
fact = db.query(kb_models.KnowledgeFact).filter(kb_models.KnowledgeFact.id == fact_id).first()
if not fact:
raise HTTPException(status_code=404, detail="Fact not found")
return fact
@router.patch("/knowledge-facts/{fact_id}", response_model=kb_schemas.KnowledgeFactResponse)
def update_fact(
fact_id: int,
payload: kb_schemas.KnowledgeFactUpdate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
_require_perm(db, current_user, PERM_UPDATE)
fact = db.query(kb_models.KnowledgeFact).filter(kb_models.KnowledgeFact.id == fact_id).first()
if not fact:
raise HTTPException(status_code=404, detail="Fact not found")
data = payload.model_dump(exclude_unset=True)
if "category_id" in data and data["category_id"] is not None:
cat = _get_category(db, data["category_id"])
if cat.topic_id != fact.topic_id:
raise HTTPException(status_code=400, detail="Category belongs to a different topic")
for field, value in data.items():
setattr(fact, field, value)
db.commit()
db.refresh(fact)
return fact
@router.delete("/knowledge-facts/{fact_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_fact(
fact_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
_require_perm(db, current_user, PERM_UPDATE)
fact = db.query(kb_models.KnowledgeFact).filter(kb_models.KnowledgeFact.id == fact_id).first()
if not fact:
raise HTTPException(status_code=404, detail="Fact not found")
db.delete(fact)
db.commit()
return None
# ===========================================================================
# Project <-> KnowledgeBase links
# ===========================================================================
@router.get("/projects/{project_id}/knowledge-bases", response_model=List[kb_schemas.KnowledgeBaseResponse])
def list_project_knowledge_bases(
project_id: str,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
_require_perm(db, current_user, PERM_READ)
project = _resolve_project(db, project_id)
linked_ids = [
row.knowledge_base_id
for row in db.query(kb_models.ProjectKnowledgeBase.knowledge_base_id)
.filter(kb_models.ProjectKnowledgeBase.project_id == project.id)
.all()
]
if not linked_ids:
return []
return db.query(kb_models.KnowledgeBase).filter(kb_models.KnowledgeBase.id.in_(linked_ids)).all()
@router.post("/projects/{project_id}/knowledge-bases", response_model=kb_schemas.KnowledgeBaseResponse, status_code=status.HTTP_201_CREATED)
def link_knowledge_base_to_project(
project_id: str,
payload: kb_schemas.ProjectKnowledgeBaseLink,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
_require_perm(db, current_user, PERM_UPDATE)
project = _resolve_project(db, project_id)
kb = _resolve_kb(db, payload.knowledge_base)
existing = (
db.query(kb_models.ProjectKnowledgeBase)
.filter(
kb_models.ProjectKnowledgeBase.project_id == project.id,
kb_models.ProjectKnowledgeBase.knowledge_base_id == kb.id,
)
.first()
)
if not existing:
db.add(kb_models.ProjectKnowledgeBase(project_id=project.id, knowledge_base_id=kb.id))
db.commit()
return kb
@router.delete("/projects/{project_id}/knowledge-bases/{kb_id}", status_code=status.HTTP_204_NO_CONTENT)
def unlink_knowledge_base_from_project(
project_id: str,
kb_id: str,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
_require_perm(db, current_user, PERM_UPDATE)
project = _resolve_project(db, project_id)
kb = _resolve_kb(db, kb_id)
db.query(kb_models.ProjectKnowledgeBase).filter(
kb_models.ProjectKnowledgeBase.project_id == project.id,
kb_models.ProjectKnowledgeBase.knowledge_base_id == kb.id,
).delete(synchronize_session=False)
db.commit()
return None

View File

@@ -12,7 +12,7 @@ from sqlalchemy import func as sqlfunc
from pydantic import BaseModel
from app.core.config import get_db
from app.api.deps import get_current_user_or_apikey, require_admin
from app.api.deps import get_current_user_or_apikey, require_admin, hash_api_key
from app.api.rbac import check_project_role, ensure_can_edit_milestone
from app.models import models
from app.models.apikey import APIKey
@@ -49,7 +49,8 @@ class APIKeyCreate(BaseModel):
class APIKeyResponse(BaseModel):
id: int
key: str
key: str | None = None # full secret — only populated on create/reset
key_prefix: str | None = None # masked display for listings
name: str
user_id: int
is_active: bool
@@ -66,11 +67,16 @@ def create_api_key(data: APIKeyCreate, db: Session = Depends(get_db),
if not user:
raise HTTPException(status_code=404, detail="User not found")
key = secrets.token_hex(32)
db_key = APIKey(key=key, name=data.name, user_id=data.user_id)
db_key = APIKey(key_hash=hash_api_key(key), key_prefix=key[:8], name=data.name, user_id=data.user_id)
db.add(db_key)
db.commit()
db.refresh(db_key)
return db_key
# Return the raw key once (it is never stored or shown again).
return {
"id": db_key.id, "key": key, "key_prefix": db_key.key_prefix,
"name": db_key.name, "user_id": db_key.user_id, "is_active": db_key.is_active,
"created_at": db_key.created_at, "last_used_at": db_key.last_used_at,
}
@router.get("/api-keys", response_model=List[APIKeyResponse], tags=["API Keys"])
@@ -80,11 +86,14 @@ def list_api_keys(user_id: int = None, db: Session = Depends(get_db),
if user_id:
query = query.filter(APIKey.user_id == user_id)
keys = query.all()
# Never expose the full secret on listing; show only a masked prefix.
for k in keys:
if k.key and len(k.key) > 8:
k.key = k.key[:6] + "" + k.key[-2:]
return keys
# Never expose the secret on listing — the raw key isn't stored. Show only
# the masked prefix.
return [{
"id": k.id, "key": None,
"key_prefix": (k.key_prefix + "") if k.key_prefix else None,
"name": k.name, "user_id": k.user_id, "is_active": k.is_active,
"created_at": k.created_at, "last_used_at": k.last_used_at,
} for k in keys]
@router.delete("/api-keys/{key_id}", status_code=status.HTTP_204_NO_CONTENT, tags=["API Keys"])
@@ -132,7 +141,10 @@ def list_activity(entity_type: str = None, entity_id: int = None, user_id: int =
def create_milestone(ms: schemas.MilestoneCreate, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)):
import json
project = db.query(models.Project).filter(models.Project.id == ms.project_id).first()
project_code = project.project_code if project and project.project_code else f"P{ms.project_id}"
if not project:
raise HTTPException(status_code=404, detail="Project not found")
check_project_role(db, current_user.id, project.id, min_role="mgr")
project_code = project.project_code if project.project_code else f"P{ms.project_id}"
max_ms = db.query(MilestoneModel).filter(MilestoneModel.project_id == ms.project_id).order_by(MilestoneModel.id.desc()).first()
next_num = (max_ms.id + 1) if max_ms else 1
@@ -488,14 +500,15 @@ def create_milestone_task(project_code: str, milestone_id: str, task_data: dict,
project = db.query(models.Project).filter(models.Project.project_code == project_code).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
check_project_role(db, current_user.id, project.id, min_role="dev")
ms = db.query(MilestoneModel).filter(MilestoneModel.milestone_code == milestone_id, MilestoneModel.project_id == project.id).first()
if not ms:
raise HTTPException(status_code=404, detail="Milestone not found")
if ms.status and hasattr(ms.status, "value") and ms.status.value == "undergoing":
raise HTTPException(status_code=400, detail="Cannot add items to a milestone that is undergoing")
milestone_code = ms.milestone_code or f"m{ms.id}"
max_task = db.query(Task).filter(Task.milestone_id == ms.id).order_by(Task.id.desc()).first()
next_num = (max_task.id + 1) if max_task else 1
@@ -622,6 +635,7 @@ def create_support(project_code: str, milestone_id: str, support_data: dict, db:
project = db.query(models.Project).filter(models.Project.project_code == project_code).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
check_project_role(db, current_user.id, project.id, min_role="dev")
ms = db.query(MilestoneModel).filter(MilestoneModel.milestone_code == milestone_id, MilestoneModel.project_id == project.id).first()
if not ms:
@@ -768,14 +782,15 @@ def create_meeting(project_code: str, milestone_id: str, meeting_data: dict, db:
project = db.query(models.Project).filter(models.Project.project_code == project_code).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
check_project_role(db, current_user.id, project.id, min_role="dev")
ms = db.query(MilestoneModel).filter(MilestoneModel.milestone_code == milestone_id, MilestoneModel.project_id == project.id).first()
if not ms:
raise HTTPException(status_code=404, detail="Milestone not found")
if ms.status and hasattr(ms.status, "value") and ms.status.value == "undergoing":
raise HTTPException(status_code=400, detail="Cannot add items to a milestone that is undergoing")
milestone_code = ms.milestone_code or f"m{ms.id}"
max_meeting = db.query(Meeting).filter(Meeting.milestone_id == ms.id).order_by(Meeting.id.desc()).first()
next_num = (max_meeting.id + 1) if max_meeting else 1

View File

@@ -153,9 +153,27 @@ def _generate_project_code(db, name: str) -> str:
@router.post("", response_model=schemas.ProjectResponse, status_code=status.HTTP_201_CREATED)
def create_project(project: schemas.ProjectCreate, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)):
# Check if user is admin
# Project creation is gated by the `project.create` global permission
# (admin auto-grants by virtue of is_admin). Any role granted that perm
# via the Role Editor can create projects.
if not current_user.is_admin:
raise HTTPException(status_code=403, detail="Only admins can create projects")
from app.models.role_permission import Permission, RolePermission
has = (
db.query(Permission.id)
.join(RolePermission, RolePermission.permission_id == Permission.id)
.filter(
RolePermission.role_id == current_user.role_id,
Permission.name == "project.create",
)
.first()
if current_user.role_id
else None
)
if not has:
raise HTTPException(
status_code=403,
detail="Permission denied: project.create required",
)
# Auto-fill owner_name from owner_id
user = db.query(models.User).filter(models.User.id == project.owner_id).first()
if not user:

View File

@@ -7,7 +7,7 @@ from pydantic import BaseModel
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session
from app.api.deps import get_current_user, get_current_user_or_apikey, get_password_hash
from app.api.deps import get_current_user, get_current_user_or_apikey, get_password_hash, hash_api_key
from app.core.config import get_db, settings
from app.init_bootstrap import DELETED_USER_USERNAME
from app.models import models
@@ -39,7 +39,11 @@ def _user_response(user: models.User) -> dict:
return data
def require_admin(current_user: models.User = Depends(get_current_user)):
def require_admin(current_user: models.User = Depends(get_current_user_or_apikey)):
# Accept either OAuth2 JWT or X-API-Key (incl. Bearer-as-apikey fallback)
# so CLI clients using their provisioned api-key can hit admin-gated user
# routes (list / get / update / patch). The admin gate still reads
# User.is_admin — only the auth carrier broadens.
if not current_user.is_admin:
raise HTTPException(status_code=403, detail="Admin required")
return current_user
@@ -460,9 +464,10 @@ def reset_user_apikey(
existing_key.is_active = False
db.flush()
# Create new key
# Create new key (store only the hash + a display prefix)
new_key = APIKey(
key=new_key_value,
key_hash=hash_api_key(new_key_value),
key_prefix=new_key_value[:8],
name=f"{target_user.username}-key",
user_id=target_user.id,
is_active=True,

231
app/api/tessera.py Normal file
View File

@@ -0,0 +1,231 @@
"""Tessera (external OIDC, Keycloak-compatible) access-token verification.
Accepts RS256 access tokens issued by the configured Tessera realm as API
bearer tokens. This is ADDITIVE to the existing local HS256 JWT and API-key
auth — see app/api/deps.get_current_user_or_apikey.
Verification:
* fetch + cache the realm JWKS ({issuer}/protocol/openid-connect/certs),
* select the JWK by the token header `kid`,
* verify the RS256 signature, `iss == TESSERA_ISSUER`,
`aud` contains TESSERA_AUDIENCE, and require `exp`/`iat`.
Verified claims are mapped to / provision an hf models.User, mirroring the
OIDC login callback provisioning (app/api/routers/oidc.py).
"""
import logging
import threading
import time
import requests
from fastapi import HTTPException
from jose import jwt
from jose.exceptions import JWTError
from sqlalchemy.orm import Session
from app.core.config import settings
from app.models import models
from app.models.role_permission import Role
logger = logging.getLogger("harborforge.tessera")
# JWKS cache: refetched when a token's kid is unknown, and at most once per
# _JWKS_TTL otherwise (so rotated/revoked keys are picked up within the TTL).
_JWKS_TTL = 3600
_jwks_lock = threading.Lock()
_jwks: dict | None = None
_jwks_fetched_at: float = 0.0
def _jwks_url() -> str:
return settings.TESSERA_ISSUER.rstrip("/") + "/protocol/openid-connect/certs"
def _fetch_jwks() -> dict:
resp = requests.get(_jwks_url(), timeout=5)
resp.raise_for_status()
return resp.json()
def _get_jwks(force: bool = False) -> dict:
global _jwks, _jwks_fetched_at
with _jwks_lock:
now = time.time()
if force or _jwks is None or (now - _jwks_fetched_at) > _JWKS_TTL:
_jwks = _fetch_jwks()
_jwks_fetched_at = now
return _jwks
def _key_for_kid(kid: str | None) -> dict | None:
keys = (_get_jwks() or {}).get("keys") or []
for k in keys:
if k.get("kid") == kid:
return k
# Unknown kid → keys may have rotated; force a refresh once and retry.
keys = (_get_jwks(force=True) or {}).get("keys") or []
for k in keys:
if k.get("kid") == kid:
return k
return None
def verify_tessera_token(token: str) -> dict:
"""Verify a Tessera RS256 access token and return its decoded claims.
Raises HTTPException(401) on any failure (so callers can fall through to
the next auth method without leaking which check failed).
"""
if not token:
raise HTTPException(status_code=401, detail="No token")
try:
header = jwt.get_unverified_header(token)
except JWTError as exc:
raise HTTPException(status_code=401, detail="Malformed token") from exc
if header.get("alg") != "RS256":
raise HTTPException(status_code=401, detail="Unexpected token algorithm")
key = _key_for_kid(header.get("kid"))
if key is None:
raise HTTPException(status_code=401, detail="Unknown signing key")
try:
claims = jwt.decode(
token,
key,
algorithms=["RS256"],
issuer=settings.TESSERA_ISSUER,
audience=settings.TESSERA_AUDIENCE,
options={"require_exp": True, "require_iat": True},
)
except JWTError as exc:
raise HTTPException(status_code=401, detail="Invalid Tessera token") from exc
if not claims.get("sub"):
raise HTTPException(status_code=401, detail="Token missing subject")
return claims
def _collect_roles(claims: dict) -> set[str]:
"""Tessera/Keycloak roles: realm_access.roles + resource_access.<client>.roles.
Mirrors app/api/routers/oidc._collect_roles (normalised lower-case, no
leading slash).
"""
roles: set[str] = set()
ra = claims.get("realm_access")
if isinstance(ra, dict):
roles.update(ra.get("roles") or [])
res = claims.get("resource_access")
if isinstance(res, dict):
for v in res.values():
if isinstance(v, dict):
roles.update(v.get("roles") or [])
return {str(r).strip().lstrip("/").lower() for r in roles if r}
# Token roles (lower-case) mapped to an hf global role name, highest first.
# The first match wins. Token "admin" → hf admin (sets is_admin); others map
# onto the existing global role hierarchy.
_ROLE_PRIORITY = ["admin", "mgr", "dev", "member", "viewer", "guest"]
def _resolve_global_role(db: Session, token_roles: set[str]) -> Role | None:
for name in _ROLE_PRIORITY:
if name in token_roles:
role = db.query(Role).filter(
Role.is_global == True, # noqa: E712
Role.name == name,
).first()
if role:
return role
# No recognised role → fall back to guest (least privilege).
return db.query(Role).filter(
Role.is_global == True, # noqa: E712
Role.name == "guest",
).first()
def _unique_username(db: Session, base: str) -> str:
base = (base or "tessera-user").strip() or "tessera-user"
candidate = base
n = 1
while db.query(models.User).filter(models.User.username == candidate).first():
n += 1
candidate = f"{base}-{n}"
return candidate
def resolve_or_provision_user(db: Session, claims: dict) -> models.User:
"""Resolve the hf User for a verified Tessera token, auto-provisioning one
if no match exists. Mirrors the OIDC callback binding/provisioning."""
issuer = claims.get("iss") or settings.TESSERA_ISSUER
subject = claims.get("sub")
email = (claims.get("email") or "").strip() or None
username = (claims.get("preferred_username") or "").strip() or None
token_roles = _collect_roles(claims)
is_admin = "admin" in token_roles
# 1) by (issuer, subject)
user = db.query(models.User).filter(
models.User.oidc_issuer == issuer,
models.User.oidc_subject == subject,
).first()
# 2) by email — bind this Tessera identity onto the existing account.
if user is None and email:
user = db.query(models.User).filter(models.User.email == email).first()
if user is not None:
if user.oidc_subject and user.oidc_subject != subject:
# Email belongs to a user already bound to a different identity.
raise HTTPException(status_code=401, detail="Account already bound to another identity")
user.oidc_issuer = issuer
user.oidc_subject = subject
db.commit()
# 3) auto-provision
if user is None:
role = _resolve_global_role(db, token_roles)
if role is None:
raise HTTPException(status_code=500, detail="No global role available (DB not seeded)")
uname = _unique_username(db, username or (email.split("@")[0] if email else None) or f"tessera-{subject[:8]}")
# Email is NOT NULL + unique; synthesise a stable placeholder if absent.
eff_email = email or f"{subject}@tessera.local"
if db.query(models.User).filter(models.User.email == eff_email).first():
eff_email = f"{subject}@tessera.local"
user = models.User(
username=uname,
email=eff_email,
full_name=(claims.get("name") or username or uname),
hashed_password=None,
oidc_issuer=issuer,
oidc_subject=subject,
is_active=True,
is_admin=is_admin,
role_id=role.id,
)
db.add(user)
db.commit()
db.refresh(user)
logger.info("Tessera: provisioned user '%s' (admin=%s) for subject %s", user.username, is_admin, subject)
return user
if not user.is_active or user.username in ("acc-mgr", "deleted-user"):
raise HTTPException(status_code=401, detail="User is not permitted to sign in")
# Keep admin status in sync with the token's realm/client roles on each
# request so role changes in Tessera take effect without re-provisioning.
if bool(user.is_admin) != is_admin:
user.is_admin = is_admin
db.commit()
db.refresh(user)
return user
def authenticate_tessera(db: Session, token: str) -> models.User:
"""Verify a Tessera bearer token and return the resolved hf User."""
claims = verify_tessera_token(token)
return resolve_or_provision_user(db, claims)

View File

@@ -22,6 +22,17 @@ class Settings(BaseSettings):
# in via a bound OIDC identity / API keys), frontend hides password UI.
HARBORFORGE_OIDC_ONLY: bool = False
# Mark the OIDC state/session cookie Secure (HTTPS-only). Defaults to True
# for production; set SESSION_COOKIE_SECURE=false for plain-HTTP local dev.
SESSION_COOKIE_SECURE: bool = True
# External OIDC provider ("Tessera", Keycloak-compatible) whose RS256
# access tokens are accepted as API bearer tokens (additive to local
# HS256 JWT + API keys). Tokens are verified against the issuer's JWKS;
# `iss` must equal TESSERA_ISSUER and `aud` must contain TESSERA_AUDIENCE.
TESSERA_ISSUER: str = "https://login.hangman-lab.top/realms/Hangman-Lab"
TESSERA_AUDIENCE: str = "harbor-forge"
class Config:
env_file = ".env"

View File

@@ -36,8 +36,14 @@ DEFAULT_PERMISSIONS = [
# Project permissions
("project.read", "View project", "project"),
("project.write", "Edit project", "project"),
("project.create", "Create a project", "project"),
("project.delete", "Delete project", "project"),
("project.manage_members", "Manage project members", "project"),
# Knowledge base permissions
("knowledge-base.read", "View knowledge bases", "knowledge-base"),
("knowledge-base.create", "Create a knowledge base", "knowledge-base"),
("knowledge-base.update", "Edit a knowledge base and its structure", "knowledge-base"),
("knowledge-base.delete", "Delete a knowledge base", "knowledge-base"),
# Task/Milestone permissions
("task.create", "Create tasks", "task"),
("task.read", "View tasks", "task"),
@@ -76,6 +82,12 @@ DEFAULT_PERMISSIONS = [
("calendar.manage", "Manage calendar settings and workload policies", "calendar"),
# Webhook
("webhook.manage", "Manage webhooks", "admin"),
# Project member management (used by DELETE /projects/{id}/members/{user_id})
("member.remove", "Remove a project member", "project"),
# Schedule type (calendar templates) — read covers list+detail, manage covers
# create/edit/delete on schedule_types AND their special slots.
("schedule_type.read", "View schedule types and special slots", "calendar"),
("schedule_type.manage", "Create / edit / delete schedule types and slots", "calendar"),
]
@@ -98,7 +110,8 @@ def init_default_permissions(db: Session) -> list[Permission]:
# Default roles + permission set per role
# ---------------------------------------------------------------------------
_MGR_PERMISSIONS = {
"project.read", "project.write", "project.manage_members",
"project.read", "project.write", "project.create", "project.manage_members",
"knowledge-base.read", "knowledge-base.create", "knowledge-base.update", "knowledge-base.delete",
"task.create", "task.read", "task.write", "task.delete",
"milestone.create", "milestone.read", "milestone.write", "milestone.delete",
"milestone.freeze", "milestone.start", "milestone.close",
@@ -111,6 +124,7 @@ _MGR_PERMISSIONS = {
_DEV_PERMISSIONS = {
"project.read",
"knowledge-base.read", "knowledge-base.update",
"task.create", "task.read", "task.write",
"milestone.read",
"task.close", "task.reopen_closed", "task.reopen_completed",
@@ -131,6 +145,7 @@ _ACCOUNT_MANAGER_PERMISSIONS = {
# without admin intervention.
_GENERAL_AGENT_PERMISSIONS = {
"project.read",
"knowledge-base.read",
"task.read",
"milestone.read",
"monitor.read",

View File

@@ -27,7 +27,7 @@ app.add_middleware(
secret_key=settings.SECRET_KEY,
session_cookie="hf_oidc",
same_site="lax",
https_only=False,
https_only=settings.SESSION_COOKIE_SECURE,
max_age=600,
)
@@ -79,6 +79,7 @@ from app.api.routers.schedule_type import router as schedule_type_router
from app.api.routers.schedule_type_special_slot import router as schedule_type_special_slot_router
from app.api.routers.calendar import router as calendar_router
from app.api.routers.oidc import router as oidc_router
from app.api.routers.knowledge import router as knowledge_router
app.include_router(auth_router)
app.include_router(oidc_router)
@@ -99,6 +100,7 @@ app.include_router(essentials_router)
app.include_router(schedule_type_router)
app.include_router(schedule_type_special_slot_router)
app.include_router(calendar_router)
app.include_router(knowledge_router)
# Auto schema migration for lightweight deployments
@@ -449,6 +451,19 @@ def _migrate_schema():
"CREATE INDEX idx_time_slots_special_slot_id ON time_slots (special_slot_id)"
))
# --- api_keys: migrate legacy plaintext `key` -> hashed `key_hash` ---
# Only runs on deployments that still have the old plaintext column;
# fresh installs get key_hash/key_prefix directly from create_all.
if _has_table(db, "api_keys") and _has_column(db, "api_keys", "key"):
if not _has_column(db, "api_keys", "key_hash"):
db.execute(text("ALTER TABLE api_keys ADD COLUMN key_hash VARCHAR(64) NULL"))
if not _has_column(db, "api_keys", "key_prefix"):
db.execute(text("ALTER TABLE api_keys ADD COLUMN key_prefix VARCHAR(16) NULL"))
db.execute(text("ALTER TABLE api_keys MODIFY COLUMN `key` VARCHAR(64) NULL"))
db.execute(text("UPDATE api_keys SET key_hash = SHA2(`key`, 256), key_prefix = LEFT(`key`, 8) WHERE key_hash IS NULL AND `key` IS NOT NULL"))
db.execute(text("UPDATE api_keys SET `key` = NULL WHERE `key` IS NOT NULL"))
_ensure_unique_index(db, "api_keys", "idx_api_keys_key_hash", "key_hash")
# --- schedule_type_special_slots: create-table is handled by
# Base.metadata.create_all on first boot; no migration needed here
# because there is no legacy table to evolve. Future schema bumps
@@ -488,7 +503,7 @@ def _sync_default_user_roles(db):
@app.on_event("startup")
def startup():
from app.core.config import Base, engine, SessionLocal
from app.models import models, webhook, apikey, activity, milestone, notification, worklog, monitor, role_permission, task, support, meeting, proposal, propose, essential, agent, calendar, minimum_workload, schedule_type, schedule_type_special_slot, oidc_settings
from app.models import models, webhook, apikey, activity, milestone, notification, worklog, monitor, role_permission, task, support, meeting, proposal, propose, essential, agent, calendar, minimum_workload, schedule_type, schedule_type_special_slot, oidc_settings, knowledge
Base.metadata.create_all(bind=engine)
_migrate_schema()

View File

@@ -7,7 +7,10 @@ class APIKey(Base):
__tablename__ = "api_keys"
id = Column(Integer, primary_key=True, index=True)
key = Column(String(64), unique=True, nullable=False, index=True)
# The raw key is never stored — only its SHA-256 hash. `key_prefix` holds
# the first few chars for human-readable display/masking in listings.
key_hash = Column(String(64), unique=True, nullable=False, index=True)
key_prefix = Column(String(16), nullable=True)
name = Column(String(100), nullable=False) # e.g. "agent-zhi", "agent-lyn"
user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
is_active = Column(Boolean, default=True)

101
app/models/knowledge.py Normal file
View File

@@ -0,0 +1,101 @@
"""Knowledge Base models.
Mirrors the Project feature's shape (human-friendly *code*, creator FK,
created/updated timestamps). Hierarchy is:
knowledge_base
└─ knowledge_topic (unique per (topic, knowledge_base_id))
├─ knowledge_fact (category_id NULL → fact lives on the topic)
└─ knowledge_category (parent NULL → top-level category in topic)
├─ knowledge_fact
└─ knowledge_category (parent → nested)
`project_knowledge_base` is the M2M link between projects and knowledge bases.
Relationships are intentionally kept minimal (no ORM cascade on the
self-referential category tree); deletion ordering is handled explicitly in
the router to stay clear of FK-ordering surprises under MySQL.
"""
from sqlalchemy import Column, Integer, String, Text, DateTime, ForeignKey, UniqueConstraint
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from app.core.config import Base
class KnowledgeBase(Base):
__tablename__ = "knowledge_bases"
id = Column(Integer, primary_key=True, index=True)
knowledge_base_code = Column(String(16), unique=True, index=True, nullable=True)
title = Column(String(200), nullable=False)
description = Column(Text, nullable=True)
created_by = Column(Integer, ForeignKey("users.id"), nullable=False)
created_at = Column(DateTime(timezone=True), server_default=func.now())
last_updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
creator = relationship("User", foreign_keys=[created_by])
class KnowledgeTopic(Base):
__tablename__ = "knowledge_topics"
__table_args__ = (
UniqueConstraint("topic", "knowledge_base_id", name="uq_knowledge_topic_kb"),
)
id = Column(Integer, primary_key=True, index=True)
topic = Column(String(200), nullable=False)
knowledge_base_id = Column(Integer, ForeignKey("knowledge_bases.id"), nullable=False, index=True)
description = Column(Text, nullable=True)
created_by = Column(Integer, ForeignKey("users.id"), nullable=False)
created_at = Column(DateTime(timezone=True), server_default=func.now())
last_updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
creator = relationship("User", foreign_keys=[created_by])
class KnowledgeCategory(Base):
__tablename__ = "knowledge_categories"
__table_args__ = (
# NOTE: MySQL treats NULLs as distinct in a UNIQUE index, so this only
# enforces uniqueness for non-NULL `parent`. Top-level categories
# (parent IS NULL) are de-duped in the router (application-level check).
UniqueConstraint("topic_id", "parent", "name", name="uq_knowledge_category_triple"),
)
id = Column(Integer, primary_key=True, index=True)
name = Column(String(200), nullable=False)
parent = Column(Integer, ForeignKey("knowledge_categories.id"), nullable=True, index=True)
topic_id = Column(Integer, ForeignKey("knowledge_topics.id"), nullable=False, index=True)
description = Column(Text, nullable=True)
created_by = Column(Integer, ForeignKey("users.id"), nullable=True)
last_updated_by = Column(Integer, ForeignKey("users.id"), nullable=True)
class KnowledgeFact(Base):
__tablename__ = "knowledge_facts"
id = Column(Integer, primary_key=True, index=True)
category_id = Column(Integer, ForeignKey("knowledge_categories.id"), nullable=True, index=True)
topic_id = Column(Integer, ForeignKey("knowledge_topics.id"), nullable=False, index=True)
fact = Column(Text, nullable=False)
last_updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
class ProjectKnowledgeBase(Base):
__tablename__ = "project_knowledge_bases"
__table_args__ = (
UniqueConstraint("project_id", "knowledge_base_id", name="uq_project_knowledge_base"),
)
id = Column(Integer, primary_key=True, index=True)
project_id = Column(Integer, ForeignKey("projects.id"), nullable=False, index=True)
knowledge_base_id = Column(Integer, ForeignKey("knowledge_bases.id"), nullable=False, index=True)
class KnowledgeBaseCodeCounter(Base):
__tablename__ = "knowledge_base_code_counters"
id = Column(Integer, primary_key=True, index=True)
prefix = Column(String(16), unique=True, index=True, nullable=False)
next_value = Column(Integer, default=0)

166
app/schemas/knowledge.py Normal file
View File

@@ -0,0 +1,166 @@
"""Pydantic schemas for the Knowledge Base feature."""
from pydantic import BaseModel
from typing import Optional, List
from datetime import datetime
# --------------------------------------------------------------------------
# Knowledge Base
# --------------------------------------------------------------------------
class KnowledgeBaseBase(BaseModel):
title: str
description: Optional[str] = None
class KnowledgeBaseCreate(KnowledgeBaseBase):
pass
class KnowledgeBaseUpdate(BaseModel):
title: Optional[str] = None
description: Optional[str] = None
class KnowledgeBaseResponse(BaseModel):
id: int
knowledge_base_code: Optional[str] = None
title: str
description: Optional[str] = None
created_by: int
created_at: Optional[datetime] = None
last_updated_at: Optional[datetime] = None
class Config:
from_attributes = True
# --------------------------------------------------------------------------
# Topic
# --------------------------------------------------------------------------
class KnowledgeTopicBase(BaseModel):
topic: str
description: Optional[str] = None
class KnowledgeTopicCreate(KnowledgeTopicBase):
pass
class KnowledgeTopicUpdate(BaseModel):
topic: Optional[str] = None
description: Optional[str] = None
class KnowledgeTopicResponse(BaseModel):
id: int
topic: str
knowledge_base_id: int
description: Optional[str] = None
created_by: int
created_at: Optional[datetime] = None
last_updated_at: Optional[datetime] = None
class Config:
from_attributes = True
# --------------------------------------------------------------------------
# Category
# --------------------------------------------------------------------------
class KnowledgeCategoryBase(BaseModel):
name: str
description: Optional[str] = None
class KnowledgeCategoryCreate(KnowledgeCategoryBase):
topic_id: int
parent: Optional[int] = None
class KnowledgeCategoryUpdate(BaseModel):
name: Optional[str] = None
description: Optional[str] = None
parent: Optional[int] = None
class KnowledgeCategoryResponse(BaseModel):
id: int
name: str
parent: Optional[int] = None
topic_id: int
description: Optional[str] = None
created_by: Optional[int] = None
last_updated_by: Optional[int] = None
class Config:
from_attributes = True
# --------------------------------------------------------------------------
# Fact
# --------------------------------------------------------------------------
class KnowledgeFactBase(BaseModel):
fact: str
class KnowledgeFactCreate(KnowledgeFactBase):
topic_id: int
category_id: Optional[int] = None
class KnowledgeFactUpdate(BaseModel):
fact: Optional[str] = None
category_id: Optional[int] = None
class KnowledgeFactResponse(BaseModel):
id: int
category_id: Optional[int] = None
topic_id: int
fact: str
last_updated_at: Optional[datetime] = None
class Config:
from_attributes = True
# --------------------------------------------------------------------------
# Project <-> KnowledgeBase link
# --------------------------------------------------------------------------
class ProjectKnowledgeBaseLink(BaseModel):
# Accept either a numeric id or a knowledge_base_code (mirrors how
# projects are referenced elsewhere).
knowledge_base: str
# --------------------------------------------------------------------------
# Nested tree (read-only aggregate)
# --------------------------------------------------------------------------
class CategoryTreeNode(BaseModel):
id: int
name: str
parent: Optional[int] = None
topic_id: int
description: Optional[str] = None
categories: List["CategoryTreeNode"] = []
facts: List[KnowledgeFactResponse] = []
class TopicTreeNode(BaseModel):
id: int
topic: str
knowledge_base_id: int
description: Optional[str] = None
categories: List[CategoryTreeNode] = []
facts: List[KnowledgeFactResponse] = []
class KnowledgeBaseTree(BaseModel):
id: int
knowledge_base_code: Optional[str] = None
title: str
description: Optional[str] = None
topics: List[TopicTreeNode] = []
CategoryTreeNode.model_rebuild()

View File

@@ -105,7 +105,9 @@ class CommentBase(BaseModel):
class CommentCreate(CommentBase):
task_id: int
author_id: int
# author_id is NOT accepted from the client — the comment is always
# attributed to the authenticated caller (server-side) to prevent
# author spoofing.
class CommentUpdate(BaseModel):

View File

@@ -0,0 +1,147 @@
"""Knowledge Base API tests — CRUD, hierarchy, uniqueness, tree, links, RBAC."""
from tests.conftest import auth_header
def _create_kb(client, token, title="Infra Runbook", description="ops notes"):
r = client.post(
"/knowledge-bases",
json={"title": title, "description": description},
headers=auth_header(token),
)
assert r.status_code == 201, r.text
return r.json()
class TestKnowledgeBaseCRUD:
def test_create_generates_code(self, client, seed):
kb = _create_kb(client, seed["admin_token"], title="Infra Runbook")
assert kb["title"] == "Infra Runbook"
assert kb["knowledge_base_code"] # auto-generated, non-empty
assert kb["created_by"] == seed["admin_user"].id
def test_create_requires_permission(self, client, seed):
# dev role has no knowledge-base.create
r = client.post(
"/knowledge-bases",
json={"title": "Nope"},
headers=auth_header(seed["dev_token"]),
)
assert r.status_code == 403
def test_get_by_id_and_code(self, client, seed):
kb = _create_kb(client, seed["admin_token"])
by_id = client.get(f"/knowledge-bases/{kb['id']}", headers=auth_header(seed["admin_token"]))
by_code = client.get(f"/knowledge-bases/{kb['knowledge_base_code']}", headers=auth_header(seed["admin_token"]))
assert by_id.status_code == 200 and by_code.status_code == 200
assert by_id.json()["id"] == by_code.json()["id"] == kb["id"]
def test_update_and_list(self, client, seed):
kb = _create_kb(client, seed["admin_token"])
r = client.patch(
f"/knowledge-bases/{kb['id']}",
json={"description": "updated"},
headers=auth_header(seed["admin_token"]),
)
assert r.status_code == 200 and r.json()["description"] == "updated"
lst = client.get("/knowledge-bases", headers=auth_header(seed["admin_token"]))
assert lst.status_code == 200 and any(k["id"] == kb["id"] for k in lst.json())
def test_delete_cascades(self, client, seed):
token = seed["admin_token"]
kb = _create_kb(client, token)
topic = client.post(f"/knowledge-bases/{kb['id']}/topics", json={"topic": "Net"}, headers=auth_header(token)).json()
client.post("/knowledge-facts", json={"topic_id": topic["id"], "fact": "x"}, headers=auth_header(token))
r = client.delete(f"/knowledge-bases/{kb['id']}", headers=auth_header(token))
assert r.status_code == 204
assert client.get(f"/knowledge-bases/{kb['id']}", headers=auth_header(token)).status_code == 404
assert client.get(f"/knowledge-topics/{topic['id']}", headers=auth_header(token)).status_code == 404
class TestHierarchy:
def test_topic_unique_per_kb(self, client, seed):
token = seed["admin_token"]
kb = _create_kb(client, token)
r1 = client.post(f"/knowledge-bases/{kb['id']}/topics", json={"topic": "Routing"}, headers=auth_header(token))
r2 = client.post(f"/knowledge-bases/{kb['id']}/topics", json={"topic": "Routing"}, headers=auth_header(token))
assert r1.status_code == 201 and r2.status_code == 400
def test_category_triple_unique_and_nesting(self, client, seed):
token = seed["admin_token"]
kb = _create_kb(client, token)
topic = client.post(f"/knowledge-bases/{kb['id']}/topics", json={"topic": "T"}, headers=auth_header(token)).json()
# top-level category
c1 = client.post("/knowledge-categories", json={"topic_id": topic["id"], "name": "DNS"}, headers=auth_header(token))
assert c1.status_code == 201
# duplicate top-level (parent NULL) rejected at app level
dup = client.post("/knowledge-categories", json={"topic_id": topic["id"], "name": "DNS"}, headers=auth_header(token))
assert dup.status_code == 400
# nested category with same name under different parent is allowed
child = client.post(
"/knowledge-categories",
json={"topic_id": topic["id"], "name": "DNS", "parent": c1.json()["id"]},
headers=auth_header(token),
)
assert child.status_code == 201
def test_no_cycle_on_reparent(self, client, seed):
token = seed["admin_token"]
kb = _create_kb(client, token)
topic = client.post(f"/knowledge-bases/{kb['id']}/topics", json={"topic": "T"}, headers=auth_header(token)).json()
a = client.post("/knowledge-categories", json={"topic_id": topic["id"], "name": "A"}, headers=auth_header(token)).json()
b = client.post("/knowledge-categories", json={"topic_id": topic["id"], "name": "B", "parent": a["id"]}, headers=auth_header(token)).json()
# try to move A under its descendant B -> rejected
r = client.patch(f"/knowledge-categories/{a['id']}", json={"parent": b["id"]}, headers=auth_header(token))
assert r.status_code == 400
def test_tree_shape(self, client, seed):
token = seed["admin_token"]
kb = _create_kb(client, token)
topic = client.post(f"/knowledge-bases/{kb['id']}/topics", json={"topic": "T"}, headers=auth_header(token)).json()
cat = client.post("/knowledge-categories", json={"topic_id": topic["id"], "name": "C"}, headers=auth_header(token)).json()
# fact directly on topic
client.post("/knowledge-facts", json={"topic_id": topic["id"], "fact": "topic-fact"}, headers=auth_header(token))
# fact under category
client.post("/knowledge-facts", json={"topic_id": topic["id"], "category_id": cat["id"], "fact": "cat-fact"}, headers=auth_header(token))
tree = client.get(f"/knowledge-bases/{kb['id']}/tree", headers=auth_header(token)).json()
assert len(tree["topics"]) == 1
t = tree["topics"][0]
assert [f["fact"] for f in t["facts"]] == ["topic-fact"]
assert len(t["categories"]) == 1
assert [f["fact"] for f in t["categories"][0]["facts"]] == ["cat-fact"]
class TestProjectLinks:
def test_link_unlink(self, client, seed):
token = seed["admin_token"]
kb = _create_kb(client, token)
# link by code
r = client.post(
"/projects/TPRJ/knowledge-bases",
json={"knowledge_base": kb["knowledge_base_code"]},
headers=auth_header(token),
)
assert r.status_code == 201
linked = client.get("/projects/TPRJ/knowledge-bases", headers=auth_header(token)).json()
assert any(k["id"] == kb["id"] for k in linked)
# filter list by project
filtered = client.get(f"/knowledge-bases?project=TPRJ", headers=auth_header(token)).json()
assert any(k["id"] == kb["id"] for k in filtered)
# unlink
r = client.delete(f"/projects/TPRJ/knowledge-bases/{kb['id']}", headers=auth_header(token))
assert r.status_code == 204
linked = client.get("/projects/TPRJ/knowledge-bases", headers=auth_header(token)).json()
assert not any(k["id"] == kb["id"] for k in linked)
def test_link_is_idempotent(self, client, seed):
token = seed["admin_token"]
kb = _create_kb(client, token)
for _ in range(2):
r = client.post(
"/projects/TPRJ/knowledge-bases",
json={"knowledge_base": str(kb["id"])},
headers=auth_header(token),
)
assert r.status_code == 201
linked = client.get("/projects/TPRJ/knowledge-bases", headers=auth_header(token)).json()
assert sum(1 for k in linked if k["id"] == kb["id"]) == 1