构建自己的Harness系统--基于Kimi2.5大模型
跳转到导航
跳转到搜索
本文基于前文对 Harness 系统原理的介绍,实现了一个基于 Kimi K2.5 大模型的简单 agent,重点在于学习和理解 Harness 系统的实现原理,同时做到国内大模型的开箱即用。
#!/usr/bin/env python3
# Harness: all mechanisms combined -- the complete cockpit for the model.
"""
s_full.py - Full Reference Agent
Capstone implementation combining every mechanism from s01-s11.
Session s12 (task-aware worktree isolation) is taught separately.
NOT a teaching session -- this is the "put it all together" reference.
+------------------------------------------------------------------+
| FULL AGENT |
| |
| System prompt (s05 skills, task-first + optional todo nag) |
| |
| Before each LLM call: |
| +--------------------+ +------------------+ +--------------+ |
| | Microcompact (s06) | | Drain bg (s08) | | Check inbox | |
| | Auto-compact (s06) | | notifications | | (s09) | |
| +--------------------+ +------------------+ +--------------+ |
| |
| Tool dispatch (s02 pattern): |
| +--------+----------+----------+---------+-----------+ |
| | bash | read | write | edit | TodoWrite | |
| | task | load_sk | compress | bg_run | bg_check | |
| | t_crt | t_get | t_upd | t_list | spawn_tm | |
| | list_tm| send_msg | rd_inbox | bcast | shutdown | |
| | plan | idle | claim | | | |
| +--------+----------+----------+---------+-----------+ |
| |
| Subagent (s04): spawn -> work -> return summary |
| Teammate (s09): spawn -> work -> idle -> auto-claim (s11) |
| Shutdown (s10): request_id handshake |
| Plan gate (s10): submit -> approve/reject |
+------------------------------------------------------------------+
REPL commands: /compact /tasks /team /inbox
"""
import json
import os
import re
import subprocess
import threading
import time
import uuid
from pathlib import Path
from queue import Queue
from openai import OpenAI
# Workspace root: file tools and shell commands are anchored to this directory.
WORKDIR = Path.cwd()

# OpenAI-compatible client pointed at the Moonshot (Kimi) endpoint.
# The key is taken from the MOONSHOT_API_KEY environment variable so that a real
# credential never has to live in the source; the placeholder remains as fallback.
client = OpenAI(
    api_key=os.environ.get("MOONSHOT_API_KEY", "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"),
    base_url="https://api.moonshot.cn/v1",
)
# API endpoint: https://api.moonshot.cn/v1
MODEL = "kimi-k2.5"

# On-disk state, all relative to the workspace.
TEAM_DIR = WORKDIR / ".team"              # team roster (config.json)
INBOX_DIR = TEAM_DIR / "inbox"            # one <name>.jsonl mailbox per agent
TASKS_DIR = WORKDIR / ".tasks"            # task_<id>.json files
SKILLS_DIR = WORKDIR / "skills"           # searched recursively for SKILL.md
TRANSCRIPT_DIR = WORKDIR / ".transcripts" # transcripts written before compaction

TOKEN_THRESHOLD = 100000  # estimated tokens above which agent_loop auto-compacts
POLL_INTERVAL = 5         # seconds between polls while a teammate is idle
IDLE_TIMEOUT = 60         # seconds of idleness before a teammate shuts down
VALID_MSG_TYPES = {"message", "broadcast", "shutdown_request",
                   "shutdown_response", "plan_approval_response"}

# Message order expected by Kimi:
#   system:    ...
#   user:      ...
#   assistant: ...
#   tool:      ...
#   tool:      ...
#   assistant: ...
# === SECTION: base_tools ===
def safe_path(p: str) -> Path:
    """Resolve *p* against WORKDIR and reject anything outside the workspace.

    Returns the resolved path. Raises ValueError when the resolved path is
    not located under WORKDIR (for example via ``..`` segments).
    """
    resolved = (WORKDIR / p).resolve()
    if resolved.is_relative_to(WORKDIR):
        return resolved
    raise ValueError(f"Path escapes workspace: {p}")
def run_bash(command: str) -> str:
    """Run *command* through the shell inside WORKDIR and return its output.

    Commands containing any blocked substring are refused without being run.
    stdout and stderr are concatenated, stripped and capped at 50000
    characters; empty output is reported as "(no output)". A run exceeding
    120 seconds yields a timeout error string instead of raising.
    """
    blocked_fragments = ("rm -rf /", "sudo", "shutdown", "reboot", "> /dev/")
    for fragment in blocked_fragments:
        if fragment in command:
            return "Error: Dangerous command blocked"
    try:
        proc = subprocess.run(
            command,
            shell=True,
            cwd=WORKDIR,
            capture_output=True,
            text=True,
            timeout=120,
        )
    except subprocess.TimeoutExpired:
        return "Error: Timeout (120s)"
    combined = (proc.stdout + proc.stderr).strip()
    if not combined:
        return "(no output)"
    return combined[:50000]
def run_read(path: str, limit: int = None) -> str:
    """Return the text of a workspace file, optionally truncated to *limit* lines.

    When *limit* is set and smaller than the line count, the kept lines are
    followed by a "... (N more)" marker. The result is capped at 50000
    characters. Any failure is returned as an "Error: ..." string.
    """
    try:
        all_lines = safe_path(path).read_text().splitlines()
        total = len(all_lines)
        if limit and limit < total:
            all_lines = all_lines[:limit]
            all_lines.append(f"... ({total - limit} more)")
        text = "\n".join(all_lines)
        return text[:50000]
    except Exception as e:
        return f"Error: {e}"
def run_write(path: str, content: str) -> str:
    """Write *content* to a workspace file, creating parent directories.

    Returns a confirmation string with the number of characters written, or
    an "Error: ..." string on any failure.
    """
    try:
        target = safe_path(path)
        parent_dir = target.parent
        parent_dir.mkdir(parents=True, exist_ok=True)
        target.write_text(content)
        written = len(content)
        return f"Wrote {written} bytes to {path}"
    except Exception as e:
        return f"Error: {e}"
