How to Design an OpenHarness-Style Agent Runtime with Tools, Memory, Permissions, Skills, and Multi-Agent Coordination
A comprehensive tutorial that builds an OpenHarness-style agent harness from scratch, covering tool use, permissions, memory, skills, context compaction, retry logic, cost tracking, and multi-agent coordination, with fully runnable code.
In this tutorial, we build OpenHarness from scratch to better understand how a practical agent harness works. We recreate the major building blocks that make an agent system useful, including tool use, typed tool schemas, permissions, lifecycle hooks, memory, skills, context compaction, retry logic, cost tracking, and multi-agent coordination. Instead of treating an agent framework as a black box, we expose the full control flow and watch how the harness receives a user task, lets the model decide the next action, validates and executes tool calls, returns observations, and continues the loop until the task is complete. We also keep the implementation runnable so we can experiment with the architecture without needing API keys or complex infrastructure.
Setting Up the OpenHarness Core
Code:
from __future__ import annotations

import asyncio
import contextlib
import dataclasses
import fnmatch
import io
import json
import os
import re
import tempfile
import textwrap
import time
import traceback
import types
import typing
import urllib.error
import urllib.request
from dataclasses import dataclass, field
from enum import Enum
# Sentinel that ``dataclasses`` uses for "no default supplied"; the schema
# helpers below compare field defaults against it with ``is`` / ``is not``.
MISSING = dataclasses.MISSING
# ``types.UnionType`` when the running interpreter's ``types`` module has it,
# otherwise None; callers test for None before comparing against it.
UnionType = getattr(types, "UnionType", None)
def run_async(coro):
"""Run a coroutine to completion from sync code, even inside a live loop."""
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = None
if loop is not None and loop.is_running():
try:
import nest_asyncio
nest_asyncio.apply()
return loop.run_until_complete(coro)
except Exception:
import threading
box: dict = {}
def _runner():
new_loop = asyncio.new_event_loop()
try:
box["value"] = new_loop.run_until_complete(coro)
finally:
new_loop.close()
t = threading.Thread(target=_runner)
t.start()
t.join()
return box["value"]
return asyncio.run(coro)
# Horizontal rule (78 box-drawing characters) printed above and below titles.
BANNER = "═" * 78


def banner(title: str) -> None:
    """Print ``title`` framed by two ``BANNER`` rules, preceded by a blank line.

    Args:
        title: Text shown between the rules (printed with one leading space).
    """
    print("\n" + BANNER)
    print(f" {title}")
    print(BANNER)
def explain(title: str, body: str) -> None:
    """Print a titled explanation: banner, wrapped body text, then a dashed rule.

    Args:
        title: Heading passed to ``banner``.
        body: Free-form text; it is dedented, stripped and re-wrapped to a
            width of 78 columns before printing.
    """
    banner(title)
    print(textwrap.fill(textwrap.dedent(body).strip(), width=78))
    print("-" * 78)
def short(text: str, n: int = 240) -> str:
    """Collapse all whitespace runs in ``text`` and cap the result at ``n`` chars.

    NOTE(review): the tail of this function was lost in extraction (everything
    between ``<`` and ``>`` was dropped). The ``<= n`` comparison and the
    single-character ellipsis marker are a reconstruction - confirm against
    the original listing.

    Args:
        text: Any value; it is converted with ``str()`` first.
        n: Maximum length of the returned string.

    Returns:
        The whitespace-normalised text, truncated with a trailing "…" when it
        is longer than ``n``.
    """
    text = " ".join(str(text).split())
    if len(text) <= n:
        return text
    # Reserve one character for the ellipsis so the result never exceeds n.
    return text[:max(n - 1, 0)] + "…"


@dataclass
class Usage:
    """Token counts for one or more model calls.

    NOTE(review): the class header and field declarations were lost in
    extraction; they are reconstructed from how ``Usage`` is used elsewhere
    (``Usage()`` with no arguments, two positional integers, and the
    ``input_tokens`` / ``output_tokens`` attributes).
    """
    input_tokens: int = 0
    output_tokens: int = 0

    def __add__(self, other: "Usage") -> "Usage":
        """Return a new ``Usage`` holding the field-wise sum of both operands."""
        return Usage(self.input_tokens + other.input_tokens,
                     self.output_tokens + other.output_tokens)
