add: remove secret strings

This commit is contained in:
h z
2025-05-23 19:44:49 +01:00
commit f96707629f
32 changed files with 597 additions and 0 deletions

28
Dockerfile Normal file
View File

@@ -0,0 +1,28 @@
# Runtime image for the ckb service (MCP endpoint on 5058, REST API on 5059)
FROM python:3.12-slim
# stream logs immediately instead of buffering stdout/stderr
ENV PYTHONUNBUFFERED=1
# openssh-client: workspace access over ssh; curl: debugging/health checks
RUN apt-get update \
 && apt-get install -y --no-install-recommends \
 openssh-client \
 curl \
 && rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY pyproject.toml ./
COPY src/ ./src
COPY app.py ./
COPY project_plan.md ./
COPY scripts/ ./scripts
# NOTE(review): `uv pip install` outside a virtualenv typically needs
# --system inside a container — confirm this layer actually installs
RUN pip install --upgrade pip \
 && pip install uv \
 && uv pip install .
# directory for service log files
RUN mkdir logs
EXPOSE 5058 5059
ENTRYPOINT ["uv", "run", "python", "app.py"]

16
app.py Normal file
View File

@@ -0,0 +1,16 @@
import threading
from mcp_service import start_mcp
from api_service import start_api
if __name__ == '__main__':
    # Run the MCP and REST services in daemon threads, then block on both
    # so the process stays alive while either service is running.
    services = [
        threading.Thread(target=entry, daemon=True)
        for entry in (start_mcp, start_api)
    ]
    for worker in services:
        worker.start()
    for worker in services:
        worker.join()

13
project_plan.md Normal file
View File

@@ -0,0 +1,13 @@
---
The project goal is to build a local codebase knowledge management system that helps a pair-coding model agent understand a codebase.
### Designed features
- Lazy load: only generate abstracts (knowledge) for files/directories that a model requires
- Knowledge access:
- more like a cache: don't let the model query knowledge semantically right away; instead, provide a list of topics as hot spots first
- if the hot topics don't hit, list the root dir of the codebase and let the model determine which file to analyze or which directory to investigate further
- the analysis of a file is also done by a model, which may itself request knowledge of another file to understand the current one
- some mechanism is needed to prevent circular dependencies (e.g. requiring file A to understand file B, and file B to understand file A)
- Codebase access:
- service connects to a workspace that contains the codebase via a ssh session

18
pyproject.toml Normal file
View File

@@ -0,0 +1,18 @@
[project]
name = "ckb"
version = "0.1.0"
description = "Local codebase knowledge management service for pair-coding agents"
requires-python = ">=3.12"
dependencies = [
    "anthropic>=0.51.0",
    # NOTE(review): "deepseek", "generativeai" and "google" look like the
    # wrong PyPI names (the Google SDK is "google-generativeai") — confirm
    "deepseek>=1.0.0",
    "generativeai>=0.0.1",
    "google>=3.0.0",
    "openai>=1.79.0",
    "protobuf>=6.31.0",
    # declared because modules under src/ import them
    "fastapi",
    "uvicorn",
    "paramiko",
    "pymongo",
    "odmantic",
    "fastmcp",
    "langchain-core",
]

[tool.setuptools]
package-dir = {"" = "src"}

[tool.setuptools.packages.find]
where = ["src"]

0
src/agents/__init__.py Normal file
View File

View File

View File

@@ -0,0 +1,40 @@
# Prompt template for a generic ReAct-style agent.  Format placeholders:
# {role}, {task}, {tools}, {tool_names}, {user_message}.
# NOTE(review): the "(... can repeat multiple times))" line below ends with a
# doubled closing paren inside the template text — confirm whether intended.
general_sys_msg = """
You are a {role}
Your task is {task}
You have access to the following tools:
{tools}
If you have any tool whose name starts with res_tool_
You should call that tool right before the final answer
e.g.
Thought: calling mandatory res_tool
Action: res_tool_general_response
Action Input: ...
Observation: ...
Final Answer: ...
Use the following format:
```
Question: the question you must answer
If you want to use tools:
Thought: always reason what to do
Action: the action to take, must be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
If no tool is needed:
Thought: what you are thinking
(the Thought or Thought/Action/... can repeat multiple times))
Final Answer: Final response to the user message
```
The user message is {user_message}
"""

View File

View File

View File