def run_edit(path: str, old_text: str, new_text: str) -> str:
    """Replace the first occurrence of *old_text* with *new_text* in a file.

    Returns "Edited <path>" on success, an error string when *old_text* is
    absent from the file, or an "Error: ..." string on any other failure.
    """
    try:
        target = safe_path(path)
        current = target.read_text()
        if old_text in current:
            # Only the first match is replaced.
            target.write_text(current.replace(old_text, new_text, 1))
            return f"Edited {path}"
        return f"Error: Text not found in {path}"
    except Exception as e:
        return f"Error: {e}"
# === SECTION: todos (s03) ===
class TodoManager:
    """In-memory checklist with validation and a plain-text rendering."""

    def __init__(self):
        self.items = []

    def update(self, items: list) -> str:
        """Validate *items*, replace the current list and return its rendering.

        Each item needs non-empty ``content`` and ``activeForm`` and a status of
        pending / in_progress / completed (case-insensitive, default pending).
        Raises ValueError on an invalid item, on more than 20 items, or when
        more than one item is in_progress. Nothing is stored on failure.
        """
        allowed = ("pending", "in_progress", "completed")
        cleaned = []
        active = 0
        for idx, raw in enumerate(items):
            content = str(raw.get("content", "")).strip()
            status = str(raw.get("status", "pending")).lower()
            active_form = str(raw.get("activeForm", "")).strip()
            if not content:
                raise ValueError(f"Item {idx}: content required")
            if status not in allowed:
                raise ValueError(f"Item {idx}: invalid status '{status}'")
            if not active_form:
                raise ValueError(f"Item {idx}: activeForm required")
            if status == "in_progress":
                active += 1
            cleaned.append({"content": content, "status": status, "activeForm": active_form})
        if len(cleaned) > 20:
            raise ValueError("Max 20 todos")
        if active > 1:
            raise ValueError("Only one in_progress allowed")
        self.items = cleaned
        return self.render()

    def render(self) -> str:
        """Return one line per todo plus a "(done/total completed)" footer."""
        if not self.items:
            return "No todos."
        markers = {"completed": "[x]", "in_progress": "[>]", "pending": "[ ]"}
        rows = []
        finished = 0
        for entry in self.items:
            status = entry["status"]
            row = f"{markers.get(status, '[?]')} {entry['content']}"
            if status == "in_progress":
                # Only the active item shows its activeForm.
                row += f" <- {entry['activeForm']}"
            elif status == "completed":
                finished += 1
            rows.append(row)
        rows.append(f"\n({finished}/{len(self.items)} completed)")
        return "\n".join(rows)

    def has_open_items(self) -> bool:
        """Return True when at least one stored item is not completed."""
        return not all(entry.get("status") == "completed" for entry in self.items)
# === SECTION: subagent (s04) ===
def run_subagent(prompt: str, agent_type: str = "Explore") -> str:
    """Run an isolated sub-conversation and return the model's final text.

    The subagent starts from *prompt* alone and loops for at most 30 model
    calls. An "Explore" agent is offered ``bash`` and ``read_file``; any other
    *agent_type* additionally gets ``write_file`` and ``edit_file``.

    Tool failures (malformed JSON arguments, missing keys, handler errors) are
    reported back to the model as "Error: ..." tool results instead of
    aborting the run, matching how agent_loop treats its own tool errors. Only
    the tools that were actually advertised can be executed.

    Returns the last assistant message's content, "(no summary)" when that
    content is empty, or "(subagent failed)" when no response was obtained.
    """

    def tool_spec(name: str, description: str, properties: dict, required: list) -> dict:
        # Build one OpenAI-style function-tool declaration.
        return {
            "type": "function",
            "function": {
                "name": name,
                "description": description,
                "parameters": {
                    "type": "object",
                    "properties": properties,
                    "required": required,
                },
            },
        }

    sub_tools = [
        tool_spec("bash", "Run command.", {"command": {"type": "string"}}, ["command"]),
        tool_spec("read_file", "Read file.", {"path": {"type": "string"}}, ["path"]),
    ]
    if agent_type != "Explore":
        sub_tools += [
            tool_spec(
                "write_file", "Write file.",
                {"path": {"type": "string"}, "content": {"type": "string"}},
                ["path", "content"],
            ),
            tool_spec(
                "edit_file", "Edit file.",
                {"path": {"type": "string"}, "old_text": {"type": "string"},
                 "new_text": {"type": "string"}},
                ["path", "old_text", "new_text"],
            ),
        ]

    all_handlers = {
        "bash": lambda **kw: run_bash(kw["command"]),
        "read_file": lambda **kw: run_read(kw["path"]),
        "write_file": lambda **kw: run_write(kw["path"], kw["content"]),
        "edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]),
    }
    # Expose only the handlers whose tools were advertised to the model.
    advertised = [spec["function"]["name"] for spec in sub_tools]
    sub_handlers = {tool_name: all_handlers[tool_name] for tool_name in advertised}

    sub_msgs = [{"role": "user", "content": prompt}]
    resp = None
    for _ in range(30):
        resp = client.chat.completions.create(model=MODEL, messages=sub_msgs, tools=sub_tools)
        choice = resp.choices[0]
        msg = choice.message
        sub_msgs.append(msg)
        if choice.finish_reason != "tool_calls":
            break
        for call in msg.tool_calls or []:
            if call.type != "function":
                continue
            handler = sub_handlers.get(call.function.name)
            try:
                if handler is None:
                    output = "Unknown tool"
                else:
                    output = handler(**json.loads(call.function.arguments))
            except Exception as e:
                # Let the model see the failure and recover.
                output = f"Error: {e}"
            sub_msgs.append({
                "role": "tool",
                "tool_call_id": call.id,
                "name": call.function.name,
                "content": json.dumps({"result": str(output)}),
            })
    if resp:
        return resp.choices[0].message.content or "(no summary)"
    return "(subagent failed)"