@dataclass
class ToolCall:
    """One tool invocation: which tool to run and with which raw arguments."""
    # Identifier of this call; presumably echoed back via Message.tool_call_id
    # - confirm against the agent loop.
    id: str
    # Name of the tool to invoke.
    name: str
    # Raw (not yet validated) argument mapping for the tool.
    arguments: dict
@dataclass
class AssistantTurn:
    """One turn produced by the model: some text + zero or more tool calls."""
    # Free-form text emitted in this turn.
    text: str = ""
    # Tool calls requested in this turn (fresh list per instance).
    tool_calls: list = field(default_factory=list)
    # Why the model stopped; defaults to "end_turn".
    stop_reason: str = "end_turn"
    # Token usage attributed to this turn (fresh ``Usage()`` per instance).
    usage: Usage = field(default_factory=Usage)
@dataclass
class Message:
    """A single message in the running conversation transcript."""
    # Speaker of the message (the set of valid roles is not defined here).
    role: str
    # Text content of the message.
    content: str = ""
    # Tool calls attached to this message (fresh list per instance).
    tool_calls: list = field(default_factory=list)
    # Presumably the id of the ToolCall this message answers - confirm
    # against the agent loop.
    tool_call_id: str = ""
    # Optional name; presumably the tool name for tool-result messages.
    name: str = ""
def count_tokens(text: str) -> int:
    """Cheap, provider-agnostic token estimate (~4 chars/token).

    Args:
        text: The text to measure; any falsy value counts as empty.

    Returns:
        0 for empty input, otherwise ``round(len(text) / 4)`` with a floor
        of 1 so that non-empty text never costs zero tokens.
    """
    if not text:
        return 0
    return max(1, round(len(text) / 4))
# model name -> (input price, output price), in dollars per one million
# tokens (see CostMeter.dollars). "default" is used for unknown models.
PRICE_BOOK = {
    "mock-sonnet": (3.00, 15.00),
    "claude-sonnet-4": (3.00, 15.00),
    "gpt-4.1": (2.00, 8.00),
    "default": (1.00, 3.00),
}


class CostMeter:
    """Accumulates token usage and converts it to an estimated dollar cost."""

    def __init__(self, model: str):
        """Start an empty meter for ``model`` (looked up in ``PRICE_BOOK``)."""
        self.model = model
        self.total = Usage()
        self.calls = 0

    def add(self, usage: Usage) -> None:
        """Fold one model call's ``usage`` into the running total."""
        self.total = self.total + usage
        self.calls += 1

    @property
    def dollars(self) -> float:
        """Estimated cost so far, using the model's (or the default) prices."""
        pin, pout = PRICE_BOOK.get(self.model, PRICE_BOOK["default"])
        return (self.total.input_tokens / 1e6) * pin + \
               (self.total.output_tokens / 1e6) * pout

    def summary(self) -> str:
        """Return a one-line report: call count, token totals and cost."""
        return (f"{self.calls} model call(s) | "
                f"in={self.total.input_tokens} out={self.total.output_tokens} tok | "
                f"~${self.dollars:.5f} ({self.model})")
def fld(description: str = "", default=MISSING, default_factory=MISSING):
    """Declare a tool-input field with a description (and optional default).

    Args:
        description: Human-readable text stored in the field's metadata under
            the ``"description"`` key.
        default: Static default value; omit to leave the field without one.
        default_factory: Zero-argument callable producing the default; takes
            precedence over ``default`` when both are given.

    Returns:
        The ``dataclasses.field(...)`` object to assign in a dataclass body.
    """
    md = {"description": description}
    if default_factory is not MISSING:
        return field(default_factory=default_factory, metadata=md)
    if default is not MISSING:
        return field(default=default, metadata=md)
    return field(metadata=md)