@@ -0,0 +1,8 @@
from langchain_core.tools import tool
@tool
def res_tool_general_response(session_id: str, response: str) -> dict:
    """Mandatory response tool: wrap the agent's final answer with its session id.

    The agent system prompt instructs the model to call a res_tool_* tool
    right before the final answer; this one simply echoes the payload back.
    """
    return {
        'session_id': session_id,
        'response': response
    }

View File

@@ -0,0 +1,17 @@
import importlib
import pkgutil
import uvicorn
from fastapi import FastAPI
# Single FastAPI application; submodules of this package contribute routers.
api = FastAPI()

# Import every direct submodule and mount any `router` attribute it defines,
# so adding an endpoint module requires no registration code here.
for finder, name, ispkg, in pkgutil.iter_modules(__path__):
    module = importlib.import_module(f'{__name__}.{name}')
    if hasattr(module, 'router'):
        api.include_router(module.router)


def start_api():
    """Serve the REST API on 0.0.0.0:5059 (blocking call)."""
    uvicorn.run(api, port=5059, host='0.0.0.0')

View File

@@ -0,0 +1,9 @@
from odmantic import Model
from db_models.embedded_models.Codebase import Codebase
class BinaryLibrary(Model):
    """Knowledge record for a binary library file (e.g. a shared object)."""
    codebase: Codebase  # embedded descriptor of the codebase this file belongs to
    path: str  # path relative to the codebase root
    abstract: str  # model-generated knowledge abstract

View File

@@ -0,0 +1,9 @@
from odmantic import Model
from db_models.embedded_models.Codebase import Codebase
class BinaryTool(Model):
    """Knowledge record for a binary executable tool found in a codebase."""
    codebase: Codebase  # embedded descriptor of the codebase this file belongs to
    path: str  # path relative to the codebase root
    abstract: str  # model-generated knowledge abstract

16
src/db_models/CodeFile.py Normal file
View File

@@ -0,0 +1,16 @@
from odmantic import Model
from typing import List
from db_models.embedded_models.CodeSegment import CodeSegment
from db_models.embedded_models.Codebase import Codebase
class CodeFile(Model):
    """Knowledge record for a source-code file."""
    codebase: Codebase  # embedded descriptor of the owning codebase
    type: str  # file category (presumably language/extension based — confirm)
    path: str  # path relative to the codebase root
    md5: str  # checksum used to detect changes between scans
    abstract: str  # model-generated abstract of the whole file
    segments: List[CodeSegment]  # per-segment abstracts with line ranges
    scanned: bool  # whether the file has been fully scanned

View File

@@ -0,0 +1,13 @@
from odmantic import Model
from db_models.embedded_models.Codebase import Codebase
class ConfigFile(Model):
    """Knowledge record for a configuration file."""
    codebase: Codebase  # embedded descriptor of the owning codebase
    type: str  # file category (presumably config format — confirm)
    path: str  # path relative to the codebase root
    md5: str  # checksum used to detect changes between scans
    abstract: str  # model-generated knowledge abstract
    scanned: bool  # whether the file has been scanned

View File

@@ -0,0 +1,11 @@
from odmantic import Model
from db_models.embedded_models.Codebase import Codebase
class Directory(Model):
    """Knowledge record for a directory in a codebase."""
    codebase: Codebase  # embedded descriptor of the owning codebase
    path: str  # path relative to the codebase root
    md5: str  # NOTE(review): md5 of a directory — presumably of listing/contents; confirm
    abstract: str  # model-generated knowledge abstract
    scanned: bool  # whether the directory has been scanned

9
src/db_models/Hotspot.py Normal file
View File

@@ -0,0 +1,9 @@
from odmantic import Model
from typing import List
from db_models.embedded_models.Codebase import Codebase
class Hotspot(Model):
    """A frequently-visited knowledge topic within a codebase."""
    codebase: Codebase  # embedded descriptor of the owning codebase
    topic: str  # keyword naming the hot topic
    # NOTE(review): CodeSegment.links is List[str] — confirm what these ints
    # reference (document ids? line numbers?)
    links: List[int]

View File

@@ -0,0 +1,9 @@
from odmantic import Model
from db_models.embedded_models.Codebase import Codebase
class IgnoreFile(Model):
    """Record for a file marked as irrelevant to codebase understanding."""
    codebase: Codebase  # embedded descriptor of the owning codebase
    path: str  # path relative to the codebase root
    md5: str  # checksum at the time the file was marked as ignored

View File

View File