# === SECTION: skills (s05) ===
class SkillLoader:
    """Index of SKILL.md files found under a skills directory.

    Each file may begin with a ``---`` delimited front-matter block of
    ``key: value`` lines; the remainder is the skill body. A skill is keyed by
    its ``name`` metadata, falling back to the name of its parent directory.
    """

    def __init__(self, skills_dir: Path):
        self.skills = {}
        if not skills_dir.exists():
            return
        front_matter = re.compile(r"^---\n(.*?)\n---\n(.*)", re.DOTALL)
        for skill_file in sorted(skills_dir.rglob("SKILL.md")):
            raw = skill_file.read_text()
            meta = {}
            body = raw  # without front matter the whole file is the body
            parsed = front_matter.match(raw)
            if parsed:
                header, rest = parsed.group(1), parsed.group(2)
                for line in header.strip().splitlines():
                    key, sep, value = line.partition(":")
                    if sep:
                        meta[key.strip()] = value.strip()
                body = rest.strip()
            skill_name = meta.get("name", skill_file.parent.name)
            self.skills[skill_name] = {"meta": meta, "body": body}

    def descriptions(self) -> str:
        """Return one " - name: description" line per skill, or "(no skills)"."""
        if not self.skills:
            return "(no skills)"
        rows = []
        for skill_name, skill in self.skills.items():
            rows.append(f" - {skill_name}: {skill['meta'].get('description', '-')}")
        return "\n".join(rows)

    def load(self, name: str) -> str:
        """Return the body of skill *name* wrapped in a <skill> tag, or an error string."""
        skill = self.skills.get(name)
        if skill:
            return f"<skill name=\"{name}\">\n{skill['body']}\n</skill>"
        return f"Error: Unknown skill '{name}'. Available: {', '.join(self.skills.keys())}"
# === SECTION: compression (s06) ===
def estimate_tokens(messages: list) -> int:
    """Roughly estimate the token count of *messages*.

    The messages are serialized to JSON (objects that are not JSON-native are
    stringified) and the character count is divided by four.
    """
    serialized = json.dumps(messages, default=str)
    return len(serialized) // 4
def microcompact(messages: list, keep_recent: int = 3):
    """Blank out the content of older tool results, in place.

    The previous body returned immediately, leaving its logic unreachable, and
    that logic looked for ``tool_result`` parts inside user messages, a shape
    this file never produces. Tool results here are plain dict messages with
    ``role == "tool"``, so those are what get compacted.

    The *keep_recent* most recent tool messages are left untouched. Any older
    tool message whose content is a string longer than 100 characters has it
    replaced by "[cleared]". Entries that are not dicts (for example SDK
    message objects appended to the history) are skipped. Returns None.
    """
    tool_msgs = [
        m for m in messages
        if isinstance(m, dict) and m.get("role") == "tool"
    ]
    stale_count = len(tool_msgs) - max(keep_recent, 0)
    if stale_count <= 0:
        return
    for m in tool_msgs[:stale_count]:
        content = m.get("content")
        if isinstance(content, str) and len(content) > 100:
            m["content"] = "[cleared]"
def auto_compact(messages: list) -> list:
    """Archive *messages* to disk and return a two-message summary history.

    The conversation is written as JSON lines to a timestamped file in
    TRANSCRIPT_DIR, then the model is asked to summarize the first 80000
    characters of its JSON form. The returned list holds a user message with
    the transcript path and summary, followed by an assistant acknowledgement.
    """
    TRANSCRIPT_DIR.mkdir(exist_ok=True)
    path = TRANSCRIPT_DIR / f"transcript_{int(time.time())}.jsonl"
    with open(path, "w") as f:
        f.writelines(json.dumps(entry, default=str) + "\n" for entry in messages)

    conv_text = json.dumps(messages, default=str)[:80000]
    request = [{"role": "user", "content": f"Summarize for continuity:\n{conv_text}"}]
    resp = client.chat.completions.create(model=MODEL, messages=request, max_tokens=2000)
    summary = resp.choices[0].message.content

    opener = {"role": "user", "content": f"[Compressed. Transcript: {path}]\n{summary}"}
    acknowledgement = {"role": "assistant", "content": "Understood. Continuing with summary context."}
    return [opener, acknowledgement]
