AI News HubLIVE
In-site rewrite6 min read

How to Design an OpenHarness Style Agent Runtime with Tools, Memory, Permissions, Skills, and Multi-Agent Coordination

A comprehensive tutorial that builds an OpenHarness-style agent harness from scratch, covering tool use, permissions, memory, skills, context compaction, retry logic, cost tracking, and multi-agent coordination, with fully runnable code.

SourceMarkTechPostAuthor: Sana Hassan

In this tutorial, we build OpenHarness from scratch to better understand how a practical agent harness works. We recreate the major building blocks that make an agent system useful, including tool use, typed tool schemas, permissions, lifecycle hooks, memory, skills, context compaction, retry logic, cost tracking, and multi-agent coordination. Instead of treating an agent framework as a black box, we expose the full control flow and watch how the harness receives a user task, lets the model decide the next action, validates and executes tool calls, returns observations, and continues the loop until the task is complete. We also keep the implementation runnable so we can experiment with the architecture without needing API keys or complex infrastructure.

Setting Up the OpenHarness Core

Copy CodeCopiedUse a different Browser

from future import annotations import asyncio import contextlib import dataclasses import fnmatch import io import json import os import re import tempfile import textwrap import time import traceback import types import typing import urllib.error import urllib.request from dataclasses import dataclass, field from enum import Enum MISSING = dataclasses.MISSING UnionType = getattr(types, "UnionType", None) def run_async(coro): """Run a coroutine to completion from sync code, even inside a live loop.""" try: loop = asyncio.get_running_loop() except RuntimeError: loop = None if loop is not None and loop.is_running(): try: import nest_asyncio nest_asyncio.apply() return loop.run_until_complete(coro) except Exception: import threading box: dict = {} def _runner(): new_loop = asyncio.new_event_loop() try: box["value"] = new_loop.run_until_complete(coro) finally: new_loop.close() t = threading.Thread(target=_runner) t.start() t.join() return box["value"] return asyncio.run(coro) BANNER = "═" * 78 def banner(title: str) -> None: print("\n" + BANNER) print(f" {title}") print(BANNER) def explain(title: str, body: str) -> None: banner(title) print(textwrap.fill(textwrap.dedent(body).strip(), width=78)) print("-" * 78) def short(text: str, n: int = 240) -> str: text = " ".join(str(text).split()) return text if len(text) "Usage": return Usage(self.input_tokens + other.input_tokens, self.output_tokens + other.output_tokens) @dataclass class ToolCall: id: str name: str arguments: dict @dataclass class AssistantTurn: """One turn produced by the model: some text + zero or more tool calls.""" text: str = "" tool_calls: list = field(default_factory=list) stop_reason: str = "end_turn" usage: Usage = field(default_factory=Usage) @dataclass class Message: """A single message in the running conversation transcript.""" role: str content: str = "" tool_calls: list = field(default_factory=list) tool_call_id: str = "" name: str = "" def count_tokens(text: str) -> int: """Cheap, provider-agnostic token estimate (~4 chars/token).""" if not text: return 0 return max(1, round(len(text) / 4)) PRICE_BOOK = { "mock-sonnet": (3.00, 15.00), "claude-sonnet-4": (3.00, 15.00), "gpt-4.1": (2.00, 8.00), "default": (1.00, 3.00), } class CostMeter: """Accumulates token usage and converts it to an estimated dollar cost.""" def init(self, model: str): self.model = model self.total = Usage() self.calls = 0 def add(self, usage: Usage) -> None: self.total = self.total + usage self.calls += 1 @property def dollars(self) -> float: pin, pout = PRICE_BOOK.get(self.model, PRICE_BOOK["default"]) return (self.total.input_tokens / 1e6) * pin + \ (self.total.output_tokens / 1e6) * pout def summary(self) -> str: return (f"{self.calls} model call(s) | " f"in={self.total.input_tokens} out={self.total.output_tokens} tok | " f"~${self.dollars:.5f} ({self.model})") def fld(description: str = "", default=MISSING, default_factory=MISSING): """Declare a tool-input field with a description (and optional default).""" md = {"description": description} if default_factory is not MISSING: return field(default_factory=default_factory, metadata=md) if default is not MISSING: return field(default=default, metadata=md) return field(metadata=md) def _is_optional(t) -> bool: origin = typing.get_origin(t) if origin is typing.Union or (UnionType is not None and origin is UnionType): return type(None) in typing.get_args(t) return False def _py_to_json_type(t) -> dict: origin = typing.get_origin(t) if origin is typing.Union or (UnionType is not None and origin is UnionType): args = [a for a in typing.get_args(t) if a is not type(None)] return _py_to_json_type(args[0]) if args else {"type": "string"} if t is str: return {"type": "string"} if t is bool: return {"type": "boolean"} if t is int: return {"type": "integer"} if t is float: return {"type": "number"} if origin is list or t is list: args = typing.get_args(t) item = _py_to_json_type(args[0]) if args else {"type": "string"} return {"type": "array", "items": item} if origin is dict or t is dict: return {"type": "object"} return {"type": "string"} def build_json_schema(model_cls) -> dict: """Turn a dataclass input model into a JSON Schema (object with properties).""" hints = typing.get_type_hints(model_cls) props, required = {}, [] for f in dataclasses.fields(model_cls): t = hints.get(f.name, str) js = dict(_py_to_json_type(t)) desc = f.metadata.get("description", "") if desc: js["description"] = desc props[f.name] = js has_default = (f.default is not MISSING) or (f.default_factory is not MISSING) if not has_default and not _is_optional(t): required.append(f.name) schema = {"type": "object", "properties": props} if required: schema["required"] = required return schema def _coerce(v, t): origin = typing.get_origin(t) if origin is typing.Union or (UnionType is not None and origin is UnionType): if v is None: return None args = [a for a in typing.get_args(t) if a is not type(None)] return _coerce(v, args[0]) if args else v if t is str: return v if isinstance(v, str) else str(v) if t is bool: if isinstance(v, bool): return v if isinstance(v, str): return v.strip().lower() in ("1", "true", "yes", "y", "on") return bool(v) if t is int: return int(v) if t is float: return float(v) if origin is list or t is list: args = typing.get_args(t) it = args[0] if args else str if not isinstance(v, list): v = [v] return [_coerce(x, it) for x in v] if origin is dict or t is dict: return dict(v) if v else {} return v def instantiate(model_cls, raw: dict): """Validate + coerce raw JSON args into a typed input instance.""" hints = typing.get_type_hints(model_cls) raw = raw or {} kwargs = {} for f in dataclasses.fields(model_cls): t = hints.get(f.name, str) if f.name in raw and raw[f.name] is not None: try: kwargs[f.name] = _coerce(raw[f.name], t) except (TypeError, ValueError) as e: raise ValueError(f"Bad value for '{f.name}': {e}") elif f.default is not MISSING: kwargs[f.name] = f.default elif f.default_factory is not MISSING: kwargs[f.name] = f.default_factory() elif _is_optional(t): kwargs[f.name] = None else: raise ValueError(f"Missing required argument '{f.name}'") return model_cls(kwargs) class PermissionKind(Enum): """How dangerous a tool is — drives the default permission policy.""" READ = "read" WRITE = "write" EXECUTE = "execute" META = "meta" @dataclass class ToolResult: output: str is_error: bool = False metadata: dict = field(default_factory=dict) class ToolContext: """Everything a tool may need at runtime (services + shared state).""" def init(self, services): self.dict.update(services) class BaseTool: """Base class for all tools. Subclasses set name/description/InputModel/kind and implement execute. Schema + validation are handled here.""" name: str = "base" description: str = "" InputModel = None kind: PermissionKind = PermissionKind.READ def schema(self) -> dict: return { "name": self.name, "description": self.description, "kind": self.kind.value, "input_schema": (build_json_schema(self.InputModel) if self.InputModel else {"type": "object", "properties": {}}), } async def run(self, raw_args: dict, ctx: ToolContext) -> ToolResult: args = instantiate(self.InputModel, raw_args) if self.InputModel else None return await self.execute(args, ctx) async def execute(self, args, ctx: ToolContext) -> ToolResult: raise NotImplementedError class ToolRegistry: def init(self): self._tools: dict = {} def register(self, tool: BaseTool) -> "ToolRegistry": self._tools[tool.name] = tool return self def get(self, name: str) -> BaseTool | None: return self._tools.get(name) def schemas(self) -> list: return [t.schema() for t in self._tools.values()] def names(self) -> list: return list(self._tools) class VirtualFS: """In-memory filesystem. Keeps the tutorial safe & deterministic in Colab.""" def init(self): self.files: dict = {} @staticmethod def norm(path: str) -> str: return path.lstrip("./").strip() def write(self, path: str, content: str) -> None: self.files[self.norm(path)] = content def read(self, path: str) -> str: return self.files[self.norm(path)] def exists(self, path: str) -> bool: return self.norm(path) in self.files def list(self, pattern: str = "*") -> list: return sorted(p for p in self.files if fnmatch.fnmatch(p, pattern)) def tree(self) -> str: if not self.files: return "(empty)" return "\n".join(f" {p} ({len(c)} bytes)" for p, c in sorted(self.files.items())) class PermissionMode(Enum): DEFAULT = "default" AUTO = "auto" PLAN = "plan" @dataclass class PermissionDecision: action: str reason: str = "" SENSITIVE_PATTERNS = [ "/etc/*", "*/.ssh/*", "*.pem", "*id_rsa*", "*/.aws/*", "*credentials*", "*.env", "*/secrets/*", ] class PermissionChecker: def init(self, mode: PermissionMode = PermissionMode.DEFAULT, path_rules: list | None = None, denied_commands: list | None = None): self.mode = mode self.path_rules = path_rules or [] self.denied_commands = denied_commands or [] def _check_path(self, path: str) -> PermissionDecision | None: for pat in SENSITIVE_PATTERNS: if fnmatch.fnmatch(path, pat): return PermissionDecision("deny", f"sensitive path '{path}' ({pat})") for rule in self.path_rules: if fnmatch.fnmatch(path, rule["pattern"]): if rule.get("allow", True): return PermissionDecision("allow", f"path rule allows '{rule['pattern']}'") return PermissionDecision("deny", f"path rule blocks '{rule['pattern']}'") return None def _check_command(self, command: str) -> PermissionDecision | None: for pat in self.denied_commands: if re.search(pat, command): return PermissionDecision("deny", f"denied command matched /{pat}/") return None def check(self, tool: BaseTool, args: dict) -> PermissionDecision: if "path" in args and tool.kind in (PermissionKind.WRITE, PermissionKind.EXECUTE): d = self._check_path(str(args["path"])) if d: return d if "command" in args: d = self._check_command(str(args["command"])) if d: return d if self.mode is PermissionMode.AUTO: return PermissionDecision("allow", "auto mode") if self.mode is PermissionMode.PLAN: if tool.kind in (PermissionKind.WRITE, PermissionKind.EXECUTE): return PermissionDecision("deny", "plan mode blocks writes/executes") return PermissionDecision("allow", "plan mode allows reads") if tool.kind in (PermissionKind.READ, PermissionKind.META): return PermissionDecision("allow", "safe tool") return PermissionDecision("ask", f"{tool.kind.value} requires approval") async def auto_approve(tool, args, reason) -> bool: print(f" approval needed: {tool.name} ({reason}) -> [auto-approved]") return True async def interactive_approve(tool, args, reason) -> bool: ans = input(f" Allow {tool.name}({short(json.dumps(args), 80)})? [y/N] ") return ans.strip().lower().startswith("y") @dataclass class HookOutcome: blocked: bool = False reason: str = "" arguments: dict | None = None class HookManager: """Lifecycle events around every tool call (like PreToolUse/PostToolUse).""" def init(self): self.pre: list = [] self.post: list = [] def add_pre(self, fn): self.pre.append(fn); return self def add_post(self, fn): self.post.append(fn); return self def run_pre(self, call: ToolCall, tool: BaseTool, ctx: ToolCon

[truncated for AI cost control]