@@ -0,0 +1,8 @@
from odmantic import EmbeddedModel
from typing import List
class CodeSegment(EmbeddedModel):
    """A contiguous slice of a code file with its own abstract."""
    line_start: int  # first line of the segment (presumably 1-based — confirm)
    line_end: int  # last line of the segment
    abstract: str  # model-generated abstract for just this segment
    links: List[str]  # related knowledge references (semantics unverified here)

View File

@@ -0,0 +1,9 @@
from odmantic import EmbeddedModel
class Codebase(EmbeddedModel):
    """Identity of a codebase; embedded in every knowledge document."""
    name: str  # human-readable codebase name
    version: str  # version label of the snapshot being analyzed
    branch: str  # VCS branch name
    path: str  # root path of the codebase on the remote workspace
    repo: str  # repository location (URL or path)

View File

@@ -0,0 +1,12 @@
import importlib
import pkgutil
from fastmcp import FastMCP
# Shared FastMCP server; the submodules imported below register their
# @mcp.tool() / @mcp.prompt() handlers on it as a side effect of import.
mcp = FastMCP("ckb")

for finder, modname, ispkg, in pkgutil.walk_packages(__path__, __name__ + '.'):
    importlib.import_module(modname)


def start_mcp():
    """Serve the MCP endpoint over SSE on 0.0.0.0:5058 at path /sse (blocking)."""
    mcp.run(transport='sse', port=5058, host='0.0.0.0', path='/sse')

View File

@@ -0,0 +1,8 @@
from mcp_service import mcp
@mcp.prompt()
def scan_file():
    # TODO: placeholder — the scan_file prompt text has not been written yet
    return """
"""

View File

@@ -0,0 +1,168 @@
from typing import List, Dict, Any
from mcp_service import mcp
@mcp.tool()
async def scan_file(codebase, file_path):
    """
    Generate a knowledge abstract for a specific file with a model.

    steps:
    refer to scan_directory: determine the file type, then generate the
    knowledge abstract and store it in the db
    :param codebase: codebase the file belongs to
    :param file_path: path to the file (presumably relative to the codebase root)
    :return: {
        "status": "success"| "failure",
        "result": generated CodeFile/ConfigFile document in json
    }
    """
    pass  # TODO: not implemented yet
@mcp.tool()
async def scan_directory(codebase, directory_path):
    """
    Generate knowledge abstracts for everything under a directory.

    steps:
    1. list all files and directories in the directory
    2. for each file and directory, determine the type by its name and extension
    3. if a document for the file exists in db, check md5 to see if it has changed; if changed, rescan it, otherwise skip it
    4. if the file does not help the understanding of the current codebase, e.g. .git or site-packages, mark it as ignore and skip it
    5. if the file is a config file, scan it and generate a knowledge abstract
    6. if the file is a code file, scan it and generate a knowledge abstract
    7. if the file is a binary executable file and its usage cannot be understood from its name, try executing it with arguments like `--help` in a sandbox
       7.1 if execution gives a help message, generate a knowledge abstract
       7.2 otherwise, don't touch it and skip it
    8. if the file is a binary library file, try to understand it with static analysis tools like `ldd` or `objdump`
       8.1 if it can be understood, generate a knowledge abstract
       8.2 otherwise, don't touch it and skip it
    9. if the file is a directory, scan it recursively
    :param codebase: codebase being scanned
    :param directory_path: path to the directory, relative to codebase root
    :return: {status: "success"|"failure", result: list of generated CodeFile/ConfigFile documents in json}
    """
    pass  # TODO: not implemented yet
@mcp.tool()
async def list_hot_spots(codebase, limit=10):
    """
    List the most visited hotspots in a codebase.

    :param codebase: which codebase
    :param limit: maximum number of hotspots to return (default 10)
    :return: list of keywords for existing hotspots in the codebase
    """
    pass  # TODO: not implemented yet
@mcp.tool()
async def list_directory(codebase, path, include_ignore=True):
    """
    List all files and directories in a directory; the result string is
    equivalent to the `ls -la` command.
    :param codebase: codebase to list in
    :param path: directory path (presumably relative to codebase root — confirm)
    :param include_ignore: if true, files marked as ignore will be included in the result
    :return: {
        status: "success"|"failure",
        result: selected lines based on include_ignore from `ls -la` output if success
    }
    """
    pass  # TODO: not implemented yet