# === SECTION: file_tasks (s07) ===
class TaskManager:
    """File-backed task board: one ``task_<id>.json`` file per task in TASKS_DIR."""

    def __init__(self):
        TASKS_DIR.mkdir(exist_ok=True)

    def _next_id(self) -> int:
        """Return one more than the highest id among existing task files."""
        ids = [int(f.stem.split("_")[1]) for f in TASKS_DIR.glob("task_*.json")]
        return max(ids, default=0) + 1

    def _load(self, tid: int) -> dict:
        """Read task *tid* from disk; raises ValueError when its file is missing."""
        p = TASKS_DIR / f"task_{tid}.json"
        if not p.exists():
            raise ValueError(f"Task {tid} not found")
        return json.loads(p.read_text())

    def _save(self, task: dict):
        """Write *task* to the file named after its id."""
        (TASKS_DIR / f"task_{task['id']}.json").write_text(json.dumps(task, indent=2))

    def create(self, subject: str, description: str = "") -> str:
        """Create a pending, unowned task and return it as a JSON string."""
        task = {"id": self._next_id(), "subject": subject, "description": description,
                "status": "pending", "owner": None, "blockedBy": [], "blocks": []}
        self._save(task)
        return json.dumps(task, indent=2)

    def get(self, tid: int) -> str:
        """Return task *tid* as a JSON string; raises ValueError when missing."""
        return json.dumps(self._load(tid), indent=2)

    def update(self, tid: int, status: str = None,
               add_blocked_by: list = None, add_blocks: list = None) -> str:
        """Change the status and/or dependency lists of task *tid*.

        Completing a task removes its id from every other task's ``blockedBy``.
        A status of "deleted" removes the task file and returns a short
        confirmation; otherwise the updated task is returned as JSON.
        """
        task = self._load(tid)
        if status:
            task["status"] = status
            if status == "completed":
                # Unblock every task that was waiting on this one.
                for f in TASKS_DIR.glob("task_*.json"):
                    t = json.loads(f.read_text())
                    if tid in t.get("blockedBy", []):
                        t["blockedBy"].remove(tid)
                        self._save(t)
            if status == "deleted":
                (TASKS_DIR / f"task_{tid}.json").unlink(missing_ok=True)
                return f"Task {tid} deleted"
        if add_blocked_by:
            task["blockedBy"] = list(set(task["blockedBy"] + add_blocked_by))
        if add_blocks:
            task["blocks"] = list(set(task["blocks"] + add_blocks))
        self._save(task)
        return json.dumps(task, indent=2)

    def list_all(self) -> str:
        """Return one line per task ordered by numeric id, or "No tasks."."""
        tasks = [json.loads(f.read_text()) for f in TASKS_DIR.glob("task_*.json")]
        if not tasks:
            return "No tasks."
        # Sort by id, not filename: "task_10.json" < "task_2.json" lexically.
        tasks.sort(key=lambda t: t["id"])
        markers = {"pending": "[ ]", "in_progress": "[>]", "completed": "[x]"}
        lines = []
        for t in tasks:
            m = markers.get(t["status"], "[?]")
            owner = f" @{t['owner']}" if t.get("owner") else ""
            blocked = f" (blocked by: {t['blockedBy']})" if t.get("blockedBy") else ""
            lines.append(f"{m} #{t['id']}: {t['subject']}{owner}{blocked}")
        return "\n".join(lines)

    def claim(self, tid: int, owner: str) -> str:
        """Assign task *tid* to *owner* and mark it in_progress."""
        task = self._load(tid)
        task["owner"] = owner
        task["status"] = "in_progress"
        self._save(task)
        return f"Claimed task #{tid} for {owner}"
# === SECTION: background (s08) ===
class BackgroundManager:
    """Runs shell commands in daemon threads and queues completion notices."""

    def __init__(self):
        self.tasks = {}               # task id -> {"status", "command", "result"}
        self.notifications = Queue()  # finished-task notices awaiting drain()

    def run(self, command: str, timeout: int = 120) -> str:
        """Start *command* in a background thread and return a start message."""
        tid = str(uuid.uuid4())[:8]
        self.tasks[tid] = {"status": "running", "command": command, "result": None}
        threading.Thread(target=self._exec, args=(tid, command, timeout), daemon=True).start()
        return f"Background task {tid} started: {command[:80]}"

    def _exec(self, tid: str, command: str, timeout: int):
        """Thread body: run the command, record the outcome, enqueue a notice."""
        try:
            r = subprocess.run(command, shell=True, cwd=WORKDIR,
                               capture_output=True, text=True, timeout=timeout)
            output = (r.stdout + r.stderr).strip()[:50000]
            self.tasks[tid].update({"status": "completed", "result": output or "(no output)"})
        except Exception as e:
            self.tasks[tid].update({"status": "error", "result": str(e)})
        # The notice carries at most the first 500 characters of the result.
        self.notifications.put({"task_id": tid, "status": self.tasks[tid]["status"],
                                "result": self.tasks[tid]["result"][:500]})

    def check(self, tid: str = None) -> str:
        """Describe one task (when *tid* is given) or list all tasks.

        A task that is still running stores ``result = None``; it is shown as
        "(running)" rather than the literal text "None".
        """
        if tid:
            task = self.tasks.get(tid)
            if not task:
                return f"Unknown: {tid}"
            result = task.get("result")
            shown = "(running)" if result is None else result
            return f"[{task['status']}] {shown}"
        return "\n".join(
            f"{k}: [{v['status']}] {v['command'][:60]}" for k, v in self.tasks.items()
        ) or "No bg tasks."

    def drain(self) -> list:
        """Remove and return every queued notification."""
        notifs = []
        while not self.notifications.empty():
            notifs.append(self.notifications.get_nowait())
        return notifs
# === SECTION: messaging (s09) ===
class MessageBus:
    """File-based mailboxes: one append-only ``<name>.jsonl`` per recipient.

    The bus is shared between the lead and teammate threads. Reading an inbox
    is a read-then-truncate sequence, so a message appended between those two
    steps would be silently lost; a lock serializes all mailbox file access.
    """

    def __init__(self):
        INBOX_DIR.mkdir(parents=True, exist_ok=True)
        self._lock = threading.Lock()  # guards every inbox file read/write

    def send(self, sender: str, to: str, content: str,
             msg_type: str = "message", extra: dict = None) -> str:
        """Append a message to *to*'s inbox and return a confirmation string.

        Keys in *extra* are merged into the message and may override the
        standard ``type`` / ``from`` / ``content`` / ``timestamp`` fields.
        """
        msg = {"type": msg_type, "from": sender, "content": content,
               "timestamp": time.time()}
        if extra:
            msg.update(extra)
        line = json.dumps(msg) + "\n"
        with self._lock:
            with open(INBOX_DIR / f"{to}.jsonl", "a") as f:
                f.write(line)
        return f"Sent {msg_type} to {to}"

    def read_inbox(self, name: str) -> list:
        """Return all pending messages for *name* and empty the inbox."""
        path = INBOX_DIR / f"{name}.jsonl"
        with self._lock:
            if not path.exists():
                return []
            msgs = [json.loads(l) for l in path.read_text().strip().splitlines() if l]
            # Truncate only after a successful parse, inside the same lock.
            path.write_text("")
        return msgs

    def broadcast(self, sender: str, content: str, names: list) -> str:
        """Send *content* as a "broadcast" to every name except the sender."""
        count = 0
        for n in names:
            if n != sender:
                self.send(sender, n, content, "broadcast")
                count += 1
        return f"Broadcast to {count} teammates"