def _is_optional(t) -> bool:
origin = typing.get_origin(t)
if origin is typing.Union or (UnionType is not None and origin is UnionType):
return type(None) in typing.get_args(t)
return False
def _py_to_json_type(t) -> dict:
origin = typing.get_origin(t)
if origin is typing.Union or (UnionType is not None and origin is UnionType):
args = [a for a in typing.get_args(t) if a is not type(None)]
return _py_to_json_type(args[0]) if args else {"type": "string"}
if t is str:
return {"type": "string"}
if t is bool:
return {"type": "boolean"}
if t is int:
return {"type": "integer"}
if t is float:
return {"type": "number"}
if origin is list or t is list:
args = typing.get_args(t)
item = _py_to_json_type(args[0]) if args else {"type": "string"}
return {"type": "array", "items": item}
if origin is dict or t is dict:
return {"type": "object"}
return {"type": "string"}
def build_json_schema(model_cls) -> dict:
"""Turn a dataclass input model into a JSON Schema (object with properties)."""
hints = typing.get_type_hints(model_cls)
props, required = {}, []
for f in dataclasses.fields(model_cls):
t = hints.get(f.name, str)
js = dict(_py_to_json_type(t))
desc = f.metadata.get("description", "")
if desc:
js["description"] = desc
props[f.name] = js
has_default = (f.default is not MISSING) or (f.default_factory is not MISSING)
if not has_default and not _is_optional(t):
required.append(f.name)
schema = {"type": "object", "properties": props}
if required:
schema["required"] = required
return schema
def _coerce(v, t):
origin = typing.get_origin(t)
if origin is typing.Union or (UnionType is not None and origin is UnionType):
if v is None:
return None
args = [a for a in typing.get_args(t) if a is not type(None)]
return _coerce(v, args[0]) if args else v
if t is str:
return v if isinstance(v, str) else str(v)
if t is bool:
if isinstance(v, bool):
return v
if isinstance(v, str):
return v.strip().lower() in ("1", "true", "yes", "y", "on")
return bool(v)
if t is int:
return int(v)
if t is float:
return float(v)
if origin is list or t is list:
args = typing.get_args(t)
it = args[0] if args else str
if not isinstance(v, list):
v = [v]
return [_coerce(x, it) for x in v]
if origin is dict or t is dict:
return dict(v) if v else {}
return v
def instantiate(model_cls, raw: dict):
"""Validate + coerce raw JSON args into a typed input instance."""
hints = typing.get_type_hints(model_cls)
raw = raw or {}
kwargs = {}
for f in dataclasses.fields(model_cls):
t = hints.get(f.name, str)
if f.name in raw and raw[f.name] is not None:
try:
kwargs[f.name] = _coerce(raw[f.name], t)
except (TypeError, ValueError) as e:
raise ValueError(f"Bad value for '{f.name}': {e}")
elif f.default is not MISSING:
kwargs[f.name] = f.default
elif f.default_factory is not MISSING:
kwargs[f.name] = f.default_factory()
elif _is_optional(t):
kwargs[f.name] = None
else:
raise ValueError(f"Missing required argument '{f.name}'")
return model_cls(kwargs)
class PermissionKind(Enum):
    """How dangerous a tool is — drives the default permission policy."""
    READ = "read"
    WRITE = "write"
    EXECUTE = "execute"
    META = "meta"
@dataclass
class ToolResult:
    """What a tool hands back after running."""
    # Text produced by the tool.
    output: str
    # True when the output describes a failure rather than a result.
    is_error: bool = False
    # Free-form extra data about the run (fresh dict per instance).
    metadata: dict = field(default_factory=dict)
class ToolContext:
    """Everything a tool may need at runtime (services + shared state)."""

    def __init__(self, services=None, /, **named_services):
        """Expose every supplied service as an attribute of the context.

        NOTE(review): extraction removed the dunder underscores (``init``,
        ``self.dict``) and possibly a ``**`` before ``services``. Both call
        styles are accepted so either reading of the original keeps working:
        ``ToolContext({"fs": fs})`` and ``ToolContext(fs=fs)``.

        Args:
            services: Optional mapping of attribute name -> service object.
            **named_services: Services given by keyword; these win over
                same-named entries in ``services``.
        """
        if services:
            self.__dict__.update(services)
        self.__dict__.update(named_services)
class BaseTool:
    """Base class for all tools. Subclasses set name/description/InputModel/kind
    and implement execute. Schema + validation are handled here."""
    name: str = "base"
    description: str = ""
    # Dataclass describing the tool's arguments, or None for no arguments.
    InputModel = None
    kind: PermissionKind = PermissionKind.READ

    def schema(self) -> dict:
        """Describe the tool: name, description, kind and input JSON Schema.

        Tools without an ``InputModel`` advertise an empty object schema.
        """
        return {
            "name": self.name,
            "description": self.description,
            "kind": self.kind.value,
            "input_schema": (build_json_schema(self.InputModel)
                             if self.InputModel else
                             {"type": "object", "properties": {}}),
        }

    async def run(self, raw_args: dict, ctx: ToolContext) -> ToolResult:
        """Validate ``raw_args`` against ``InputModel`` and call ``execute``.

        ``execute`` receives ``None`` as its args when no ``InputModel`` is
        set. A ``ValueError`` from ``instantiate`` propagates to the caller.
        """
        args = instantiate(self.InputModel, raw_args) if self.InputModel else None
        return await self.execute(args, ctx)

    async def execute(self, args, ctx: ToolContext) -> ToolResult:
        """Do the tool's actual work; subclasses must override this."""
        raise NotImplementedError