@mcp.tool()
async def read_file(codebase, path):
    """
    Read the raw content of a file.

    :param codebase: codebase the file belongs to
    :param path: file path (presumably relative to codebase root — confirm)
    :return: {
        status: "success"|"failure",
        result: content of the file if success
    }
    """
    pass  # TODO: not implemented yet
@mcp.tool()
async def read_file_knowledge(codebase, path):
    """
    Read the abstract of a file from the db; if it does not exist, generate it.

    :param codebase: codebase the file belongs to
    :param path: file path (presumably relative to codebase root — confirm)
    :return: {
        status: "success"|"failure",
        result: CodeFile/ConfigFile document if success
    }
    """
    pass  # TODO: not implemented yet
@mcp.tool()
async def read_snippet(codebase, line_start, line_end):
    """
    Read specific lines of code from a file; the other parts are replaced
    by their abstracts from the db.
    NOTE(review): there is no file-path parameter — presumably one is
    required to identify which file the lines belong to; confirm the
    intended signature before implementing.
    :param codebase: codebase the file belongs to
    :param line_start: first line to read (presumably 1-based — confirm)
    :param line_end: last line to read
    :return:
    """
    pass  # TODO: not implemented yet
class EditPatch:
    """A single line-based edit to be applied by apply_patches."""

    def __init__(self, edit_type: str, line_start: int, line_end: int, content: str):
        """
        :param edit_type: "add"|"remove"|"replace"
        :param line_start: first affected line (1-based, refers to the original file)
        :param line_end: last affected line (1-based, inclusive)
        :param content: only used for add and replace, for remove, content is ignored
        """
        self.edit_type = edit_type
        self.line_start = line_start
        self.line_end = line_end
        self.content = content
@mcp.tool()
async def edit_file(codebase, file_path, patches):
    """
    Edit a file with patches; trigger scan_file after the edit, and update
    parent directories recursively up to the codebase root.
    :param codebase: codebase the file belongs to
    :param file_path: path to the file (presumably relative to codebase root)
    :param patches: list of edit patches; patches are applied as if in
        parallel — all line numbers refer to the original file
    :return:
    """
    pass  # TODO: not implemented yet
def apply_patches(content: str, patches: "List[EditPatch]") -> Dict[str, Any]:
    """Apply line-based edit patches to *content* in a single pass.

    All patch line numbers are 1-based and refer to the ORIGINAL content,
    so patches are sorted by start line and applied left to right.
    Overlapping patches are not detected.

    :param content: original text to edit
    :param patches: EditPatch-like objects with edit_type, line_start,
        line_end and content attributes
    :return: {"status": "success", "result": patched text} or
             {"status": "failure", "result": error message}
    """
    try:
        lines = content.splitlines()
        result_lines = []
        current_line = 0  # 0-based cursor into the original lines
        sorted_patches = sorted(patches, key=lambda x: x.line_start)
        for patch in sorted_patches:
            edit_type = patch.edit_type
            line_start = patch.line_start - 1  # convert to 0-based
            line_end = patch.line_end - 1
            # copy untouched lines up to the start of this patch
            while current_line < line_start:
                result_lines.append(lines[current_line])
                current_line += 1
            # bug fix: patches are EditPatch objects, not dicts — the old
            # patch['content'] raised TypeError, which the broad except
            # swallowed, so every add/replace silently returned a failure
            if edit_type == "add":
                # insert before line_start; the original line is kept
                result_lines.extend(patch.content.splitlines())
            elif edit_type == "remove":
                current_line = line_end + 1  # skip the removed range
            elif edit_type == "replace":
                result_lines.extend(patch.content.splitlines())
                current_line = line_end + 1  # skip the replaced range
            else:
                return {"status": "failure", "result": f"Unknown edit type: {edit_type}"}
        # copy the remainder of the original file
        while current_line < len(lines):
            result_lines.append(lines[current_line])
            current_line += 1
        return {"status": "success", "result": "\n".join(result_lines)}
    except Exception as e:
        return {"status": "failure", "result": str(e)}

0
src/utils/__init__.py Normal file
View File

View File

