Skip to content
BAEM1N.DEV — AI, RAG, LLMOps 개발 블로그
Go back

DeepCoWork #6: HITL Approval Flow -- interrupt_on, Approval Queue, Timeout, Rejection Recovery

TL;DR: Write/execute tool calls trigger a LangGraph interrupt-based graph suspension with a 300-second auto-reject timeout for safety.

Table of contents

Open Table of contents

Why HITL

When an AI agent overwrites files or runs rm -rf, there is no undo. HITL (Human-in-the-Loop) is the safety gate that requires human confirmation before dangerous operations.

The implementation follows the tool call review pattern from the LangGraph HITL how-to guide.

ToolNeeds ApprovalReason
read_file, ls, glob, grepNoRead-only, no side effects
web_search, memory_readNoExternal lookup, non-destructive
write_fileYesCreates/overwrites files
edit_fileYesModifies files
executeYesRuns shell commands

The Complete Flow

Agent calls write_file
    |
    v
LangGraph interrupt_on -> graph suspended
    |
    v
_pump_agent: check graph_state.tasks[].interrupts for pending
    |
    v
_request_approval() -> send approval SSE event
    |
    v
Frontend ApprovalModal displayed
    |
    +-- Approve -> POST /agent/approve -> asyncio.Event.set()
    +-- Reject -> POST /agent/approve (approved=false)
    +-- Timeout (300s) -> auto-reject
    |
    v
Command(resume={"decisions": [...]}) -> agent resumes

Backend: Interrupt Detection

_pump_agent() checks for interrupts after each agent execution:

for _iter in range(config.MAX_AGENT_ITERATIONS):
    async for chunk in stream_events(agent, agent_input, cfg, active_subagents):
        await out.put(chunk)

    graph_state = await get_agent_state(agent, cfg)
    pending = []
    for task in (graph_state.tasks or []):
        for interrupt in (getattr(task, "interrupts", None) or []):
            pending.append(interrupt)

    if not pending:
        break  # No interrupt = agent complete

    decisions = []
    for interrupt in pending:
        value = getattr(interrupt, "value", interrupt) or {}
        action_requests = value.get("action_requests", [])

        for action_req in action_requests:
            tool_name = action_req.get("name", "")
            if tool_name in config.READ_ONLY_TOOLS:
                decisions.append({"type": "approve"})
                continue
            approved = await _request_approval(tool_name, action_req.get("args", {}), thread_id, out)
            decisions.append({"type": "approve" if approved else "reject"})

    agent_input = resume_agent_input(decisions)

Key: MAX_AGENT_ITERATIONS (default 25) prevents infinite loops.

Approval Request and Wait

async def _request_approval(tool_name, tool_args, thread_id, out) -> bool:
    approval_id = str(uuid.uuid4())
    evt = asyncio.Event()
    pending_approvals[approval_id] = evt
    thread_approval_ids.setdefault(thread_id, set()).add(approval_id)

    await out.put(sse({
        "type": "approval",
        "approval_id": approval_id,
        "tool_name": tool_name,
        "args": tool_args,
        "source": "main",
    }))

    try:
        await asyncio.wait_for(evt.wait(), timeout=config.APPROVAL_TIMEOUT_SEC)
    except asyncio.TimeoutError:
        approval_results[approval_id] = False  # Timeout = reject

    return approval_results.pop(approval_id, False)

asyncio.Event synchronizes the approval result. The SSE connection stays alive while waiting.

Global State Management

Global dictionaries in hitl.py:

pending_approvals: dict[str, asyncio.Event] = {}
approval_results: dict[str, bool] = {}
thread_approval_ids: dict[str, set[str]] = {}
thread_output_queues: dict[str, asyncio.Queue] = {}
abort_signals: dict[str, bool] = {}

cleanup_thread() clears all state when the stream ends.

Frontend: ApprovalModal

The modal shows tool-specific formatting:

A queue badge shows how many approvals are pending, processing them one at a time.

Rejection Recovery

On rejection, the agent receives a reject decision and typically:

  1. Tries an alternative approach to the same goal
  2. Asks the user why they rejected
  3. Skips the task and moves to the next one

Benchmark

MetricValue
Approval timeout300 seconds (5 minutes)
Approval request to modal display latency~80ms
Average user response time (internal testing)~3.2 seconds
Rejection recovery success rate (Claude Sonnet)~70%
MAX_AGENT_ITERATIONS25 (infinite loop prevention)

Abort: Full Stop

When the user presses “Stop”:

const handleAbort = useCallback(async () => {
  abortRef.current?.abort();          // Cancel fetch
  await abortThread(threadId);        // Signal server
  finalizeStream();                   // Clean up UI
}, [activeThreadId, finalizeStream]);

Server-side, abort_signals[thread_id] = True terminates _pump_agent’s loop immediately.

FAQ

What is the approval timeout?

Default 300 seconds (5 minutes). Configurable via config.APPROVAL_TIMEOUT_SEC.

What if multiple approval requests come at once?

They are queued. A queueSize badge shows how many are pending, and they are processed sequentially.

Do sub-agents also require HITL?

No. Sub-agents are created with with_hitl=False. The main agent in ACP mode already received user approval when calling the task() tool.


Series

  1. DeepCoWork: I Built an AI Agent Desktop App
  2. Tauri 2 + Python Sidecar
  3. DeepAgents SDK Internals
  4. System Prompt Design per Mode
  5. SSE Streaming Pipeline
  6. [This post] HITL Approval Flow
  7. Multi-Agent ACP Mode
  8. Agent Memory 4 Layers
  9. Skills System
  10. LLM Provider Integration
  11. Security Checklist
  12. GitHub Actions Cross-Platform Build

AI-assisted content
Share this post on:

Previous Post
DeepCoWork #7: Multi-Agent ACP Mode -- task() Tool, Sub-Agent Creation, Stream Merging
Next Post
DeepCoWork #5: SSE Streaming Pipeline -- From agent.astream to React UI