class ToolRegistry:
    """Name -> tool lookup table, in registration order."""

    def __init__(self):
        """Create an empty registry."""
        self._tools: dict = {}

    def register(self, tool: BaseTool) -> "ToolRegistry":
        """Store ``tool`` under ``tool.name`` (replacing any previous entry).

        Returns the registry itself so calls can be chained.
        """
        self._tools[tool.name] = tool
        return self

    def get(self, name: str) -> BaseTool | None:
        """Return the tool registered as ``name``, or None if there is none."""
        return self._tools.get(name)

    def schemas(self) -> list:
        """Return ``schema()`` of every registered tool."""
        return [t.schema() for t in self._tools.values()]

    def names(self) -> list:
        """Return the registered tool names."""
        return list(self._tools)
class VirtualFS:
    """In-memory filesystem. Keeps the tutorial safe & deterministic in Colab."""

    def __init__(self):
        """Start with no files; ``files`` maps normalised path -> content."""
        self.files: dict = {}

    @staticmethod
    def norm(path: str) -> str:
        """Normalise ``path`` to the key format used in ``files``.

        Surrounding whitespace is trimmed, then any leading ``./`` segments
        and ``/`` characters are dropped. Only whole ``./`` prefixes are
        removed, so dotfiles such as ``.env`` keep their leading dot
        (``str.lstrip("./")`` would strip it, since it removes a character
        set, not a prefix).
        """
        cleaned = path.strip()
        while True:
            if cleaned.startswith("./"):
                cleaned = cleaned[2:]
            elif cleaned.startswith("/"):
                cleaned = cleaned[1:]
            else:
                return cleaned

    def write(self, path: str, content: str) -> None:
        """Create or overwrite the file at ``path`` with ``content``."""
        self.files[self.norm(path)] = content

    def read(self, path: str) -> str:
        """Return the content at ``path``; raises ``KeyError`` if absent."""
        return self.files[self.norm(path)]

    def exists(self, path: str) -> bool:
        """Return True when a file is stored at ``path``."""
        return self.norm(path) in self.files

    def list(self, pattern: str = "*") -> list:
        """Return the sorted stored paths matching the fnmatch ``pattern``."""
        return sorted(p for p in self.files if fnmatch.fnmatch(p, pattern))

    def tree(self) -> str:
        """Return one line per file (path and size), or "(empty)"."""
        if not self.files:
            return "(empty)"
        return "\n".join(f" {p} ({len(c)} bytes)"
                         for p, c in sorted(self.files.items()))
class PermissionMode(Enum):
    """Global policy applied by ``PermissionChecker.check``."""
    DEFAULT = "default"   # reads/meta allowed, writes/executes need approval
    AUTO = "auto"         # everything not explicitly denied is allowed
    PLAN = "plan"         # writes/executes denied, the rest allowed


@dataclass
class PermissionDecision:
    """Verdict of a permission check."""
    # One of "allow", "deny" or "ask" (the values produced by the checker).
    action: str
    # Human-readable explanation of the verdict.
    reason: str = ""


# fnmatch patterns for paths that are always denied, before any path rule.
SENSITIVE_PATTERNS = [
    "/etc/*", "*/.ssh/*", "*.pem", "*id_rsa*", "*/.aws/*",
    "*credentials*", "*.env", "*/secrets/*",
]