@@ -0,0 +1,38 @@
import os
from threading import Lock
from pymongo import MongoClient
# Process-wide MongoDB singletons; _lock guards first-time initialization
# (double-checked locking in init_db).
_client = None
_db = None
_lock = Lock()
def init_db():
    """Initialise the shared MongoClient/database pair (thread-safe).

    Uses double-checked locking so concurrent first callers create only one
    client.  The URI comes from MONGODB_URI (defaults to a local instance).

    :return: the 'ckb' database handle
    """
    global _client, _db
    if _client is None:
        with _lock:
            if _client is None:
                uri = os.getenv('MONGODB_URI', 'mongodb://localhost:27017')
                db_name = 'ckb'
                max_pool = 100
                _client = MongoClient(uri, maxPoolSize=max_pool)
                if db_name not in _client.list_database_names():
                    # MongoDB creates databases lazily; create and drop a
                    # throwaway collection to force the db into existence
                    # (fix: the old code bound the collection to an unused
                    # local `tmp`)
                    _client[db_name].create_collection('_init')
                    _client[db_name].drop_collection('_init')
                _db = _client[db_name]
    return _db
def get_db():
    """Return the shared database handle, initialising it on first use."""
    return _db if _db is not None else init_db()
def get_client():
    """Return the shared MongoClient, connecting lazily on first use."""
    if _client is not None:
        return _client
    init_db()
    return _client

View File

@@ -0,0 +1,25 @@
import os
# Per-provider API keys, read from the environment at import time and
# overridable at runtime through the setters below.
PROVIDER_API_KEYS = {
    'openai': os.getenv('OPENAI_API_KEY', ''),
    'deepseek': os.getenv('DEEPSEEK_API_KEY', ''),
    'anthropic': os.getenv('ANTHROPIC_API_KEY', ''),
    'google': os.getenv('GOOGLE_API_KEY', ''),
}


def _set_api_key(provider: str, api_key: str):
    """Shared implementation for the per-provider setters.

    Note: the dict is mutated in place, so no `global` statement is needed
    (the old setters declared one redundantly).
    """
    PROVIDER_API_KEYS[provider] = api_key


def set_openai_api_key(api_key: str):
    """Override the OpenAI API key at runtime."""
    _set_api_key('openai', api_key)


def set_deepseek_api_key(api_key: str):
    """Override the DeepSeek API key at runtime."""
    _set_api_key('deepseek', api_key)


def set_anthropic_api_key(api_key: str):
    """Override the Anthropic API key at runtime."""
    _set_api_key('anthropic', api_key)


def set_google_api_key(api_key: str):
    """Override the Google API key at runtime."""
    _set_api_key('google', api_key)

View File

@@ -0,0 +1,112 @@
import os
import paramiko
from threading import Lock
from typing import Tuple, Optional, List, Dict, Any
import json
class SSHConnectionManager:
    """Process-wide cache of paramiko SSH clients keyed by (host, port, user).

    Connection parameters are read from the environment at import time.
    """
    _clients = {}
    _lock = Lock()
    HOST = os.getenv('SSH_HOST', 'host.docker.internal')
    USERNAME = os.getenv('SSH_USERNAME')
    # bug fix: os.getenv returns a str when the variable is set, while the
    # default was the int 22 — coerce so PORT is always an int
    PORT = int(os.getenv('SSH_PORT', 22))
    PASSWORD = os.getenv('SSH_PASSWORD')

    @classmethod
    def get_client(cls, timeout=10):
        """Return a connected SSHClient, reusing a cached connection.

        :param timeout: connect timeout in seconds for a new connection
        """
        key = (cls.HOST, cls.PORT, cls.USERNAME)
        with cls._lock:
            if key not in cls._clients:
                client = paramiko.SSHClient()
                # NOTE(review): AutoAddPolicy trusts unknown host keys —
                # fine for a dev tool, not for production use
                client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
                client.connect(
                    hostname=cls.HOST,
                    port=cls.PORT,
                    username=cls.USERNAME,
                    password=cls.PASSWORD,
                    pkey=None,
                    timeout=timeout,
                )
                cls._clients[key] = client
        return cls._clients[key]
def execute_command(command: str, timeout: int = 30) -> Tuple[int, str, str]:
    """Run *command* on the remote host over the shared SSH connection.

    :param command: shell command to execute remotely
    :param timeout: both the connect timeout and the exec timeout, seconds
    :return: (exit_code, stdout, stderr) with streams decoded as UTF-8
    """
    client = SSHConnectionManager.get_client(timeout=timeout)
    _stdin, out_stream, err_stream = client.exec_command(command, timeout=timeout)
    status = out_stream.channel.recv_exit_status()
    captured_out = out_stream.read().decode('utf-8')
    captured_err = err_stream.read().decode('utf-8')
    return status, captured_out, captured_err
