
An Implementation of the Microsoft Agent Governance Toolkit for Safe AI Agent Tool Use with Policies, Approvals, Audit Logs, and Risk Controls

In this tutorial, we build a governed AI-agent workflow using Microsoft’s Agent Governance Toolkit as the reference point. We create a Colab-ready implementation where agents do not directly execute tools; instead, every action first passes through a governance layer that checks the agent’s identity, trust score, risk tier, requested tool, action type, sensitivity level, and policy rules. We define a YAML-based policy that controls destructive database operations, external email sending, shell execution, access to sensitive data, and financial transfers. We then wrap each tool with governance logic so that actions can be allowed, denied, sandboxed, or routed through an approval step before execution. We also generate tamper-evident audit records, run policy tests, activate a kill switch, summarize governance decisions, and visualize the relationships between agents, tools, rules, and outcomes as a graph.

Source: MarkTechPost | Author: Sana Hassan


import os
import sys
import json
import time
import uuid
import hmac
import yaml
import hashlib
import random
import shutil
import subprocess
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from typing import Any, Dict, List, Callable, Optional


def pip_install(*packages):
    subprocess.run(
        [sys.executable, "-m", "pip", "install", "-q", *packages],
        check=False
    )


pip_install("pyyaml", "pandas", "networkx", "matplotlib", "rich")
pip_install("agent-governance-toolkit[full]")

from rich.console import Console
from rich.table import Table
from rich.panel import Panel
from rich import box
import pandas as pd
import networkx as nx
import matplotlib.pyplot as plt

console = Console()

REPO_URL = "https://github.com/microsoft/agent-governance-toolkit"
REPO_DIR = "/content/agent-governance-toolkit"

if not os.path.exists(REPO_DIR):
    subprocess.run(["git", "clone", "--depth", "1", REPO_URL, REPO_DIR], check=False)

official_govern = None
official_import_error = None

try:
    from agentmesh.governance import govern as official_govern
except Exception as e:
    official_import_error = repr(e)

We set up the Colab environment by installing the required libraries and importing everything needed for policy handling, auditing, visualization, and data analysis. We also clone the Microsoft Agent Governance Toolkit repository so the notebook has the upstream project available for reference. We then try to import the official governance function inside a try/except block and record any import error, which keeps the tutorial runnable even if the preview package changes.
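The optional-import pattern described above can be sketched as a small helper. This is a minimal illustration, not the toolkit's API: `optional_import` is a hypothetical name introduced here, and the module path `agentmesh.governance` is taken from the tutorial code and may not exist in a given environment.

```python
import importlib
from typing import Any, Optional, Tuple


def optional_import(module_name: str, attribute: str) -> Tuple[Optional[Any], Optional[str]]:
    """Return (object, None) on success, or (None, error_text) if the lookup fails."""
    try:
        module = importlib.import_module(module_name)
        return getattr(module, attribute), None
    except Exception as exc:  # ImportError, AttributeError, or errors raised at import time
        return None, repr(exc)


# Module path copied from the tutorial; treated here as possibly unavailable.
official_govern, official_import_error = optional_import("agentmesh.governance", "govern")

if official_govern is None:
    print("Official govern() unavailable, falling back to the tutorial engine:")
    print(official_import_error)
```

Returning the error text instead of raising lets the rest of the notebook decide whether to use the official function or a local fallback.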


POLICY_PATH = "/content/advanced_agent_policy.yaml"

policy_yaml = """
apiVersion: governance.toolkit/v1
name: advanced-colab-governance-policy
default_action: allow
metadata:
  owner: ai-platform-team
  environment: tutorial
  description: >
    Demonstrates deterministic governance controls for AI agent tool calls.
rules:
  - name: block-destructive-database-actions
    description: "Agents must not perform destructive database operations."
    condition: "action.type in ['drop_table', 'delete_table', 'truncate_table']"
    action: deny
    severity: critical
    owasp_risk: "Tool misuse / Excessive agency"

  - name: require-human-approval-for-email
    description: "External email requires approval before execution."
    condition: "action.type == 'send_email' and action.recipient_domain != 'internal.local'"
    action: require_approval
    approvers: ["security-team", "business-owner"]
    severity: high
    owasp_risk: "Goal hijacking / Unauthorized action"

  - name: sandbox-shell-execution
    description: "Shell commands must run in a sandbox with blocked dangerous commands."
    condition: "action.type == 'shell_exec'"
    action: sandbox
    sandbox:
      blocked_terms: ["rm -rf", "curl http", "wget http", "chmod 777", "sudo"]
      max_runtime_seconds: 2
    severity: high
    owasp_risk: "Tool misuse / Unsafe execution"

  - name: deny-low-trust-agent-sensitive-data
    description: "Low-trust agents cannot access sensitive data."
    condition: "identity.trust_score < […] > 1000"
    action: require_approval
    approvers: ["finance-controller"]
    severity: critical
    owasp_risk: "Excessive agency / Business process compromise"

  - name: rate-limit-high-risk-agent
    description: "High-risk agents are blocked from repeated autonomous actions."
    condition: "identity.risk_tier == 'high' and action.autonomous == True"
    action: deny
    severity: medium
    owasp_risk: "Rogue agent / Cascading failure"
"""

with open(POLICY_PATH, "w") as f:
    f.write(policy_yaml)

with open(POLICY_PATH, "r") as f:
    policy = yaml.safe_load(f)

We create a YAML governance policy that defines how agent actions should be handled before execution. We add rules to block destructive database actions, require approval for external emails and financial transfers, sandbox shell commands, and restrict low-trust agents from sensitive data. We then save and reload this policy so the rest of the tutorial can use it as the main governance configuration.
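Before a policy like this drives real decisions, it can help to check its shape. The following is a minimal sketch under stated assumptions: `validate_policy`, `ALLOWED_ACTIONS`, and `REQUIRED_RULE_KEYS` are hypothetical names introduced for illustration, and the sample dictionaries stand in for what `yaml.safe_load` would return for an abbreviated policy.

```python
from typing import Any, Dict, List

# Hypothetical constants for this sketch; the action names mirror the tutorial's policy.
ALLOWED_ACTIONS = {"allow", "deny", "require_approval", "sandbox"}
REQUIRED_RULE_KEYS = ("name", "condition", "action")


def validate_policy(policy: Dict[str, Any]) -> List[str]:
    """Return human-readable problems; an empty list means the policy looks well-formed."""
    problems: List[str] = []
    if policy.get("default_action") not in ALLOWED_ACTIONS:
        problems.append("default_action must be one of " + ", ".join(sorted(ALLOWED_ACTIONS)))
    seen_names = set()
    for index, rule in enumerate(policy.get("rules") or []):
        label = rule.get("name", f"rule #{index}")
        for key in REQUIRED_RULE_KEYS:
            if key not in rule:
                problems.append(f"{label}: missing '{key}'")
        if "action" in rule and rule["action"] not in ALLOWED_ACTIONS:
            problems.append(f"{label}: unknown action '{rule['action']}'")
        if rule.get("action") == "require_approval" and not rule.get("approvers"):
            problems.append(f"{label}: require_approval needs at least one approver")
        if label in seen_names:
            problems.append(f"{label}: duplicate rule name")
        seen_names.add(label)
    return problems


# Abbreviated stand-in for the parsed YAML policy.
sample_policy = {
    "name": "advanced-colab-governance-policy",
    "default_action": "allow",
    "rules": [
        {
            "name": "block-destructive-database-actions",
            "condition": "action.type in ['drop_table', 'delete_table', 'truncate_table']",
            "action": "deny",
        },
        {
            "name": "require-human-approval-for-email",
            "condition": "action.type == 'send_email' and action.recipient_domain != 'internal.local'",
            "action": "require_approval",
            "approvers": ["security-team", "business-owner"],
        },
    ],
}

# A deliberately broken policy: an approval rule with nobody to approve.
broken_policy = {
    "default_action": "allow",
    "rules": [{"name": "no-approvers", "condition": "True", "action": "require_approval"}],
}

print(validate_policy(sample_policy))
print(validate_policy(broken_policy))
```

Running a check like this right after loading the file surfaces structural mistakes early, instead of letting a malformed rule silently never match.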


@dataclass
class AgentIdentity:
    agent_id: str
    name: str
    role: str
    owner: str
    trust_score: float
    risk_tier: str
    scopes: List[str]


@dataclass
class GovernanceDecision:
    decision_id: str
    timestamp: str
    policy_name: str
    agent_id: str
    agent_name: str
    tool_name: str
    action: Dict[str, Any]
    decision: str
    matched_rule: Optional[str]
    severity: Optional[str]
    reason: str
    approved_by: Optional[str]
    previous_hash: str
    record_hash: str


class GovernanceDenied(Exception):
    pass


class ApprovalRequired(Exception):
    pass


class SandboxViolation(Exception):
    pass


class DotDict(dict):
    # Attribute-style access so conditions can read action.type, identity.trust_score, etc.
    def __getattr__(self, item):
        value = self.get(item)
        if isinstance(value, dict):
            return DotDict(value)
        return value


def safe_eval_condition(condition: str, action: Dict[str, Any], identity: AgentIdentity) -> bool:
    # Emptying __builtins__ keeps policy expressions away from functions such as open() or __import__().
    safe_globals = {
        "__builtins__": {},
        "True": True,
        "False": False,
        "None": None,
    }
    safe_locals = {
        "action": DotDict(action),
        "identity": DotDict(asdict(identity)),
    }
    try:
        return bool(eval(condition, safe_globals, safe_locals))
    except Exception:
        # A malformed or failing condition is treated as a non-match.
        return False

We define the core data structures for representing agent identities, governance decisions, and governance-related exceptions. We also create a small dot-access dictionary helper so that policy conditions can read values such as action.type and identity.trust_score. We then build a restricted condition evaluator that runs each rule's expression through eval with builtins removed and treats any evaluation error as a non-match.
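To make the evaluator's behavior concrete, here is a self-contained sketch of the same idea using plain dictionaries for both the action and the identity. The function name `evaluate_condition` is hypothetical, and emptying builtins is a tutorial convenience rather than a hardened sandbox for untrusted policy text.

```python
from typing import Any, Dict


class DotDict(dict):
    """dict subclass that exposes keys as attributes; missing keys read as None."""

    def __getattr__(self, item: str) -> Any:
        value = self.get(item)
        return DotDict(value) if isinstance(value, dict) else value


def evaluate_condition(condition: str, action: Dict[str, Any], identity: Dict[str, Any]) -> bool:
    # Hypothetical helper: evaluate one policy expression against an action and identity.
    scope_globals = {"__builtins__": {}}
    scope_locals = {"action": DotDict(action), "identity": DotDict(identity)}
    try:
        return bool(eval(condition, scope_globals, scope_locals))
    except Exception:
        # Syntax errors and type errors (for example None > 1000) count as "no match".
        return False


email_rule = "action.type == 'send_email' and action.recipient_domain != 'internal.local'"

external = evaluate_condition(
    email_rule, {"type": "send_email", "recipient_domain": "example.com"}, {}
)
internal = evaluate_condition(
    email_rule, {"type": "send_email", "recipient_domain": "internal.local"}, {}
)
malformed = evaluate_condition("action.type ==", {"type": "send_email"}, {})
missing_field = evaluate_condition("action.amount > 1000", {}, {})

print(external, internal, malformed, missing_field)
```

One consequence of the catch-all `except` is worth noting: a rule with a typo in its condition never matches, so the action falls through to later rules or the default. Validating conditions when the policy is loaded is one way to avoid that silent failure.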


class TamperEvidentAuditLog:
    def __init__(self, secret: bytes = b"tutorial-secret-key"):
        self.records: List[GovernanceDecision] = []
        self.secret = secret
        self.last_hash = "GENESIS"

    def _hash_record(self, payload: Dict[str, Any], previous_hash: str) -> str:
        # Canonical JSON (sorted keys) so the same payload always produces the same HMAC.
        canonical = json.dumps(
            {"payload": payload, "previous_hash": previous_hash},
            sort_keys=True,
            default=str
        ).encode()
        return hmac.new(self.secret, canonical, hashlib.sha256).hexdigest()

    def append(
        self,
        policy_name: str,
        identity: AgentIdentity,
        tool_name: str,
        action: Dict[str, Any],
        decision: str,
        matched_rule: Optional[str],
        severity: Optional[str],
        reason: str,
        approved_by: Optional[str] = None
    ) -> GovernanceDecision:
        base_payload = {
            "decision_id": str(uuid.uuid4()),
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "policy_name": policy_name,
            "agent_id": identity.agent_id,
            "agent_name": identity.name,
            "tool_name": tool_name,
            "action": action,
            "decision": decision,
            "matched_rule": matched_rule,
            "severity": severity,
            "reason": reason,
            "approved_by": approved_by,
        }
        record_hash = self._hash_record(base_payload, self.last_hash)
        record = GovernanceDecision(
            **base_payload,
            previous_hash=self.last_hash,
            record_hash=record_hash
        )
        self.records.append(record)
        self.last_hash = record_hash
        return record

    def verify(self) -> bool:
        # Walk the chain from the start, recomputing each hash from the stored payload.
        previous = "GENESIS"
        for r in self.records:
            payload = asdict(r)
            record_hash = payload.pop("record_hash")
            previous_hash = payload.pop("previous_hash")
            if previous_hash != previous:
                return False
            expected = self._hash_record(payload, previous_hash)
            if expected != record_hash:
                return False
            previous = record_hash
        return True

    def to_dataframe(self) -> pd.DataFrame:
        return pd.DataFrame([asdict(r) for r in self.records])


audit_log = TamperEvidentAuditLog()

We implement a tamper-evident audit log that records every governance decision made by the system. Each record's HMAC-SHA256 hash covers both its own payload and the previous record's hash, so a record that is edited after the fact no longer matches its stored hash and the chain fails verification. We also add methods to verify the audit chain and convert the records into a dataframe for later analysis.
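The chaining idea can be shown in isolation with only the standard library. This is a reduced sketch, not the tutorial's class: the function names and the `SECRET` value are placeholders, and a real deployment would presumably load the key from a secret store.

```python
import hashlib
import hmac
import json
from typing import Any, Dict, List

SECRET = b"demo-only-key"  # placeholder key for this sketch


def sign(payload: Dict[str, Any], previous_hash: str) -> str:
    """HMAC-SHA256 over the payload together with the previous record's hash."""
    canonical = json.dumps(
        {"payload": payload, "previous_hash": previous_hash}, sort_keys=True
    ).encode()
    return hmac.new(SECRET, canonical, hashlib.sha256).hexdigest()


def append(chain: List[Dict[str, Any]], payload: Dict[str, Any]) -> None:
    previous_hash = chain[-1]["record_hash"] if chain else "GENESIS"
    chain.append(
        {
            "payload": payload,
            "previous_hash": previous_hash,
            "record_hash": sign(payload, previous_hash),
        }
    )


def verify(chain: List[Dict[str, Any]]) -> bool:
    previous_hash = "GENESIS"
    for record in chain:
        if record["previous_hash"] != previous_hash:
            return False  # a record was removed, inserted, or reordered
        expected = sign(record["payload"], record["previous_hash"])
        if not hmac.compare_digest(expected, record["record_hash"]):
            return False  # the payload was changed after signing
        previous_hash = record["record_hash"]
    return True


chain: List[Dict[str, Any]] = []
append(chain, {"agent": "a1", "decision": "deny"})
append(chain, {"agent": "a2", "decision": "allow"})
append(chain, {"agent": "a1", "decision": "sandbox"})

intact = verify(chain)
chain[1]["payload"]["decision"] = "deny"  # simulate tampering with the middle record
tampered = verify(chain)

print(intact, tampered)
```

A chain like this detects edits and removals in the middle of the log, but not truncation at the tail: dropping the most recent records leaves a shorter chain that still verifies, so the latest hash would need to be anchored somewhere outside the log to catch that case.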


class TutorialGovernanceEngine:
    def __init__(self, policy: Dict[str, Any], audit_log: TamperEvidentAuditLog):
        self.policy = policy
        self.audit_log = audit_log
        self.kill_switch_enabled = False
        self.error_budget = 5
        self.recent_denials = 0

    def activate_kill_switch(self):
        self.kill_switch_enabled = True

    def deactivate_kill_switch(self):
        self.kill_switch_enabled = False

    def evaluate(
        self,
        identity: AgentIdentity,
        tool_name: str,
        action: Dict[str, Any]
    ) -> GovernanceDecision:
        # The kill switch overrides every policy rule.
        if self.kill_switch_enabled:
            return self.audit_log.append(
                policy_name=self.policy["name"],
                identity=identity,
                tool_name=tool_name,
                action=action,
                decision="deny",
                matched_rule="global-kill-switch",
                severity="critical",
                reason="Global governance kill switch is active."
            )

        # Rules are checked in order; the first matching rule decides the outcome.
        for rule in self.policy.get("rules", []):
            condition = rule.get("condition", "")
            if safe_eval_condition(condition, action, identity):
                rule_action = rule.get("action", "deny")
                matched_rule = rule.get("name")
                severity = rule.get("severity")
                description = rule.get("description", "Policy rule matched.")

                if rule_action == "deny":
                    self.recent_denials += 1
                    return self.audit_log.append(
                        policy_name=self.policy["name"],
                        identity=identity,
                        tool_name=tool_name,
                        action=action,
                        decision="deny",
                        matched_rule=matched_rule,
                        severity=severity,
                        reason=description
                    )

                if rule_action == "require_approval":
                    return self.audit_log.append(
                        policy_name=self.policy["name"],
                        identity=identity,
                        tool_name=tool_name,
                        action=action,
                        decision="require_approval",
                        matched_rule=matched_rule,
                        severity=severity,
                        reason=description
                    )

                if rule_action == "sandbox":
                    blocked_terms = rule.get("sandbox", {}).get("blocked_terms", [])
                    command = str(action.get("command", ""))
                    # A sandboxed command containing a blocked term is denied outright.
                    for term in blocked_terms:
                        if term in command:
                            self.recent_denials += 1
                            return self.audit_log.append(
                                policy_name=self.policy["name"],
                                identity=identity,
                                tool_name=tool_name,
                                action=action,
                                decision="deny",
                                matched_rule=matched_rule,
                                severity=severity,
                                reason=f"Sandbox blocked command term: {term}"
                            )
                    return self.audit_log.append(
                        policy_name=self.policy["name"],
                        identity=identity,
                        tool_name=tool_name,
                        action=action,
                        decision="sandbox",
                        matched_rule=matched_rule,
                        severity=severity,
                        reason=description
                    )

        # No rule matched: fall back to the policy's default action.
        return self.audit_log.append(
            policy_name=self.policy["name"],
            identity=identity,
            tool_name=tool_name,
            action=action,
            decision=self.policy.get("default_action", "allow"),
            matched_rule=None,
            severity=None,
            reason="No policy rule matched. Default action applied."
        )


engine = TutorialGovernanceEngine(policy, audit_log)

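We build the main governance engine that compares each agent action against the YAML policy rules. Rules are evaluated in the order they appear in the policy, and the first rule whose condition matches decides the outcome: deny, approval required, or sandbox mode. If no rule matches, the policy's default action applies. We also include a kill switch that, once activated, denies every action before any rule is evaluated.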
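The control flow of such an engine is easier to see with the audit logging stripped away. The sketch below is a simplification under stated assumptions: `MiniEngine` and `Rule` are hypothetical names, conditions are plain Python callables rather than policy strings, and identity fields are folded into the same request dictionary as the action.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

# (rule name, predicate over the request, outcome); a hypothetical shape for this sketch.
Rule = Tuple[str, Callable[[Dict[str, Any]], bool], str]


class MiniEngine:
    def __init__(self, rules: List[Rule], default_action: str = "allow"):
        self.rules = rules
        self.default_action = default_action
        self.kill_switch_enabled = False

    def evaluate(self, request: Dict[str, Any]) -> Tuple[str, Optional[str]]:
        """Return (decision, matched rule name or None)."""
        if self.kill_switch_enabled:
            return "deny", "global-kill-switch"
        for name, predicate, outcome in self.rules:
            if predicate(request):
                return outcome, name  # first match wins; later rules are not consulted
        return self.default_action, None


rules: List[Rule] = [
    (
        "require-human-approval-for-email",
        lambda r: r.get("type") == "send_email" and r.get("recipient_domain") != "internal.local",
        "require_approval",
    ),
    (
        "rate-limit-high-risk-agent",
        lambda r: r.get("risk_tier") == "high" and r.get("autonomous") is True,
        "deny",
    ),
]

mini = MiniEngine(rules)

# This request matches both rules, but the email rule comes first.
both = mini.evaluate(
    {"type": "send_email", "recipient_domain": "example.com", "risk_tier": "high", "autonomous": True}
)
unmatched = mini.evaluate({"type": "query_database"})

mini.kill_switch_enabled = True
killed = mini.evaluate({"type": "query_database"})

print(both, unmatched, killed)
```

Because the first match wins, rule order is part of the policy's meaning. In this sketch a high-risk autonomous agent sending external email is routed to approval instead of being denied, so stricter rules generally belong earlier in the list.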


def query_database(table: str, operation: str = "select") -> Dict[str, Any]:
    return {
        "status": "success",
        "operation": operation,
        "table": table,
        "rows_returned": random.randint(10, 100)
    }


def send_email(to: str, subject: str, body: str) -> Dict[str, Any]:
    return {
        "status": "sent",
        "to": to,
        "subject": subject,
        "body_preview": body[:80]
    }


def shell_exec(command: str) -> Dict[str, Any]:
    allowed_commands = ["echo", "date", "pwd", "ls"]
    first = command.strip().split()[0] if command.strip() else ""
    if first not in allowed_commands:
        return {
            "status": "blocked_by_tutorial_shell",
            "command": command,
            "reason": "Only harmless demo shell commands are executed."
        }
    resu

[truncated for AI cost control]