# === SECTION: shutdown + plan tracking (s10) ===
# request_id -> {"target": teammate, "status": ...}; written by handle_shutdown_request.
shutdown_requests = dict()
# request_id -> plan record; handle_plan_review reads its "from" key and sets "status".
plan_requests = dict()
# === SECTION: team (s09/s11) ===
class TeammateManager:
    """Spawns teammate agents in daemon threads and tracks them in config.json.

    Each teammate alternates between a work phase (model calls plus tool
    execution) and an idle phase (polling its inbox and the task board).
    Member status is one of "working", "idle" or "shutdown".

    Tool failures are returned to the model as "Error: ..." results, and an
    unexpected exception in a teammate thread marks the member "shutdown".
    Without this a crashed thread left the member "working" forever, and
    spawn() refuses to restart a member in that state.
    """

    def __init__(self, bus: MessageBus, task_mgr: TaskManager):
        TEAM_DIR.mkdir(exist_ok=True)
        self.bus = bus
        self.task_mgr = task_mgr
        self.config_path = TEAM_DIR / "config.json"
        self.config = self._load()
        self.threads = {}  # member name -> most recently started thread

    def _load(self) -> dict:
        """Read the roster from disk, or return an empty default team."""
        if self.config_path.exists():
            return json.loads(self.config_path.read_text())
        return {"team_name": "default", "members": []}

    def _save(self):
        """Persist the roster to config.json."""
        self.config_path.write_text(json.dumps(self.config, indent=2))

    def _find(self, name: str) -> dict:
        """Return the member record named *name*, or None."""
        for m in self.config["members"]:
            if m["name"] == name:
                return m
        return None

    def spawn(self, name: str, role: str, prompt: str) -> str:
        """Start (or restart) teammate *name* in a daemon thread.

        An existing member can only be restarted while "idle" or "shutdown";
        otherwise an error string is returned and nothing is started.
        """
        member = self._find(name)
        if member:
            if member["status"] not in ("idle", "shutdown"):
                return f"Error: '{name}' is currently {member['status']}"
            member["status"] = "working"
            member["role"] = role
        else:
            member = {"name": name, "role": role, "status": "working"}
            self.config["members"].append(member)
        self._save()
        thread = threading.Thread(target=self._loop, args=(name, role, prompt), daemon=True)
        self.threads[name] = thread
        thread.start()
        return f"Spawned '{name}' (role: {role})"

    def _set_status(self, name: str, status: str):
        """Update and persist a member's status; unknown names are ignored."""
        member = self._find(name)
        if member:
            member["status"] = status
            self._save()

    @staticmethod
    def _tool(name: str, description: str, properties: dict = None, required: list = None) -> dict:
        """Build one function-tool declaration; "required" is omitted when empty."""
        parameters = {"type": "object", "properties": properties or {}}
        if required:
            parameters["required"] = required
        return {"type": "function",
                "function": {"name": name, "description": description, "parameters": parameters}}

    def _teammate_tools(self) -> list:
        """Return the tool declarations offered to every teammate."""
        tool = self._tool
        return [
            tool("bash", "Run command.", {"command": {"type": "string"}}, ["command"]),
            tool("read_file", "Read file.", {"path": {"type": "string"}}, ["path"]),
            tool("write_file", "Write file.",
                 {"path": {"type": "string"}, "content": {"type": "string"}},
                 ["path", "content"]),
            tool("edit_file", "Edit file.",
                 {"path": {"type": "string"}, "old_text": {"type": "string"},
                  "new_text": {"type": "string"}},
                 ["path", "old_text", "new_text"]),
            tool("send_message", "Send message.",
                 {"to": {"type": "string"}, "content": {"type": "string"}},
                 ["to", "content"]),
            tool("idle", "Signal no more work."),
            tool("claim_task", "Claim task by ID.",
                 {"task_id": {"type": "integer"}}, ["task_id"]),
        ]

    def _run_tool(self, name: str, call) -> str:
        """Execute one tool call for teammate *name*; failures become error strings."""
        tool_name = call.function.name
        if tool_name == "idle":
            return "Entering idle phase."
        try:
            args = json.loads(call.function.arguments)
            if tool_name == "claim_task":
                return self.task_mgr.claim(args["task_id"], name)
            if tool_name == "send_message":
                return self.bus.send(name, args["to"], args["content"])
            dispatch = {
                "bash": lambda **kw: run_bash(kw["command"]),
                "read_file": lambda **kw: run_read(kw["path"]),
                "write_file": lambda **kw: run_write(kw["path"], kw["content"]),
                "edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]),
            }
            handler = dispatch.get(tool_name)
            return handler(**args) if handler else "Unknown"
        except Exception as e:
            # Hand the failure to the model instead of killing the thread.
            return f"Error: {e}"

    def _ingest_inbox(self, name: str, inbox: list, messages: list) -> bool:
        """Append inbox messages to *messages*.

        Returns True, after marking the member "shutdown", as soon as a
        shutdown_request is encountered; the caller must then stop.
        """
        for msg in inbox:
            if msg.get("type") == "shutdown_request":
                self._set_status(name, "shutdown")
                return True
            messages.append({"role": "user", "content": json.dumps(msg)})
        return False

    def _loop(self, name: str, role: str, prompt: str):
        """Thread entry point: run the teammate and never leave a stale status."""
        try:
            self._run_loop(name, role, prompt)
        except Exception as e:
            print(f" [{name}] crashed: {e}")
            self._set_status(name, "shutdown")

    def _run_loop(self, name: str, role: str, prompt: str):
        """Alternate work and idle phases until shutdown or idle timeout."""
        team_name = self.config["team_name"]
        sys_prompt = (f"You are '{name}', role: {role}, team: {team_name}, at {WORKDIR}. "
                      f"Use idle when done with current work. You may auto-claim tasks.")
        messages = [{"role": "system", "content": sys_prompt},
                    {"role": "user", "content": prompt}]
        tools = self._teammate_tools()
        while True:
            # -- WORK PHASE: at most 50 model calls per phase --
            for _ in range(50):
                if self._ingest_inbox(name, self.bus.read_inbox(name), messages):
                    return
                try:
                    response = client.chat.completions.create(
                        model=MODEL, messages=messages,
                        tools=tools, max_tokens=8000)
                except Exception as e:
                    print(f" [{name}] LLM call failed: {e}")
                    self._set_status(name, "shutdown")
                    return
                choice = response.choices[0]
                msg = choice.message
                messages.append(msg)
                if choice.finish_reason != "tool_calls":
                    break
                idle_requested = False
                for call in msg.tool_calls or []:
                    if call.type != "function":
                        continue
                    if call.function.name == "idle":
                        idle_requested = True
                    output = self._run_tool(name, call)
                    print(f" [{name}] {call.function.name}: {str(output)[:120]}")
                    messages.append({"role": "tool", "tool_call_id": call.id,
                                     "name": call.function.name,
                                     "content": json.dumps({"result": str(output)})})
                if idle_requested:
                    break
            # -- IDLE PHASE: poll for messages and unclaimed tasks --
            self._set_status(name, "idle")
            resume = False
            for _ in range(IDLE_TIMEOUT // max(POLL_INTERVAL, 1)):
                time.sleep(POLL_INTERVAL)
                inbox = self.bus.read_inbox(name)
                if inbox:
                    if self._ingest_inbox(name, inbox, messages):
                        return
                    resume = True
                    break
                unclaimed = []
                for f in sorted(TASKS_DIR.glob("task_*.json")):
                    t = json.loads(f.read_text())
                    if t.get("status") == "pending" and not t.get("owner") and not t.get("blockedBy"):
                        unclaimed.append(t)
                if unclaimed:
                    task = unclaimed[0]
                    self.task_mgr.claim(task["id"], name)
                    # Identity re-injection for compressed contexts
                    if len(messages) <= 3:
                        messages.insert(0, {"role": "system", "content":
                            f"<identity>You are '{name}', role: {role}, team: {team_name}.</identity>"})
                        messages.insert(1, {"role": "assistant", "content": f"I am {name}. Continuing."})
                    messages.append({"role": "user", "content":
                        f"<auto-claimed>Task #{task['id']}: {task['subject']}\n{task.get('description', '')}</auto-claimed>"})
                    messages.append({"role": "assistant", "content": f"Claimed task #{task['id']}. Working on it."})
                    resume = True
                    break
            if not resume:
                # Idle timeout elapsed with nothing to do.
                self._set_status(name, "shutdown")
                return
            self._set_status(name, "working")

    def list_all(self) -> str:
        """Return a text roster of all members, or "No teammates."."""
        if not self.config["members"]:
            return "No teammates."
        lines = [f"Team: {self.config['team_name']}"]
        for m in self.config["members"]:
            lines.append(f" {m['name']} ({m['role']}): {m['status']}")
        return "\n".join(lines)

    def member_names(self) -> list:
        """Return the names of all members in roster order."""
        return [m["name"] for m in self.config["members"]]
# === SECTION: global_instances ===
# Module-level singletons created at import time.
# Order matters: TEAM is constructed from the BUS and TASK_MGR instances above
# it, so teammate threads operate on the same bus and task board as the lead.
TODO = TodoManager()
SKILLS = SkillLoader(SKILLS_DIR)
TASK_MGR = TaskManager()
BG = BackgroundManager()
BUS = MessageBus()
TEAM = TeammateManager(BUS, TASK_MGR)
# === SECTION: system_prompt ===
# Lead agent's system prompt. The skill list is rendered once, when this
# module is imported, from whatever SKILLS found under SKILLS_DIR.
SYSTEM = f"""You are a coding agent at {WORKDIR}. Use tools to solve tasks.
Prefer task_create/task_update/task_list for multi-step work. Use TodoWrite for short checklists.
Use task for subagent delegation. Use load_skill for specialized knowledge.
Skills: {SKILLS.descriptions()}"""
# === SECTION: shutdown_protocol (s10) ===
def handle_shutdown_request(teammate: str) -> str:
    """Ask *teammate* to shut down and record the pending request.

    A short request id is generated, stored in ``shutdown_requests`` with
    status "pending", and sent along with a "shutdown_request" message from
    the lead. Returns a confirmation string containing the request id.
    """
    request_id = str(uuid.uuid4())[:8]
    record = {"target": teammate, "status": "pending"}
    shutdown_requests[request_id] = record
    extra = {"request_id": request_id}
    BUS.send("lead", teammate, "Please shut down.", "shutdown_request", extra)
    return f"Shutdown request {request_id} sent to '{teammate}'"
# === SECTION: plan_approval (s10) ===
def handle_plan_review(request_id: str, approve: bool, feedback: str = "") -> str:
    """Approve or reject a recorded plan and notify its author.

    Looks *request_id* up in ``plan_requests``; an unknown id yields an error
    string. Otherwise the record's status becomes "approved" or "rejected"
    and a "plan_approval_response" message is sent to the record's "from".
    """
    pending = plan_requests.get(request_id)
    if not pending:
        return f"Error: Unknown plan request_id '{request_id}'"
    if approve:
        pending["status"] = "approved"
    else:
        pending["status"] = "rejected"
    author = pending["from"]
    payload = {"request_id": request_id, "approve": approve, "feedback": feedback}
    BUS.send("lead", author, feedback, "plan_approval_response", payload)
    return f"Plan {pending['status']} for '{author}'"
# === SECTION: tool_dispatch (s02) ===
# Maps each lead-agent tool name to a callable taking the decoded tool
# arguments as keyword arguments and returning the tool's output.
TOOL_HANDLERS = {
    # -- files and shell --
    "bash": lambda **a: run_bash(a["command"]),
    "read_file": lambda **a: run_read(a["path"], a.get("limit")),
    "write_file": lambda **a: run_write(a["path"], a["content"]),
    "edit_file": lambda **a: run_edit(a["path"], a["old_text"], a["new_text"]),
    # -- planning, delegation, skills, compression --
    "TodoWrite": lambda **a: TODO.update(a["items"]),
    "task": lambda **a: run_subagent(a["prompt"], a.get("agent_type", "Explore")),
    "load_skill": lambda **a: SKILLS.load(a["name"]),
    "compress": lambda **a: "Compressing...",
    # -- background commands --
    "background_run": lambda **a: BG.run(a["command"], a.get("timeout", 120)),
    "check_background": lambda **a: BG.check(a.get("task_id")),
    # -- file-backed task board --
    "task_create": lambda **a: TASK_MGR.create(a["subject"], a.get("description", "")),
    "task_get": lambda **a: TASK_MGR.get(a["task_id"]),
    "task_update": lambda **a: TASK_MGR.update(
        a["task_id"], a.get("status"), a.get("add_blocked_by"), a.get("add_blocks")
    ),
    "task_list": lambda **a: TASK_MGR.list_all(),
    # -- team and messaging (the lead always sends as "lead") --
    "spawn_teammate": lambda **a: TEAM.spawn(a["name"], a["role"], a["prompt"]),
    "list_teammates": lambda **a: TEAM.list_all(),
    "send_message": lambda **a: BUS.send(
        "lead", a["to"], a["content"], a.get("msg_type", "message")
    ),
    "read_inbox": lambda **a: json.dumps(BUS.read_inbox("lead"), indent=2),
    "broadcast": lambda **a: BUS.broadcast("lead", a["content"], TEAM.member_names()),
    "shutdown_request": lambda **a: handle_shutdown_request(a["teammate"]),
    "plan_approval": lambda **a: handle_plan_review(
        a["request_id"], a["approve"], a.get("feedback", "")
    ),
    "idle": lambda **a: "Lead does not idle.",
    "claim_task": lambda **a: TASK_MGR.claim(a["task_id"], "lead"),
}
def _tool_spec(name: str, description: str, properties: dict = None, required: list = None) -> dict:
    """Build one OpenAI-style function-tool declaration.

    ``properties`` defaults to an empty mapping and the "required" key is
    left out entirely when no required parameters are given.
    """
    parameters = {"type": "object", "properties": properties or {}}
    if required:
        parameters["required"] = required
    return {
        "type": "function",
        "function": {"name": name, "description": description, "parameters": parameters},
    }


# Tool declarations sent to the model by the lead agent; every name here has
# a matching entry in TOOL_HANDLERS.
TOOLS = [
    _tool_spec("bash", "Run a shell command.",
               {"command": {"type": "string"}}, ["command"]),
    _tool_spec("read_file", "Read file contents.",
               {"path": {"type": "string"}, "limit": {"type": "integer"}}, ["path"]),
    _tool_spec("write_file", "Write content to file.",
               {"path": {"type": "string"}, "content": {"type": "string"}},
               ["path", "content"]),
    _tool_spec("edit_file", "Replace exact text in file.",
               {"path": {"type": "string"}, "old_text": {"type": "string"},
                "new_text": {"type": "string"}},
               ["path", "old_text", "new_text"]),
    _tool_spec("TodoWrite", "Update task tracking list.",
               {"items": {
                   "type": "array",
                   "items": {
                       "type": "object",
                       "properties": {
                           "content": {"type": "string"},
                           "status": {"type": "string",
                                      "enum": ["pending", "in_progress", "completed"]},
                           "activeForm": {"type": "string"},
                       },
                       "required": ["content", "status", "activeForm"],
                   },
               }},
               ["items"]),
    _tool_spec("task", "Spawn a subagent for isolated exploration or work.",
               {"prompt": {"type": "string"},
                "agent_type": {"type": "string", "enum": ["Explore", "general-purpose"]}},
               ["prompt"]),
    _tool_spec("load_skill", "Load specialized knowledge by name.",
               {"name": {"type": "string"}}, ["name"]),
    _tool_spec("compress", "Manually compress conversation context."),
    _tool_spec("background_run", "Run command in background thread.",
               {"command": {"type": "string"}, "timeout": {"type": "integer"}},
               ["command"]),
    _tool_spec("check_background", "Check background task status.",
               {"task_id": {"type": "string"}}),
    _tool_spec("task_create", "Create a persistent file task.",
               {"subject": {"type": "string"}, "description": {"type": "string"}},
               ["subject"]),
    _tool_spec("task_get", "Get task details by ID.",
               {"task_id": {"type": "integer"}}, ["task_id"]),
    _tool_spec("task_update", "Update task status or dependencies.",
               {"task_id": {"type": "integer"},
                "status": {"type": "string",
                           "enum": ["pending", "in_progress", "completed", "deleted"]},
                "add_blocked_by": {"type": "array", "items": {"type": "integer"}},
                "add_blocks": {"type": "array", "items": {"type": "integer"}}},
               ["task_id"]),
    _tool_spec("task_list", "List all tasks."),
    _tool_spec("spawn_teammate", "Spawn a persistent autonomous teammate.",
               {"name": {"type": "string"}, "role": {"type": "string"},
                "prompt": {"type": "string"}},
               ["name", "role", "prompt"]),
    _tool_spec("list_teammates", "List all teammates."),
    _tool_spec("send_message", "Send a message to a teammate.",
               {"to": {"type": "string"},
                "content": {"type": "string"},
                # sorted(): a set's iteration order can differ between runs,
                # which would make the request payload non-reproducible.
                "msg_type": {"type": "string", "enum": sorted(VALID_MSG_TYPES)}},
               ["to", "content"]),
    _tool_spec("read_inbox", "Read and drain the lead's inbox."),
    _tool_spec("broadcast", "Send message to all teammates.",
               {"content": {"type": "string"}}, ["content"]),
    _tool_spec("shutdown_request", "Request a teammate to shut down.",
               {"teammate": {"type": "string"}}, ["teammate"]),
    _tool_spec("plan_approval", "Approve or reject a teammate's plan.",
               {"request_id": {"type": "string"}, "approve": {"type": "boolean"},
                "feedback": {"type": "string"}},
               ["request_id", "approve"]),
    _tool_spec("idle", "Enter idle state."),
    _tool_spec("claim_task", "Claim a task from the board.",
               {"task_id": {"type": "integer"}}, ["task_id"]),
]
# === SECTION: agent_loop ===
def _with_system_prompt(messages: list) -> list:
    """Return *messages* preceded by the SYSTEM prompt, without mutating it.

    If the history already starts with a system message it is returned as is.
    """
    head = messages[0] if messages else None
    if isinstance(head, dict):
        head_role = head.get("role")
    else:
        head_role = getattr(head, "role", None)
    if head_role == "system":
        return messages
    return [{"role": "system", "content": SYSTEM}, *messages]


def agent_loop(messages: list):
    """Drive the lead agent until the model answers without requesting tools.

    *messages* is the shared conversation history and is modified in place.
    Before every model call the loop compacts the history, appends finished
    background-task results and drains the lead's inbox. The SYSTEM prompt is
    injected into each request as the leading system message; the stored
    history itself never contains it, so compaction cannot drop it.

    Tool calls are dispatched through TOOL_HANDLERS. Handler failures are
    reported to the model as "Error: ..." tool results. The ``compress`` tool
    triggers a compaction after the current round's tool results are stored.
    """
    rounds_without_todo = 0
    while True:
        # s06: compression pipeline
        microcompact(messages)
        if estimate_tokens(messages) > TOKEN_THRESHOLD:
            print("[auto-compact triggered]")
            messages[:] = auto_compact(messages)
        # s08: drain background notifications
        notifs = BG.drain()
        if notifs:
            txt = "\n".join(f"[bg:{n['task_id']}] {n['status']}: {n['result']}" for n in notifs)
            messages.append({"role": "user", "content": f"<background-results>\n{txt}\n</background-results>"})
        # s10: check lead inbox
        inbox = BUS.read_inbox("lead")
        if inbox:
            messages.append({"role": "user", "content": f"<inbox>{json.dumps(inbox, indent=2)}</inbox>"})
        # LLM call (system prompt is prepended per request)
        response = client.chat.completions.create(
            model=MODEL,
            messages=_with_system_prompt(messages),
            tools=TOOLS,
        )
        choice = response.choices[0]
        msg = choice.message
        messages.append(msg)
        if choice.finish_reason != "tool_calls":
            return
        # Tool execution
        used_todo = False
        manual_compress = False
        for block in msg.tool_calls or []:
            if block.type != "function":
                continue
            tool_name = block.function.name
            if tool_name == "compress":
                manual_compress = True
                output = "Compressing..."
            else:
                handler = TOOL_HANDLERS.get(tool_name)
                try:
                    args = json.loads(block.function.arguments)
                    output = handler(**args) if handler else f"Unknown tool: {tool_name}"
                except Exception as e:
                    output = f"Error: {e}"
            print(f"> {tool_name}: {str(output)[:200]}")
            messages.append({"role": "tool", "tool_call_id": block.id, "name": tool_name,
                             "content": json.dumps({"result": str(output)})})
            if tool_name == "TodoWrite":
                used_todo = True
        # s03: nag reminder (only when todo workflow is active)
        rounds_without_todo = 0 if used_todo else rounds_without_todo + 1
        if TODO.has_open_items() and rounds_without_todo >= 3:
            messages.append({"role": "user", "content": "<reminder>Update your todos.</reminder>"})
            rounds_without_todo = 0
        # s06: manual compress
        if manual_compress:
            print("[manual compact]")
            messages[:] = auto_compact(messages)
# === SECTION: repl ===
if __name__ == "__main__":
    # Interactive REPL. "q", "exit", an empty line, Ctrl-D or Ctrl-C quit.
    # Slash commands: /compact, /tasks, /team, /inbox.
    history = []
    while True:
        try:
            query = input("\033[36ms_full >> \033[0m")
        except (EOFError, KeyboardInterrupt):
            break
        command = query.strip()
        if command.lower() in ("q", "exit", ""):
            break
        if command == "/compact":
            if history:
                print("[manual compact via /compact]")
                history[:] = auto_compact(history)
            continue
        if command == "/tasks":
            print(TASK_MGR.list_all())
            continue
        if command == "/team":
            print(TEAM.list_all())
            continue
        if command == "/inbox":
            print(json.dumps(BUS.read_inbox("lead"), indent=2))
            continue
        history.append({"role": "user", "content": query})
        agent_loop(history)
        # agent_loop returns right after appending the model's final message,
        # which may be a dict or an SDK message object; show its text.
        final = history[-1] if history else None
        if isinstance(final, dict):
            final_role, final_text = final.get("role"), final.get("content")
        else:
            final_role = getattr(final, "role", None)
            final_text = getattr(final, "content", None)
        if final_role == "assistant" and isinstance(final_text, str) and final_text:
            print(final_text)
        print()