def list_directory(path: str, include_ignore: bool = True) -> Dict[str, Any]:
    """List the entries of a remote directory over SFTP.

    :param path: remote directory path
    :param include_ignore: when False, dot-files are skipped
        (NOTE(review): "ignore" here only means hidden files, not the
        IgnoreFile db records — confirm intended semantics)
    :return: {"status": "success", "result": list of entry dicts} or
             {"status": "failure", "result": error message}
    """
    try:
        client = SSHConnectionManager.get_client()
        sftp = client.open_sftp()
        try:
            result = []
            for entry in sftp.listdir_attr(path):
                if not include_ignore and entry.filename.startswith('.'):
                    continue
                result.append({
                    'name': entry.filename,
                    'size': entry.st_size,
                    'mode': entry.st_mode,
                    'mtime': entry.st_mtime,
                    # test the S_IFDIR bit of the mode
                    'is_dir': entry.st_mode & 0o40000 != 0
                })
        finally:
            # bug fix: the SFTP channel used to leak when listdir_attr raised
            sftp.close()
        return {"status": "success", "result": result}
    except Exception as e:
        return {"status": "failure", "result": str(e)}
def read_file_content(path: str) -> Dict[str, Any]:
    """Read a remote file as UTF-8 text over SFTP.

    :param path: remote file path
    :return: {"status": "success", "result": file content} or
             {"status": "failure", "result": error message}
    """
    try:
        client = SSHConnectionManager.get_client()
        sftp = client.open_sftp()
        try:
            with sftp.open(path, 'r') as f:
                content = f.read().decode('utf-8')
        finally:
            # bug fix: the SFTP channel used to leak when open/read raised
            sftp.close()
        return {"status": "success", "result": content}
    except Exception as e:
        return {"status": "failure", "result": str(e)}
def write_file_content(path: str, content: str) -> Dict[str, Any]:
    """Write text to a remote file over SFTP, replacing its content.

    :param path: remote file path
    :param content: text to write
    :return: {"status": "success", "result": None} or
             {"status": "failure", "result": error message}
    """
    try:
        client = SSHConnectionManager.get_client()
        sftp = client.open_sftp()
        try:
            with sftp.open(path, 'w') as f:
                f.write(content)
        finally:
            # bug fix: the SFTP channel used to leak when open/write raised
            sftp.close()
        return {"status": "success", "result": None}
    except Exception as e:
        return {"status": "failure", "result": str(e)}
def get_file_md5(path: str) -> Dict[str, Any]:
    """Return the md5 checksum of a remote file via the `md5sum` command.

    :param path: remote file path
    :return: {"status": "success", "result": hex digest} or
             {"status": "failure", "result": stderr/error message}
    """
    import shlex  # local import: only needed for quoting here
    try:
        # bug fix: quote the path so spaces or shell metacharacters in a
        # file name cannot break — or inject into — the remote command
        exit_code, stdout, stderr = execute_command(f"md5sum {shlex.quote(path)}")
        if exit_code == 0:
            md5 = stdout.split()[0]
            return {"status": "success", "result": md5}
        return {"status": "failure", "result": stderr}
    except Exception as e:
        return {"status": "failure", "result": str(e)}
def execute_in_sandbox(command: str, timeout: int = 30) -> Dict[str, Any]:
    """Run *command* in a locked-down, network-less Alpine container.

    :param command: shell command to run inside the sandbox
    :param timeout: timeout in seconds for the docker invocation
    :return: {"status": "success", "result": stdout} or
             {"status": "failure", "result": stderr/error message}
    """
    import shlex  # local import: only needed for quoting here
    try:
        # bug fix: shlex.quote survives single quotes inside *command*,
        # which used to terminate the hand-written '...' wrapper and allow
        # escaping the sandboxed sh -c invocation
        sandbox_cmd = (
            "docker run --rm --network none --memory=512m --cpus=1 "
            f"alpine sh -c {shlex.quote(command)}"
        )
        exit_code, stdout, stderr = execute_command(sandbox_cmd, timeout)
        if exit_code == 0:
            return {"status": "success", "result": stdout}
        return {"status": "failure", "result": stderr}
    except Exception as e:
        return {"status": "failure", "result": str(e)}

1
tests/conftest.py Normal file
View File

@@ -0,0 +1 @@
# Load the pytest-asyncio plugin so async test functions are collected and run
pytest_plugins = ["pytest_asyncio"]