
Pipecat Adapter

Draft · v0.2.0 · 2026-06-30

Status: Draft, rewritten for Option C architecture.

Source-of-truth statement: The domain specification is the source of truth. Pipecat config is generated adapter output. Pipecat should not be responsible for the domain-level evidence ledger, marking policy, or authoritative transition approval.

The Pipecat Adapter translates a compiled InterviewRuntime (IR), which is derived from the domain specification, into a Pipecat FlowManager configuration that a voice-agent pipeline (LiveKit · STT · LLM · TTS) can execute. The adapter is a one-way compiler: IR → Pipecat config. Runtime state mutations, evidence writes, and transition approvals remain in the Runtime Controller.


  1. Adapter Principles
  2. Architecture: Option C — Pipecat Flows as Engine, Runtime Controller as Driver
  3. The report_observation Protocol
  4. Node Mapping: IR Node → Pipecat NodeConfig
  5. Runtime Controller Integration
  6. Context Strategy
  7. Transcript and Evidence Flow
  8. Output Validation Pipeline
  9. LiveKit Data Channel Events
  10. Candidate Commands
  11. Adapter Error Handling
  12. Adapter Versioning

| # | Principle | Rationale |
|---|---|---|
| A-1 | IR → config, never config → IR | Pipecat config is a derived artefact. Reverse-engineering is unsupported and unsafe. |
| A-2 | No domain logic in Pipecat nodes | A NodeConfig carries prompts, tool schemas, and routing metadata; it never carries scoring rubrics, evidence policies, or transition approval logic. |
| A-3 | LLM observes, Controller decides | The LLM calls report_observation to report what it sees. The Runtime Controller evaluates and acts. The LLM NEVER triggers transitions directly. |
| A-4 | Deterministic mapping | The same specification node always produces the same NodeConfig. No randomness, no heuristic inference. |
| A-5 | Lossless for critical data | Node ID, question text, evidence targets, time budgets, allowed actions, forbidden actions, and transition conditions MUST survive adapter output verbatim. |
| A-6 | Adapter is stateless | The adapter does not track session state. It compiles a full specification package once; state is managed by the Runtime Controller during execution. |
| A-7 | Fail-closed on ambiguity | If a specification construct has no Pipecat equivalent, the adapter MUST either (a) encode it as metadata for the Runtime Controller to handle, or (b) fail with a compile error. It MUST NOT silently drop the construct. |
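
Principle A-7 can be illustrated with a minimal sketch. The construct sets, the `CompiledNode` container, and the `AdapterCompileError` name are all hypothetical and chosen only for illustration; the real adapter's mapping tables are not defined in this section.

```python
from dataclasses import dataclass, field


class AdapterCompileError(Exception):
    """Raised when a construct can be neither mapped nor carried as metadata."""


# Hypothetical construct sets, for illustration only.
PIPECAT_MAPPED = {"nodeId", "persona", "scenario", "forbiddenActions"}
CONTROLLER_METADATA = {"transitionConditions", "completionPolicy"}


@dataclass
class CompiledNode:
    config: dict = field(default_factory=dict)               # handed to Pipecat
    controller_metadata: dict = field(default_factory=dict)  # kept for the controller


def compile_fail_closed(ir_node: dict) -> CompiledNode:
    """Route every construct somewhere explicit; never drop one silently (A-7)."""
    out = CompiledNode()
    for key in sorted(ir_node):  # sorted iteration keeps the output deterministic (A-4)
        if key in PIPECAT_MAPPED:
            out.config[key] = ir_node[key]
        elif key in CONTROLLER_METADATA:
            out.controller_metadata[key] = ir_node[key]
        else:
            raise AdapterCompileError(f"no mapping for construct {key!r}")
    return out
```

An unknown key raises at compile time, so a missing mapping surfaces before a session starts rather than during one.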

Pipecat Flows is the engine. Runtime Controller is the driver.

Pipecat Flows provides: pipeline orchestration (STT → LLM → TTS), node management (set_node_from_config), context strategy (APPEND / RESET), function registration, LiveKit transport integration, and TTS/STT service coordination.

Runtime Controller provides: exam state machine, guardrail enforcement, evidence ledger writes, follow-up counting, time budget management, LLM output validation, transition decisions, and event emission.

The LLM has one function: report_observation. It is not a transition request — it is an observation report. The Runtime Controller evaluates observations and decides what happens next.

┌──────────────────────────────────────────────────────────────────┐
│                     Domain Specification Package                  │
│   (nodes, transitions, evidence, policies, persona, scenario)    │
└────────────────────────┬─────────────────────────────────────────┘
                         │ compile

              ┌─────────────────────┐
              │   Pipecat Adapter   │  (stateless compiler)
              └────────┬────────────┘
                       │ produces

┌──────────────────────────────────────────────────────────────────┐
│                    Runtime Controller                             │
│  ┌─────────────────────────────────────────────────────────────┐ │
│  │  Exam State Machine (exam → scaffolding → ready → in_progress│ │
│  │  → completed / aborted / expired)                            │ │
│  ├─────────────────────────────────────────────────────────────┤ │
│  │  Guardrails: persona, time budget, follow-up count,          │ │
│  │  equity, hint refusal, topic containment                     │ │
│  ├─────────────────────────────────────────────────────────────┤ │
│  │  Evidence Ledger (write on every observation)                │ │
│  ├─────────────────────────────────────────────────────────────┤ │
│  │  LLM Output Validation (content/topic/action/length filters) │ │
│  ├─────────────────────────────────────────────────────────────┤ │
│  │  Event Emitter → LiveKit DataChannel                         │ │
│  └──────────────────────────────┬──────────────────────────────┘ │
│                                 │ calls                          │
│                                 ▼                                │
│  ┌──────────────────────────────────────────────────────────────┐│
│  │              Pipecat FlowManager                              ││
│  │  • Node management (set_node_from_config)                    ││
│  │  • Context strategy (APPEND / RESET + summarization)         ││
│  │  • Function registration (report_observation only)           ││
│  │  • role_message (persona persistence)                        ││
│  └──────────────────────────────┬──────────────────────────────┘│
│                                 │ uses                           │
│                                 ▼                                │
│  ┌──────────────────────────────────────────────────────────────┐│
│  │              Pipecat Pipeline                                 ││
│  │  • LiveKitTransport (audio + data channel)                   ││
│  │  • STT service (Deepgram / Whisper / etc.)                   ││
│  │  • LLM service (OpenAI / Gemini / etc.)                      ││
│  │  • TTS service (Cartesia / ElevenLabs / etc.)                ││
│  └──────────────────────────────────────────────────────────────┘│
└──────────────────────────────────────────────────────────────────┘

| Concern | Pipecat FlowManager | Runtime Controller |
|---|---|---|
| Pipeline (STT→LLM→TTS) | ✓ Owns | |
| Node management | set_node_from_config() | Decides when to call it |
| Context strategy | ✓ APPEND / RESET | Decides which strategy per node |
| Function registration | ✓ Registers report_observation | Defines the handler |
| Persona / role_message | ✓ Persists across nodes | Sets from the specification persona |
| Guardrails | | ✓ Full enforcement |
| Follow-up counting | | ✓ Runtime state |
| Time budget | | ✓ Runtime timer |
| Evidence Ledger | | ✓ Writes on every observation |
| LLM output validation | | ✓ Intercepts spoken_text |
| Transition decisions | | ✓ Evaluates observations |
| Event emission | | ✓ LiveKit DataChannel |
| Candidate commands | | ✓ Dispatches actions |
| Recovery orchestration | | ✓ Handles failures |

In Pipecat Flows, functions serve dual purposes: node functions execute operations within a node (return (FlowResult, None)), while edge functions trigger transitions (return (FlowResult, NodeConfig)). Our architecture collapses all LLM interaction into a single function that always returns (FlowResult, None) — the LLM never returns a NodeConfig. Transitions are driven by the Runtime Controller calling flow_manager.set_node_from_config() directly.

This gives us:

  • Single function: LLM has one tool, reducing hallucination risk.
  • Observation-based: LLM reports what it sees, not what should happen.
  • Controller authority: All structural decisions stay in the Runtime Controller.
  • Composability: Signals, commands, and intent are bundled in one call, reducing round-trips.
interface ReportObservationArgs {
  // What the candidate demonstrated (evidence signals)
  signals: Array<{
    signalType: string;           // MUST match IR evidence vocabulary
    rubricLevel?: string;         // Observed level (e.g., "description", "analysis")
    excerpt: string;              // Short candidate quote (max 200 chars)
    confidence: number;           // 0.0 – 1.0
    scaffoldingIntensity?: number; // 0–3: how much scaffolding was provided before this signal
    scaffoldingEffective?: boolean; // Did candidate improve after scaffolding?
  }>;

  // What the candidate said (if it was a command, not an answer)
  commandDetected?: "repeat" | "clarification" | "request_rephrase" | "slow_down" | "pause" | "thinking_aloud" | "help" | "skip" | "revise_earlier_answer" | "finish";

  // LLM's assessment of the candidate's response
  answerQuality: "substantive" | "partial" | "off_topic" | "silence" | "unclear";

  // Does the LLM want to ask a follow-up?
  needsFollowUp: boolean;
  followUpType?: "probe" | "redirect" | "scaffold" | "challenge" | "nudge" | "confirm" | "extend" | "concede";

  // Does the LLM believe enough evidence has been gathered?
  evidenceSufficient: boolean;

  // Does the LLM detect candidate anxiety?
  anxietyDetected: boolean;

  // Beyond anxiety: crying, aggressive tone, refusal to continue
  distressDetected: boolean;

  // Rapport move (affective dimension, does NOT count toward maxFollowUps)
  rapportMove?: "encouragement" | "acknowledgement" | "reassurance" | "none";

  // Dialogue move (structural conversation act)
  dialogueMove?: "paraphrase" | "transition" | "none";

  // Misconception detection (optional)
  misconceptions?: Array<{
    concept: string;           // What the candidate misunderstood
    misconception: string;     // The specific error in understanding
    correction: string;        // What correct understanding looks like
  }>;

  // What the LLM wants to say to the candidate next
  // (Runtime validates this through output filters before presenting)
  spokenText: string;
}
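
The constraints stated in the interface comments (vocabulary match, 0.0 to 1.0 confidence, 200-character excerpts, 0 to 3 scaffolding intensity) could be checked on the controller side before any ledger write. The following is a minimal sketch under that assumption; `validate_observation` and its `signal_vocabulary` parameter are hypothetical names, not part of the specification.

```python
MAX_EXCERPT_CHARS = 200  # limit taken from the interface comment


def validate_observation(args: dict, signal_vocabulary: set[str]) -> list[str]:
    """Return a list of problems; an empty list means the payload is acceptable."""
    problems = []
    for i, sig in enumerate(args.get("signals", [])):
        signal_type = sig.get("signalType")
        if signal_type not in signal_vocabulary:
            problems.append(f"signals[{i}]: unknown signalType {signal_type!r}")
        conf = sig.get("confidence")
        if not isinstance(conf, (int, float)) or not 0.0 <= conf <= 1.0:
            problems.append(f"signals[{i}]: confidence must be within 0.0-1.0")
        if len(sig.get("excerpt", "")) > MAX_EXCERPT_CHARS:
            problems.append(f"signals[{i}]: excerpt exceeds {MAX_EXCERPT_CHARS} chars")
        intensity = sig.get("scaffoldingIntensity")
        if intensity is not None and intensity not in (0, 1, 2, 3):
            problems.append(f"signals[{i}]: scaffoldingIntensity must be 0-3")
    if not args.get("spokenText"):
        problems.append("spokenText is required")
    return problems
```

Returning a list of problems rather than raising on the first one lets the controller log every defect in a malformed observation at once.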
from pipecat_flows import FlowsFunctionSchema

report_observation = FlowsFunctionSchema(
    name="report_observation",
    description=(
        "Report your observations about the candidate's response. "
        "Include any evidence signals you detected, whether the candidate "
        "issued a command, your assessment of answer quality, and what you "
        "want to say next. Call this after every candidate response."
    ),
    properties={
        "signals": {
            "type": "array",
            "description": "Evidence signals observed in the candidate's response",
            "items": {
                "type": "object",
                "properties": {
                    "signalType": {
                        "type": "string",
                        "description": "Evidence type from the assessment rubric"
                    },
                    "rubricLevel": {
                        "type": "string",
                        "description": "Observed rubric level (e.g., description, analysis, evaluation)"
                    },
                    "excerpt": {
                        "type": "string",
                        "description": "Short quote from the candidate (max 200 chars)"
                    },
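                    "confidence": {
                        "type": "number",
                        "description": "Confidence that this signal was demonstrated (0.0-1.0)"
                    },
                    # Optional scaffolding fields, mirroring ReportObservationArgs
                    "scaffoldingIntensity": {
                        "type": "integer",
                        "minimum": 0,
                        "maximum": 3,
                        "description": "How much scaffolding was provided before this signal (0-3)"
                    },
                    "scaffoldingEffective": {
                        "type": "boolean",
                        "description": "Whether the candidate improved after scaffolding"
                    }
                },
                "required": ["signalType", "excerpt", "confidence"]
            }
        },
        "commandDetected": {
            "type": "string",
            "enum": ["repeat", "clarification", "request_rephrase", "slow_down", "pause", "thinking_aloud", "help", "skip", "revise_earlier_answer", "finish"],
            "description": "If the candidate issued a command instead of answering"
        },
        "answerQuality": {
            "type": "string",
            "enum": ["substantive", "partial", "off_topic", "silence", "unclear"],
            "description": "Assessment of the candidate's response quality"
        },
        "needsFollowUp": {
            "type": "boolean",
            "description": "Whether you want to ask a follow-up question"
        },
        "followUpType": {
            "type": "string",
            "enum": ["probe", "redirect", "scaffold", "challenge", "nudge", "confirm", "extend", "concede"],
            "description": "Type of follow-up you want to ask"
        },
        "evidenceSufficient": {
            "type": "boolean",
            "description": "Whether you believe enough evidence has been gathered for this topic"
        },
        "anxietyDetected": {
            "type": "boolean",
            "description": "Whether you detect the candidate is anxious or stressed"
        },
        # The three fields below appear in ReportObservationArgs but were
        # missing from this schema, so the LLM could not report them.
        "distressDetected": {
            "type": "boolean",
            "description": "Beyond anxiety: crying, aggressive tone, refusal to continue"
        },
        "rapportMove": {
            "type": "string",
            "enum": ["encouragement", "acknowledgement", "reassurance", "none"],
            "description": "Rapport move (does not count toward maxFollowUps)"
        },
        "dialogueMove": {
            "type": "string",
            "enum": ["paraphrase", "transition", "none"],
            "description": "Structural conversation act"
        },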
        "misconceptions": {
            "type": "array",
            "description": "Specific misconceptions detected in the candidate's response. Use when the candidate demonstrates incorrect understanding, not just incomplete understanding.",
            "items": {
                "type": "object",
                "properties": {
                    "concept": {
                        "type": "string",
                        "description": "What the candidate misunderstood"
                    },
                    "misconception": {
                        "type": "string",
                        "description": "The specific error in understanding"
                    },
                    "correction": {
                        "type": "string",
                        "description": "What correct understanding looks like"
                    }
                },
                "required": ["concept", "misconception", "correction"]
            }
        },
        "spokenText": {
            "type": "string",
            "description": "What you want to say to the candidate next"
        }
    },
    required=["signals", "answerQuality", "needsFollowUp", "evidenceSufficient", "anxietyDetected", "spokenText"],
    handler=handle_observation,  # Runtime Controller handler
)
async def handle_observation(args: dict, flow_manager: FlowManager):
    """
    Runtime Controller handler for report_observation.
    This is where ALL domain logic executes.
    """
    # 1. Extract observation
    signals = args.get("signals", [])
    command = args.get("commandDetected")
    spoken_text = args["spokenText"]
    needs_followup = args["needsFollowUp"]
    evidence_sufficient = args["evidenceSufficient"]

    # Every return below is a (result, None) tuple. The second element is the
    # next NodeConfig, which is always None: the LLM never selects the next node.

    # 2. If candidate command detected, dispatch it FIRST
    if command:
        await runtime_controller.dispatch_command(command, flow_manager)
        return {"status": "command_dispatched", "command": command}, None

    # 3. Validate LLM output through the output validation pipeline
    #    (persona_break, rubric_leak, topic_containment, length, leading_question filters)
    validated_text = await runtime_controller.validate_output(
        spoken_text, current_node=flow_manager.current_node
    )

    # 4. Write evidence signals to ledger (with STT confidence provenance)
    for signal in signals:
        stt_summary = runtime_controller.compute_stt_confidence_summary(
            signal, transcript_segments=flow_manager.recent_segments
        )
        if stt_summary.min < 0.5:
            # EVD-010: reject signals from low-confidence transcript segments
            await runtime_controller.trigger_recovery("stt_low_confidence", flow_manager)
            continue
        await runtime_controller.record_evidence(
            signal, node_id=flow_manager.current_node, stt_confidence_summary=stt_summary
        )

    # 4b. Record misconceptions if detected
    misconceptions = args.get("misconceptions", [])
    for m in misconceptions:
        await runtime_controller.record_misconception(m, node_id=flow_manager.current_node)

    # 5. Check guardrails. check_guardrails takes the whole observation dict
    #    and reads needsFollowUp, evidenceSufficient, answerQuality, etc. from it.
    guardrail_result = await runtime_controller.check_guardrails(args)

    # 6. Decide: stay or transition?
    if guardrail_result.should_transition:
        # Build next node config from IR
        next_node_config = runtime_controller.build_next_node_config(
            guardrail_result.next_node_id
        )
        # Runtime Controller drives the transition
        await flow_manager.set_node_from_config(next_node_config)
        return {"status": "transitioned", "nextNode": guardrail_result.next_node_id}, None

    elif guardrail_result.should_followup:
        # Inject follow-up context into LLM
        await runtime_controller.inject_followup_context(
            flow_manager, followup_type=args.get("followUpType", "probe")
        )
        return {"status": "followup_injected", "type": args.get("followUpType")}, None

    else:
        # Continue conversation in current node
        return {"status": "continue", "validatedText": validated_text}, None

The previous design had three functions: request_transition, report_evidence_signal, and report_candidate_command. These were replaced with a single report_observation function because:

| Concern | Old (3 functions) | New (1 function) |
|---|---|---|
| LLM call count per turn | 1–3 calls | 1 call always |
| Latency | Multiple round-trips | Single round-trip |
| Hallucination risk | LLM might call wrong function | One function, one schema |
| Atomicity | Signals might arrive before transition request | All observations bundled |
| Controller complexity | Must correlate 3 separate calls | Single handler, single evaluation |

Each specification node compiles to a Pipecat NodeConfig:

from pipecat_flows import ContextStrategy, ContextStrategyConfig, NodeConfig


def compile_ir_node_to_node_config(ir_node: IRNode) -> NodeConfig:
    """Stateless compilation: IR node → Pipecat NodeConfig."""
    return {
        "name": ir_node.nodeId,

        # Role/persona — persists across nodes via Pipecat's role_message
        "role_message": ir_node.persona or "You are an examiner conducting an oral assessment.",

        # Task messages — the LLM's instructions for this node
        "task_messages": [
            {
                "role": "developer",
                "content": build_task_message(ir_node)
            }
        ],

        # The ONE function the LLM can call
        "functions": [report_observation],

        # Pre-actions: things to do when entering the node
        "pre_actions": build_pre_actions(ir_node),

        # Post-actions: things to do when leaving the node
        "post_actions": build_post_actions(ir_node),

        # Context strategy: RESET between nodes to avoid context bloat
        "context_strategy": ContextStrategyConfig(
            strategy=ContextStrategy.RESET
        ),
    }

The task_messages are the LLM’s instructions for the current node. They are constructed from the specification node’s scenario context, rubric criteria (as evidence vocabulary), and guardrail reminders.

def build_task_message(ir_node: IRNode) -> str:
    """Build the developer message for this node."""
    parts = []

    # Scenario context
    if ir_node.scenario:
        parts.append(f"SCENARIO: {ir_node.scenario}")

    # Conversation prompt (sentence-starter)
    if ir_node.conversationPrompt:
        parts.append(f"OPENING: {ir_node.conversationPrompt}")

    # Rubric criteria as evidence vocabulary (what to listen for)
    if ir_node.evidenceSignals:
        criteria_list = "\n".join(
            f"- {sig.signalType}: {sig.description} (levels: {', '.join(sig.levels or [])})"
            for sig in ir_node.evidenceSignals
        )
        parts.append(f"EVIDENCE TO LISTEN FOR:\n{criteria_list}")

    # Guardrail reminders
    guardrails = []
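    # Compare against None, not truthiness: a limit of 0 is a real constraint
    # ("no follow-ups allowed") and must not be dropped from the prompt.
    if ir_node.maxFollowUps is not None:
        guardrails.append(f"- Maximum {ir_node.maxFollowUps} follow-up questions")
    if ir_node.timeBudgetSec is not None:
        guardrails.append(f"- Time budget: {ir_node.timeBudgetSec} seconds")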
    if ir_node.forbiddenActions:
        guardrails.append(f"- NEVER: {', '.join(ir_node.forbiddenActions)}")
    if guardrails:
        parts.append("CONSTRAINTS:\n" + "\n".join(guardrails))

    # Report observation reminder
    parts.append(
        "After every candidate response, call report_observation with your "
        "assessment of the response, any evidence signals you detected, "
        "and what you want to say next."
    )

    # Prompting consistency directive (POL-007)
    # Grounded in Pearce & Chiavaroli (2020), via Fenton (2025):
    # prompting must be consistent across candidates.
    parts.append(
        "CONSISTENCY: Use the same questioning approach for all candidates. "
        "Do not vary your level of scaffolding or hint-giving based on "
        "perceived candidate ability. Maintain a consistent tone and "
        "difficulty level throughout the assessment."
    )

    return "\n\n".join(parts)
def build_pre_actions(ir_node: IRNode) -> list:
    """Build pre-actions for entering a node."""
    actions = []

    # Emit node_entered event via data channel
    actions.append({
        "type": "function",
        "handler": lambda action, fm: runtime_controller.emit_node_entered(
            ir_node.nodeId, fm
        )
    })

    # If there's a scenario introduction, speak it first
    if ir_node.scenarioIntro:
        actions.append({
            "type": "tts_say",
            "text": ir_node.scenarioIntro
        })

    return actions


def build_post_actions(ir_node: IRNode) -> list:
    """Build post-actions for leaving a node."""
    return [
        {
            "type": "function",
            "handler": lambda action, fm: runtime_controller.finalize_node(
                ir_node.nodeId, fm
            )
        }
    ]

| IR Construct | Pipecat Output | Notes |
|---|---|---|
| nodeId | NodeConfig.name | Preserved verbatim. |
| persona | NodeConfig.role_message | Persists across nodes until overridden. |
| scenario | Embedded in task_messages[0].content | Part of the developer message. |
| conversationPrompt | Embedded in task_messages[0].content | Sentence-starter for the LLM. |
| evidenceSignals | Embedded in task_messages[0].content | Rubric criteria as evidence vocabulary. |
| maxFollowUps | Embedded in task_messages[0].content | Informative; enforcement is controller-side. |
| timeBudgetMs (converted to seconds in adapter) | Embedded in task_messages[0].content | Informative; enforcement is controller-side. |
| forbiddenActions | Embedded in task_messages[0].content | Prompt-level guardrail; controller validates output. |
| allowedActions | Embedded in task_messages[0].content | Prompt-level guidance. |
| contextStrategy | NodeConfig.context_strategy | RESET between nodes; APPEND within a node. |
| transitionConditions | NOT in Pipecat config | Controller evaluates; never exposed to LLM. |
| transversalSkills | Embedded in task_messages[0].content | Evidence vocabulary for cross-cutting skills. |
| Prompting consistency directive | Embedded in task_messages[0].content | POL-007: instructs LLM to use consistent questioning approach. |
| outputValidationFilters | FlowConfig.outputValidationFilters | ADP-016: persona_break, rubric_leak, topic_containment, length filters. |
| forbiddenActions (rubric-leak) | Output validation pipeline | POL-006: evidence signal descriptions must be behavioral, not rubric descriptors. |
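
Principle A-5 requires that critical fields survive compilation verbatim, which lends itself to a post-compile check. The sketch below assumes plain dictionaries for both the IR node and the compiled config, and assumes the "Maximum N follow-up questions" wording produced by build_task_message; `find_lossless_violations` is a hypothetical helper, and it covers only three of the critical fields.

```python
def find_lossless_violations(ir_node: dict, node_config: dict) -> list[str]:
    """Return the critical IR fields that did not survive compilation verbatim."""
    violations = []

    # nodeId must map to NodeConfig.name unchanged.
    if node_config.get("name") != ir_node["nodeId"]:
        violations.append("nodeId")

    # Prompt-embedded fields are searched in the concatenated task messages.
    content = "\n".join(m["content"] for m in node_config.get("task_messages", []))

    for action in ir_node.get("forbiddenActions", []):
        if action not in content:
            violations.append(f"forbiddenActions:{action}")

    max_followups = ir_node.get("maxFollowUps")
    if max_followups is not None and f"Maximum {max_followups} follow-up" not in content:
        violations.append("maxFollowUps")

    return violations
```

A build step could run this over every compiled node and fail the compile when the returned list is non-empty, in keeping with A-7.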

async def create_exam_session(ir_package: InterviewRuntime, session_config: SessionConfig):
    """Create and start an exam session."""

    # 1. Create Pipecat pipeline components
    transport = LiveKitTransport(url=..., token=..., room_name=...)
    stt = DeepgramSTTService(api_key=...)
    llm = OpenAILLMService(api_key=..., model="gpt-4o")
    tts = CartesiaTTSService(api_key=...)

    # 2. Create context aggregator pair with summarization
    context = LLMContext(messages=[
        {"role": "system", "content": "You are an AI examiner conducting an interactive oral assessment."}
    ])
    context_aggregator = LLMContextAggregatorPair(
        context,
        assistant_params=LLMAssistantAggregatorParams(
            enable_auto_context_summarization=True,
        ),
    )

    # 3. Create pipeline
    pipeline = Pipeline([
        transport.input(),
        stt,
        context_aggregator.user(),
        llm,
        tts,
        transport.output(),
        context_aggregator.assistant(),
    ])

    # 4. Create Runtime Controller
    runtime_controller = RuntimeController(
        ir_package=ir_package,
        session_config=session_config,
        transport=transport,
    )

    # 5. Create ONE PipelineTask and share it: the FlowManager must be bound
    #    to the same task that actually runs. It receives the aggregator pair,
    #    not just the user-side aggregator.
    task = PipelineTask(pipeline)
    flow_manager = FlowManager(
        task=task,
        llm=llm,
        context_aggregator=context_aggregator,
        transport=transport,
    )

    # 6. Initialize with first node
    first_node = runtime_controller.build_initial_node_config()
    await flow_manager.initialize(initial_node=first_node)

    # 7. Run the pipeline (PipelineRunner comes from pipecat.pipeline.runner).
    #    This call blocks until the session ends, so the return below is only
    #    reached after the pipeline has stopped.
    runner = PipelineRunner()
    await runner.run(task)

    return runtime_controller, flow_manager

When the Runtime Controller decides to transition (based on evaluating report_observation):

import time


class RuntimeController:
    async def execute_transition(self, flow_manager: FlowManager, next_node_id: str):
        """Execute a node transition. Called ONLY by Runtime Controller."""

        # 1. Finalize current node
        await self.finalize_node(flow_manager.current_node)

        # 2. Build next node config from IR
        next_config = self.build_node_config(next_node_id)

        # 3. Drive transition via FlowManager
        await flow_manager.set_node_from_config(next_config)

        # 4. Emit events
        await self.emit_node_entered(next_node_id)

        # 5. Update internal state
        self.current_node_id = next_node_id
        self.followup_count = 0
        self.node_start_time = time.time()
class RuntimeController:
    async def check_guardrails(self, observation: dict) -> GuardrailResult:
        """Evaluate all guardrails against the observation."""

        node_id = self.current_node_id
        ir_node = self.ir_package.get_node(node_id)

        # Time budget check
        elapsed = time.time() - self.node_start_time
        if elapsed > ir_node.timeBudgetSec:
            return GuardrailResult(should_transition=True, reason="time_exhausted")

        # Follow-up limit check
        if observation["needsFollowUp"]:
            if self.followup_count >= ir_node.maxFollowUps:
                return GuardrailResult(should_transition=True, reason="followups_exhausted")
            self.followup_count += 1
            return GuardrailResult(should_followup=True)

        # Evidence sufficiency check
        if observation["evidenceSufficient"]:
            min_met = self.evidence_ledger.check_required_evidence(node_id, ir_node.completionPolicy.requiredEvidenceCount)
            if min_met:
                return GuardrailResult(should_transition=True, reason="evidence_sufficient")

        # Off-topic check
        if observation["answerQuality"] == "off_topic":
            self.off_topic_count += 1
            if self.off_topic_count >= ir_node.maxOffTopicRedirects:
                return GuardrailResult(should_transition=True, reason="off_topic_limit")

        # Anxiety neutrality check (POL-008)
        # Grounded in Fenton (2025), citing Pearce & Chiavaroli (2020):
        # prompting must neither discourage nor reassure the student.
        if observation.get("anxietyDetected", False):
            # Validate that spokenText doesn't contain assessment-relevant reassurance
            reassurance_patterns = ["you're doing great", "good answer", "that's correct",
                                    "excellent", "well done", "you're on the right track"]
            spoken_lower = observation.get("spokenText", "").lower()
            if any(pattern in spoken_lower for pattern in reassurance_patterns):
                # Intercept: replace with neutral procedural support
                observation["spokenText"] = "Take your time. Would you like me to repeat the question?"
            return GuardrailResult(
                recovery_action="calm_support",
                reason="anxiety_detected"
            )

        # Default: continue conversation
        return GuardrailResult(should_continue=True)
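
GuardrailResult is used above but not defined in this section. A minimal sketch of a shape that would satisfy those call sites follows; the field names are inferred from the keyword arguments and attribute reads in the snippets, and the mutual-exclusion check is an added assumption rather than a stated requirement.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class GuardrailResult:
    """Outcome of one guardrail evaluation (fields inferred from usage above)."""

    should_transition: bool = False
    should_followup: bool = False
    should_continue: bool = False
    reason: Optional[str] = None
    next_node_id: Optional[str] = None     # read by the handler when transitioning
    recovery_action: Optional[str] = None  # e.g. "calm_support"

    def __post_init__(self) -> None:
        # Assumption: the three outcome flags are mutually exclusive. The
        # anxiety branch sets none of them, so zero flags is allowed.
        flags = (self.should_transition, self.should_followup, self.should_continue)
        if sum(flags) > 1:
            raise ValueError("at most one of transition/followup/continue may be set")
```

Note that the check_guardrails branches shown above never populate next_node_id, while the handler reads it when transitioning; whichever component resolves the next node would need to fill it in.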

When transitioning between nodes, the context MUST be reset. The LLM starts each node with a fresh context containing only:

  1. The system message (persona + general instructions)
  2. The node’s task_messages (scenario, evidence vocabulary, constraints)

This prevents context from previous nodes leaking into the current assessment.

Within a single node, context accumulates normally (candidate responses, LLM follow-ups). Pipecat’s built-in context summarization handles long conversations within a node.

LLMAssistantAggregatorParams(
    enable_auto_context_summarization=True,
    auto_context_summarization_config=LLMAutoContextSummarizationConfig(
        max_context_tokens=4000,           # Trigger at 4000 tokens
        max_unsummarized_messages=10,      # Or after 10 messages
        summary_config=LLMContextSummaryConfig(
            target_context_tokens=2000,    # Target summary size
            min_messages_after_summary=2,  # Keep last 2 messages uncompressed
        ),
    ),
)

When transitioning, the Runtime Controller MAY inject a brief summary of the previous node into the new node’s context (as a developer message). This provides continuity without polluting the context:

def build_node_config(self, node_id: str) -> NodeConfig:
    config = compile_ir_node_to_node_config(self.ir_package.get_node(node_id))

    # Inject previous node summary if available
    if self.previous_node_summary:
        config["task_messages"].insert(1, {
            "role": "developer",
            "content": f"Previous topic summary: {self.previous_node_summary}"
        })

    return config

Candidate speaks → STT → Transcript segment

                              ├─► Pipecat context aggregator (for LLM context)

                              └─► Runtime Controller webhook

                                    ├─ Append to authoritative transcript
                                    ├─ Run command detection (safety net)
                                    ├─ Check silence thresholds
                                    ├─ Emit transcript event to data channel
                                    └─ Store for marking pipeline handoff
LLM processes candidate response

    └─► Calls report_observation(signals=[...], ...)

            └─► Runtime Controller handler

                  ├─ Validate signal types against IR vocabulary
                  ├─ Deduplicate signals
                  ├─ Check STT confidence of underlying transcript segments
                  │     (EVD-010: reject signals from segments with confidence < 0.5)
                  ├─ Compute sttConfidenceSummary (min, max, mean) per signal
                  ├─ Write to Evidence Ledger (with rubricLevel, sttConfidenceSummary)
                  ├─ Update coverage metrics
                  ├─ Emit evidence_update to data channel
                  └─ Check if requiredEvidenceCount threshold met
{
  "type": "transcript_segment",
  "sessionId": "sess-2026-0506-001",
  "nodeId": "scenario-hotel-breakfast",
  "speaker": "candidate",
  "text": "I would first analyse the current breakfast offerings...",
  "confidence": 0.92,
  "timestampMs": 1746487230000,
  "durationMs": 4200,
  "isFinal": true,
  "segmentId": "seg-0042"
}
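
The sttConfidenceSummary step and the EVD-010 threshold in the evidence flow can be sketched as below. This assumes transcript segments shaped like the JSON example above; the function names and the choice to ignore non-final segments are illustrative assumptions, not part of the specification.

```python
from dataclasses import dataclass
from statistics import mean

STT_MIN_CONFIDENCE = 0.5  # EVD-010 threshold quoted in the evidence flow


@dataclass(frozen=True)
class SttConfidenceSummary:
    min: float
    max: float
    mean: float


def summarize_stt_confidence(segments: list[dict]) -> SttConfidenceSummary:
    """Summarize STT confidence over the final segments backing one signal."""
    # Assumption: interim (non-final) segments are excluded from the summary.
    confidences = [seg["confidence"] for seg in segments if seg.get("isFinal", True)]
    if not confidences:
        raise ValueError("no final transcript segments to summarize")
    return SttConfidenceSummary(min(confidences), max(confidences), mean(confidences))


def accept_signal(summary: SttConfidenceSummary) -> bool:
    """EVD-010: reject a signal if any underlying segment is below the threshold."""
    return summary.min >= STT_MIN_CONFIDENCE
```

Gating on the minimum rather than the mean means a single poorly transcribed segment is enough to reject a signal, which matches the `stt_summary.min < 0.5` check in the handler.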

The output validation pipeline intercepts the LLM’s proposed spokenText before it reaches the candidate via TTS. This is the last line of defense against persona breaks, rubric leakage, topic drift, and assessment-invalid speech.

LLM calls report_observation(spokenText="...")


  Runtime Controller handler

       ├─ 1. persona_break filter
       │     Check: Does spokenText break character?
       │     Action: Replace with neutral alternative

       ├─ 2. rubric_leak filter
       │     Check: Does spokenText contain rubric descriptors?
       │     Action: Intercept and rephrase

       ├─ 3. topic_containment filter
       │     Check: Does spokenText stay within node's scenario?
       │     Action: Redirect to current topic

       ├─ 4. length filter
       │     Check: Is spokenText <= 500 chars?
       │     Action: Summarize or split

       └─ 5. leading_question filter (SHOULD)
             Check: Does spokenText suggest the answer?
             Action: Rephrase as neutral question


  Validated text → TTS → Candidate

The adapter MUST include an outputValidationFilters configuration in the compiled FlowConfig:

output_validation_config = {
    "filters": [
        {
            "name": "persona_break",
            "enabled": True,
            "action": "replace",
            "patterns": [
                "as your examiner", "according to the rubric",
                "i'm an ai", "the grading criteria"
            ]
        },
        {
            "name": "rubric_leak",
            "enabled": True,
            "action": "intercept",
            "checkAgainst": "node.evidenceSignalDescriptions"
        },
        {
            "name": "topic_containment",
            "enabled": True,
            "action": "redirect",
            "domain": "node.scenarioDomain"
        },
        {
            "name": "length",
            "enabled": True,
            "action": "summarize",
            "maxChars": 500
        },
        {
            "name": "leading_question",
            "enabled": True,
            "action": "rephrase",
            "patterns": ["wouldn't you say", "don't you think", "surely you'd agree"]
        }
    ]
}
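
How a handler might apply the pattern-based and length filters from this configuration is sketched below. It is an illustration under stated assumptions: matching is a case-insensitive substring test, `NEUTRAL_FALLBACK` is a hypothetical replacement text, and plain truncation stands in for the "summarize or split" action. The rubric_leak and topic_containment filters are left out because they depend on node data not shown here.

```python
from typing import Optional

# Hypothetical neutral replacement; real text would come from the node config.
NEUTRAL_FALLBACK = "Could you tell me more about that?"

FILTERS = [
    {"name": "persona_break", "enabled": True, "action": "replace",
     "patterns": ["as your examiner", "according to the rubric",
                  "i'm an ai", "the grading criteria"]},
    {"name": "length", "enabled": True, "action": "summarize", "maxChars": 500},
    {"name": "leading_question", "enabled": True, "action": "rephrase",
     "patterns": ["wouldn't you say", "don't you think", "surely you'd agree"]},
]


def find_violation(spoken_text: str, filters: list[dict]) -> Optional[dict]:
    """Return the first enabled filter that the text violates, if any."""
    lowered = spoken_text.lower()
    for flt in filters:
        if not flt.get("enabled", False):
            continue
        if any(pattern in lowered for pattern in flt.get("patterns", [])):
            return flt
        if "maxChars" in flt and len(spoken_text) > flt["maxChars"]:
            return flt
    return None


def validate_spoken_text(spoken_text: str,
                         filters: list[dict]) -> tuple[str, Optional[str]]:
    """Return (text to send to TTS, name of the triggered filter or None)."""
    violated = find_violation(spoken_text, filters)
    if violated is None:
        return spoken_text, None
    if "maxChars" in violated:
        # Truncation is a stand-in for summarizing or splitting.
        return spoken_text[: violated["maxChars"]], violated["name"]
    return NEUTRAL_FALLBACK, violated["name"]


text, triggered = validate_spoken_text("As your examiner, I should note...", FILTERS)
```

The returned filter name is what a guardrail_triggered event would carry in its guardrail field.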

The output validation pipeline is complementary to, not a replacement for, the forbiddenActions guardrail system:

  • forbiddenActions operates at the prompt level — telling the LLM what not to do.
  • Output validation operates at the output level — catching what the LLM does anyway.
  • Together they form a defense-in-depth system: prompt instructions → LLM behavior → output filter → candidate.

The LiveKit data channel is the real-time communication bus between the exam bot and the frontend Exam Room. The Runtime Controller emits structured events; the frontend consumes them to update UI state.

Runtime Controller

       ├─ on exam_state_change ──► DataChannel: exam_state event
       ├─ on node_entered ───────► DataChannel: node_entered event
       ├─ on evidence_signal ────► DataChannel: evidence_update event
       ├─ on follow_up ──────────► DataChannel: follow_up_issued event
       ├─ on candidate_command ──► DataChannel: command_acknowledged event
       ├─ on transition ─────────► DataChannel: node_exit / node_entered
       ├─ on time_warning ───────► DataChannel: time_warning event
       ├─ on guardrail ──────────► DataChannel: guardrail_triggered event
       ├─ on recovery ───────────► DataChannel: recovery_event
       └─ on session_end ────────► DataChannel: exam_completed event
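# LiveKit data channel configuration
import os

# LiveKitTransport and LiveKitParams come from Pipecat's LiveKit transport
# module; the import path depends on the installed Pipecat version.
# `token` and `room_name` are issued per session by the caller.
transport = LiveKitTransport(
    url=os.environ["LIVEKIT_URL"],
    token=token,
    room_name=room_name,
    params=LiveKitParams(
        # Data channel for runtime events
        data_channel_topic="exam-runtime-events",
    ),
)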

node_entered:

{
  "event": "node_entered",
  "sessionId": "sess-2026-0506-001",
  "timestamp": "2026-05-06T02:10:00Z",
  "payload": {
    "nodeId": "scenario-hotel-breakfast",
    "persona": "hotel_manager",
    "scenario": "You are meeting with the hotel's food & beverage team...",
    "evidenceTargets": ["proposes_options", "analyses_clientele", "considers_budget"],
    "maxFollowUps": 3,
    "timeBudgetSec": 300,
    "progress": { "currentNodeIndex": 2, "totalNodes": 5 }
  }
}

evidence_update:

{
  "event": "evidence_update",
  "sessionId": "sess-2026-0506-001",
  "timestamp": "2026-05-06T02:12:45Z",
  "payload": {
    "nodeId": "scenario-hotel-breakfast",
    "signalType": "proposes_options",
    "rubricLevel": "analysis",
    "excerpt": "I'd suggest a hybrid approach with both buffet and à la carte...",
    "confidence": 0.85,
    "coverage": 0.67
  }
}

guardrail_triggered:

{
  "event": "guardrail_triggered",
  "sessionId": "sess-2026-0506-001",
  "timestamp": "2026-05-06T02:13:30Z",
  "payload": {
    "guardrail": "persona_break",
    "nodeId": "scenario-hotel-breakfast",
    "action": "output_intercepted",
    "originalText": "As your examiner, I should note...",
    "replacementText": "That's an interesting point. Can you tell me more about the budget implications?"
  }
}
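
The three examples share one envelope: event, sessionId, timestamp, and payload. A minimal sketch of building that envelope is shown below. `build_event` is a hypothetical helper, and the publish call is deliberately omitted because it depends on the transport in use.

```python
import json
from datetime import datetime, timezone
from typing import Optional


def build_event(event: str, session_id: str, payload: dict,
                now: Optional[datetime] = None) -> bytes:
    """Serialize a runtime event into the envelope used by the examples above."""
    now = (now or datetime.now(timezone.utc)).astimezone(timezone.utc)
    envelope = {
        "event": event,
        "sessionId": session_id,
        # Second-precision UTC timestamp with a trailing "Z", as in the examples.
        "timestamp": now.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "payload": payload,
    }
    return json.dumps(envelope, ensure_ascii=False).encode("utf-8")


message = build_event(
    "evidence_update",
    "sess-2026-0506-001",
    {"nodeId": "scenario-hotel-breakfast", "signalType": "proposes_options",
     "rubricLevel": "analysis", "confidence": 0.85, "coverage": 0.67},
    now=datetime(2026, 5, 6, 2, 12, 45, tzinfo=timezone.utc),
)
```

Keeping envelope construction in one place makes it harder for individual handlers to drift from the event schemas.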

  1. The candidate speaks a command-like utterance (e.g., “Can you repeat the question?”).
  2. The LLM detects the command intent and sets commandDetected in report_observation.
  3. The Runtime Controller handler dispatches the command action.
  4. If the LLM fails to detect the command, a secondary rule-based classifier running on the transcript catches it as a safety net.
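
The safety net in step 4 could look like the sketch below. The phrase patterns are hypothetical examples rather than the specification's rule set, and `classify_command` is an illustrative name.

```python
import re
from typing import Optional

# Hypothetical phrase patterns per command; a real deployment would tune these.
COMMAND_PATTERNS = {
    "repeat": [r"\brepeat\b", r"\bsay that again\b"],
    "clarification": [r"\bwhat do you mean\b", r"\bclarify\b"],
    "slow_down": [r"\bslow down\b", r"\bspeak (more )?slowly\b"],
    "pause": [r"\bpause\b", r"\bneed a (break|moment)\b"],
    "help": [r"\bhelp\b"],
    "skip": [r"\bskip\b", r"\bnext question\b"],
    "finish": [r"\b(end|finish) the exam\b", r"\bi'm done\b"],
}


def classify_command(transcript_text: str) -> Optional[str]:
    """Return the first command whose pattern matches the transcript, else None."""
    text = transcript_text.lower()
    for command, patterns in COMMAND_PATTERNS.items():
        if any(re.search(pattern, text) for pattern in patterns):
            return command
    return None


detected = classify_command("Can you repeat the question?")
```

Keyword rules like these produce false positives (a candidate saying "I would help the guests" matches help), which is one reason to treat the classifier as a fallback behind the LLM's commandDetected field and not as the primary detector.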

| Command | Runtime Controller Action | Data Channel Event |
|---|---|---|
| repeat | Re-speak the node’s conversationPrompt via TTS. Timer continues. | command_acknowledged { command: "repeat" } |
| clarification | Inject clarification context into LLM. Count toward maxClarifications. | command_acknowledged { command: "clarification" } |
| slow_down | Adjust TTS speed setting. | command_acknowledged { command: "slow_down" } |
| pause | Transition exam to paused state. | exam_state { state: "paused" } |
| help | Provide general exam instructions (not question-specific). | command_acknowledged { command: "help" } |
| skip | If policy allows, transition to next node. Otherwise refuse. | command_acknowledged { command: "skip" } |
| finish | Confirm with candidate, then end exam. | exam_state { state: "completed" } |
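
The mapping from command to data channel event can be expressed as a small lookup. This is a sketch assuming the event/payload shape of the data channel examples. `event_for_command` is a hypothetical helper, and for finish the event would only be emitted after the candidate confirms.

```python
# Commands that change exam state instead of being acknowledged.
STATE_CHANGING = {"pause": "paused", "finish": "completed"}
ACKNOWLEDGED = {"repeat", "clarification", "slow_down", "help", "skip"}


def event_for_command(command: str) -> dict:
    """Return the event name and payload that the command should emit."""
    if command in STATE_CHANGING:
        return {"event": "exam_state",
                "payload": {"state": STATE_CHANGING[command]}}
    if command in ACKNOWLEDGED:
        return {"event": "command_acknowledged",
                "payload": {"command": command}}
    raise ValueError(f"unknown candidate command: {command!r}")


pause_event = event_for_command("pause")
repeat_event = event_for_command("repeat")
```

Raising on unknown commands keeps a misclassified utterance from silently producing an event the frontend does not expect.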

| Error | Behavior |
|---|---|
| IR node references undefined evidenceSignal | Compile error. Adapter refuses to produce config. |
| IR transition targets non-existent node | Compile error. |
| IR node has no conversationPrompt | Compile error for assessment nodes. Warning for scaffolding/end nodes. |
| IR forbiddenActions conflicts with allowedActions | Compile error. |
| Pipecat lacks a construct for a specification feature | Encode as metadata for Runtime Controller. MUST NOT silently drop. |
| Runtime Controller unreachable during handler | Pipecat MUST surface an error to the LLM and emit a system_error data channel event. |
| report_observation handler raises exception | Log error, emit system_error, continue conversation with canned fallback. |
| Output validation filter detects persona break | Intercept spokenText, replace with neutral alternative, emit guardrail_triggered event. |
| Output validation filter detects rubric leak | Intercept spokenText, rephrase to remove rubric content, emit guardrail_triggered event. |
| Output validation filter detects topic drift | Intercept spokenText, redirect to current node’s scenario domain. |
| spokenText exceeds 500 char limit | Summarize or split utterance before TTS. |
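
The four compile-time checks at the top of this table can be sketched as a single validation pass. The IR shape used here is an assumption for illustration: nodes are plain dicts, and the keys `type`, `evidenceSignals`, and `transitions` are hypothetical. A conflict is taken to mean an action listed in both allowedActions and forbiddenActions.

```python
from dataclasses import dataclass, field


@dataclass
class CompileReport:
    errors: list[str] = field(default_factory=list)
    warnings: list[str] = field(default_factory=list)


def validate_ir(nodes: dict[str, dict], known_signals: set[str]) -> CompileReport:
    """Collect compile errors and warnings before any config is produced."""
    report = CompileReport()
    for node_id, node in nodes.items():
        for signal in node.get("evidenceSignals", []):
            if signal not in known_signals:
                report.errors.append(f"{node_id}: undefined evidenceSignal {signal!r}")
        for target in node.get("transitions", []):
            if target not in nodes:
                report.errors.append(f"{node_id}: transition to missing node {target!r}")
        if not node.get("conversationPrompt"):
            message = f"{node_id}: no conversationPrompt"
            # Assessment nodes fail; scaffolding and end nodes only warn.
            if node.get("type") == "assessment":
                report.errors.append(message)
            else:
                report.warnings.append(message)
        conflict = set(node.get("allowedActions", [])) & set(node.get("forbiddenActions", []))
        if conflict:
            report.errors.append(f"{node_id}: allowed and forbidden: {sorted(conflict)}")
    return report


nodes = {
    "scenario-hotel-breakfast": {
        "type": "assessment",
        "conversationPrompt": "Walk me through your breakfast proposal.",
        "evidenceSignals": ["proposes_options", "unknown_signal"],
        "transitions": ["wrap-up", "missing-node"],
        "allowedActions": ["ask_follow_up"],
        "forbiddenActions": ["ask_follow_up", "reveal_rubric"],
    },
    "wrap-up": {"type": "end"},
}
report = validate_ir(nodes, known_signals={"proposes_options"})
```

An adapter following this table would refuse to emit any config while `report.errors` is non-empty.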

adapterVersion: "pipecat-adapter/0.2"

This version tracks:

  • The report_observation function schema
  • The task message template
  • The node config compilation rules
  • The data channel event schemas

When the specification version changes, the adapter version MAY change. When only the mapping logic changes, the adapter version MUST change, independently of the specification version.

See 09-versioning.md for full versioning rules.
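
As an illustration of the rule above, the sketch below parses an adapterVersion string and checks that a mapping change comes with a higher adapter version. Both helper names are hypothetical, and the `name/major.minor` format is inferred from the single example value; 09-versioning.md remains the authority.

```python
def parse_adapter_version(value: str) -> tuple[str, tuple[int, ...]]:
    """Split 'pipecat-adapter/0.2' into ('pipecat-adapter', (0, 2))."""
    name, _, version = value.partition("/")
    if not name or not version:
        raise ValueError(f"malformed adapterVersion: {value!r}")
    return name, tuple(int(part) for part in version.split("."))


def release_is_valid(old: str, new: str, mapping_changed: bool) -> bool:
    """A mapping change requires a strictly higher adapter version."""
    _, old_version = parse_adapter_version(old)
    _, new_version = parse_adapter_version(new)
    if mapping_changed:
        return new_version > old_version
    return new_version >= old_version


parsed = parse_adapter_version("pipecat-adapter/0.2")
```

Comparing tuples of integers avoids the string-comparison trap where "0.10" sorts before "0.2".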

| Version | Date | Changes |
|---|---|---|
| v0.2.0 | 2026-06-30 | Rewritten for Option C architecture. Domain IR declared as source of truth; Pipecat config is generated adapter output. Updated terminology from ‘Exam Runtime IR’ to ‘IOA-ORM’. |
| v0.1.0 | 2026-05-06 | Initial release. |