class PermissionChecker:
    """Decides whether a tool call is allowed, denied, or needs approval."""

    def __init__(self, mode: PermissionMode = PermissionMode.DEFAULT,
                 path_rules: list | None = None,
                 denied_commands: list | None = None):
        """Configure the checker.

        Args:
            mode: Global policy; see ``PermissionMode``.
            path_rules: Ordered dicts with a ``"pattern"`` (fnmatch) and an
                optional ``"allow"`` flag (defaults to True); first match wins.
            denied_commands: Regular expressions; a command matching any of
                them (``re.search``) is denied.
        """
        self.mode = mode
        self.path_rules = path_rules or []
        self.denied_commands = denied_commands or []

    def _check_path(self, path: str) -> PermissionDecision | None:
        """Return a verdict for ``path``, or None when no pattern matches.

        Sensitive patterns are tested first, so a path rule cannot re-allow
        a sensitive path.
        """
        for pat in SENSITIVE_PATTERNS:
            if fnmatch.fnmatch(path, pat):
                return PermissionDecision("deny", f"sensitive path '{path}' ({pat})")
        for rule in self.path_rules:
            if fnmatch.fnmatch(path, rule["pattern"]):
                if rule.get("allow", True):
                    return PermissionDecision("allow", f"path rule allows '{rule['pattern']}'")
                return PermissionDecision("deny", f"path rule blocks '{rule['pattern']}'")
        return None

    def _check_command(self, command: str) -> PermissionDecision | None:
        """Return a deny verdict if ``command`` matches a denied pattern."""
        for pat in self.denied_commands:
            if re.search(pat, command):
                return PermissionDecision("deny", f"denied command matched /{pat}/")
        return None

    def check(self, tool: BaseTool, args: dict) -> PermissionDecision:
        """Decide what to do with a call to ``tool`` using raw ``args``.

        Order of evaluation: path verdicts (only for write/execute tools that
        carry a ``"path"`` argument), command denials, then the global mode.

        Returns:
            A ``PermissionDecision`` whose action is "allow", "deny" or "ask".
        """
        # NOTE(review): read-only tools skip the path check entirely, so
        # sensitive paths are not blocked for reads - confirm this is intended.
        if "path" in args and tool.kind in (PermissionKind.WRITE, PermissionKind.EXECUTE):
            d = self._check_path(str(args["path"]))
            if d:
                return d
        if "command" in args:
            d = self._check_command(str(args["command"]))
            if d:
                return d
        if self.mode is PermissionMode.AUTO:
            return PermissionDecision("allow", "auto mode")
        if self.mode is PermissionMode.PLAN:
            if tool.kind in (PermissionKind.WRITE, PermissionKind.EXECUTE):
                return PermissionDecision("deny", "plan mode blocks writes/executes")
            return PermissionDecision("allow", "plan mode allows reads")
        if tool.kind in (PermissionKind.READ, PermissionKind.META):
            return PermissionDecision("allow", "safe tool")
        return PermissionDecision("ask", f"{tool.kind.value} requires approval")
async def auto_approve(tool, args, reason) -> bool:
    """Approval callback that always says yes, logging what it approved.

    Args:
        tool: The tool awaiting approval; only ``tool.name`` is read.
        args: The call's arguments (unused).
        reason: Why approval was requested; echoed in the log line.

    Returns:
        Always True.
    """
    print(f" approval needed: {tool.name} ({reason}) -> [auto-approved]")
    return True
async def interactive_approve(tool, args, reason) -> bool:
    """Approval callback that asks the user on stdin.

    Args:
        tool: The tool awaiting approval; only ``tool.name`` is read.
        args: The call's arguments, shown JSON-encoded and shortened to 80
            characters in the prompt.
        reason: Why approval was requested (unused in the prompt).

    Returns:
        True only when the answer starts with "y" (case-insensitive). Any
        other answer, or end-of-input on stdin, counts as a denial.
    """
    prompt = f" Allow {tool.name}({short(json.dumps(args), 80)})? [y/N] "
    try:
        # input() blocks the event loop while waiting for the answer.
        ans = input(prompt)
    except EOFError:
        # No interactive stdin: fall back to the prompt's default ("N").
        return False
    return ans.strip().lower().startswith("y")
@dataclass
class HookOutcome:
    """Result reported by a lifecycle hook."""
    # True when the hook vetoes the tool call.
    blocked: bool = False
    # Explanation accompanying the outcome.
    reason: str = ""
    # Presumably replacement arguments for the call, None meaning "unchanged"
    # - confirm against HookManager.
    arguments: dict | None = None
class HookManager:
"""Lifecycle events around every tool call (like PreToolUse/PostToolUse)."""
def init(self):
self.pre: list = []
self.post: list = []
def add_pre(self, fn):
self.pre.append(fn); return self
def add_post(self, fn):
self.post.append(fn); return self
def run_pre(self, call: ToolCall, tool: BaseTool, ctx: ToolCon
[truncated for AI cost control]