#!/usr/bin/env python3 """ E2E Bot Mediation Test Two LLM-powered agents discover the Servanda API, register, create a session, and negotiate through AI mediation until resolution. This script acts as an external developer would: it discovers the API from /llms-full.txt, registers bots, and drives the full flow. Any failures here indicate gaps in the public API or documentation. Usage: uv run python scripts/e2e_bot_mediation.py [--base-url http://localhost:8000] [--max-turns 15] uv run python scripts/e2e_bot_mediation.py --mode agreement # agreement mode (open-ended) uv run python scripts/e2e_bot_mediation.py --mode resolution # resolution mode (default, enforces outcome) """ import argparse import asyncio import json import random import sys from dotenv import load_dotenv load_dotenv() import httpx import websockets from anthropic import Anthropic # ── Agent Personas ────────────────────────────────────────────────────────── # Resolution mode: a concrete, resolvable dispute ALEX_RESOLUTION = { "name": "Alex", "stance": ( "You are Alex. You share a flat with Jordan and you had a quiet hours agreement " "(10 PM - 8 AM, no loud music or noise).\n\n" "The problem: Jordan broke the quiet hours agreement 3 times this past week, " "claiming 'gig emergencies' — packing gear, last-minute rehearsals at midnight, " "a loud phone call at 6 AM with a venue manager.\n\n" "What you want:\n" "- Acknowledgment that the agreement was broken\n" "- Enforcement of quiet hours going forward with limited, defined exceptions\n" "- If emergencies happen, Jordan must text you first and keep noise minimal\n\n" "You're not trying to be unreasonable — you understand gigs are important. " "But 3 violations in one week is a pattern, not an emergency." ), } JORDAN_RESOLUTION = { "name": "Jordan", "stance": ( "You are Jordan. You share a flat with Alex and there's a quiet hours agreement " "(10 PM - 8 AM).\n\n" "What happened: You had a big gig this week and needed to prep urgently. " "You packed gear late twice and took one early morning call from the venue. " "These were genuine emergencies — the gig paid well and was a career opportunity.\n\n" "What you want:\n" "- Emergency exceptions to quiet hours for legitimate gig-related needs\n" "- A reasonable definition of what counts as an emergency\n" "- To not be treated like a rule-breaker when you were handling real work\n\n" "You respect the quiet hours and don't want to break them habitually. " "You're willing to give advance notice and keep noise down during exceptions." ), } # Agreement mode: original open-ended negotiation ALEX_AGREEMENT = { "name": "Alex", "stance": ( "You are Alex, a remote software engineer who works from home full-time.\n" "Your core needs:\n" "- Quiet during work hours (9 AM - 5 PM) for video calls and deep focus\n" "- Soundproofing investment for the shared wall between your office and the music room\n" "- Clear, enforceable rules so you can plan your workday reliably\n" "\n" "You're willing to compromise on evenings and weekends, but work hours are non-negotiable.\n" "You believe investing in soundproofing benefits both parties long-term." ), } JORDAN_AGREEMENT = { "name": "Jordan", "stance": ( "You are Jordan, a semi-professional musician who practices drums at home.\n" "Your core needs:\n" "- Daily practice time (at least 2-3 hours) to maintain your skills and prepare for gigs\n" "- Creative expression is a fundamental right in your own home\n" "- Flexibility in scheduling — inspiration doesn't follow a clock\n" "\n" "You're willing to compromise on timing and volume during certain hours, but you won't accept\n" "being told you can't play at all. You think shared soundproofing costs are fair." ), } # ── Colors for terminal output ────────────────────────────────────────────── BOLD = "\033[1m" DIM = "\033[2m" RESET = "\033[0m" BLUE = "\033[34m" GREEN = "\033[32m" YELLOW = "\033[33m" RED = "\033[31m" CYAN = "\033[36m" MAGENTA = "\033[35m" def log(prefix: str, msg: str, color: str = ""): print(f"{color}{BOLD}[{prefix}]{RESET} {msg}") # ── API Interactions ──────────────────────────────────────────────────────── async def discover_api(client: httpx.AsyncClient, base_url: str) -> str: """Step 1: Fetch /llms-full.txt to discover the API (proves discovery works).""" resp = await client.get(f"{base_url}/llms-full.txt") resp.raise_for_status() return resp.text async def register_bot(client: httpx.AsyncClient, base_url: str, name: str) -> dict: """Register a bot via POST /api/bot/register. Returns {token, participant_id, name}.""" resp = await client.post( f"{base_url}/api/bot/register", json={"name": name}, ) resp.raise_for_status() return resp.json() async def create_session( client: httpx.AsyncClient, base_url: str, token: str, title: str, description: str = "", mode: str = "agreement", binding_turns: int = None, ) -> dict: """Create a session via POST /api/bot/sessions. Returns {session_id, invite_url, websocket_url}.""" body = { "title": title, "description": description, "mediator_style": "collaborative", "mode": mode, } if binding_turns is not None: body["binding_turns"] = binding_turns resp = await client.post( f"{base_url}/api/bot/sessions", headers={"Authorization": f"Bearer {token}"}, json=body, ) resp.raise_for_status() return resp.json() async def claim_invite( client: httpx.AsyncClient, base_url: str, token: str, invite_token: str, ) -> dict: """ Claim an invite so Bot B joins the session. Uses POST /api/invites/{token}/claim — discovered by exploring the API. The public docs mention invite_url but don't document this endpoint explicitly. """ resp = await client.post( f"{base_url}/api/invites/{invite_token}/claim", headers={"Authorization": f"Bearer {token}"}, ) resp.raise_for_status() return resp.json() async def start_session( client: httpx.AsyncClient, base_url: str, token: str, session_id: str, ) -> dict: """Start the session via POST /api/bot/sessions/{id}/start.""" resp = await client.post( f"{base_url}/api/bot/sessions/{session_id}/start", headers={"Authorization": f"Bearer {token}"}, ) resp.raise_for_status() return resp.json() async def get_session( client: httpx.AsyncClient, base_url: str, token: str, session_id: str, ) -> dict: """Get session details via GET /api/bot/sessions/{id}.""" resp = await client.get( f"{base_url}/api/bot/sessions/{session_id}", headers={"Authorization": f"Bearer {token}"}, ) resp.raise_for_status() return resp.json() # ── Agent WebSocket Loop ─────────────────────────────────────────────────── async def run_agent( name: str, token: str, session_id: str, stance: str, ws_url: str, api_docs: str, max_turns: int, my_role: str = "party_0", mode: str = "resolution", binding_turns: int = None, benchmark_mode: bool = False, ): """ Run one agent: connect via WebSocket, listen for mediator messages, generate responses via Haiku, handle draft proposals, respect turn control. Architecture: two concurrent tasks — 1. receiver: reads WS events, updates state, signals when response is needed 2. responder: waits for signal, generates + sends responses This lets turn_update events arrive even while generating a response. """ anthropic = Anthropic() color = BLUE if my_role == "party_0" else MAGENTA if benchmark_mode and binding_turns: # Aggressive benchmark-style prompt: agents argue to WIN, not cooperate system_prompt = ( f"You are {name} in a mediation.\n\n" f"{stance}\n\n" f"You have {binding_turns} turns before an AI mediator delivers a BINDING ruling.\n\n" "YOUR GOAL IS TO WIN. You are scored on how favorable the ruling is to YOUR position.\n" "- A ruling that gives you everything = maximum score\n" "- A ruling that splits the difference = mediocre score\n" "- A ruling that favors the other party = you lose\n\n" "STRATEGY:\n" "- Advocate AGGRESSIVELY. Do NOT seek compromise or middle ground.\n" "- Do NOT offer concessions unless you get something significant in return.\n" "- Make the mediator feel your position is the ONLY fair outcome.\n" "- Use strong evidence, legal/ethical arguments, and emotional appeals.\n" "- Attack weaknesses in the other party's reasoning.\n" "- Hold firm on your core demands through ALL turns.\n\n" "STYLE:\n" "- Talk naturally, like in a group chat. No need to state your name, turn number, or headers.\n" "- Do NOT use markdown headers (# or ##). Just speak directly and persuasively.\n" "- Keep each message under 2000 characters. Messages longer than 2000 chars will be trimmed.\n\n" "The mediator has web search tools and will fact-check claims, so do not " "fabricate statistics or make verifiably false statements." ) else: turn_budget_note = "" if binding_turns: turn_budget_note = ( f"\n\nIMPORTANT — Turn budget:\n" f"You have {binding_turns} turns total. After that, an AI arbiter delivers a binding ruling.\n" f"- Turns 1-2: State your position and key facts clearly.\n" f"- Turns 3-4: Engage with the other party's points. Find common ground.\n" f"- Turn {binding_turns}: This is your LAST chance to speak before the ruling. " f"Make sure your position is clear and you've acknowledged any agreements reached.\n" f"Don't waste turns repeating yourself. Every message should move toward resolution.\n" ) if mode == "resolution": system_prompt = ( f"You are {name}, participating in an AI-mediated dispute resolution on Servanda.\n\n" f"{stance}\n\n" "Instructions:\n" "- Respond to the mediator naturally and concisely (2-4 sentences).\n" "- Focus on reaching a concrete resolution to this specific dispute.\n" "- State your needs clearly but be willing to accept reasonable compromises.\n" "- When the mediator proposes a resolution, evaluate it honestly.\n" "- If a proposal addresses your core concerns, say so clearly — e.g., 'That works for me.'\n" "- Don't rehash points you've already made. Move the conversation forward.\n" "- Keep the conversation productive and respectful.\n" f"{turn_budget_note}" ) else: system_prompt = ( f"You are {name}, participating in an AI-mediated negotiation on Servanda.\n\n" f"{stance}\n\n" "Instructions:\n" "- Respond to the mediator naturally and concisely (2-4 sentences).\n" "- Be firm on your core needs but genuinely seek creative solutions.\n" "- When a draft agreement is proposed, evaluate it carefully against your needs.\n" "- If the draft is acceptable, approve it. If not, explain what needs to change.\n" "- Keep the conversation productive and respectful.\n" ) conversation: list[dict] = [] turn_count = 0 can_speak = True has_pending_mediator_msg = False # True when a mediator msg awaits our response should_stop = False session_result: dict = {} # Populated on session_closed respond_signal = asyncio.Event() # Set when we should try to generate a response # Build WS URL with auth token separator = "&" if "?" in ws_url else "?" full_ws_url = f"{ws_url}{separator}token={token}" log(name, "Connecting to WebSocket...", color) try: async with websockets.connect(full_ws_url) as ws: log(name, "Connected!", color) async def receiver(): """Read WS events, update shared state, signal responder.""" nonlocal can_speak, has_pending_mediator_msg, should_stop, turn_count, session_result current_stream: dict = {"sender": None, "chunks": []} async for raw_msg in ws: try: event = json.loads(raw_msg) except json.JSONDecodeError: log(name, f"Non-JSON message: {raw_msg[:80]}", RED) continue event_type = event.get("event") data = event.get("data", {}) if event_type == "stream_start": sender = data.get("sender_name", data.get("sender", "unknown")) current_stream = {"sender": sender, "chunks": []} elif event_type == "stream_chunk": chunk = data.get("content", data.get("chunk", "")) current_stream["chunks"].append(chunk) elif event_type == "stream_end": full_text = "".join(current_stream["chunks"]) sender = current_stream["sender"] or data.get("sender_name", "unknown") preview = full_text[:120].replace("\n", " ") log(name, f"{'<' * 3} [{sender}]: {preview}{'...' if len(full_text) > 120 else ''}", DIM) is_mediator = "mediator" in sender.lower() or sender.lower() == "ai" if is_mediator: if turn_count >= max_turns: log(name, f"Max turns reached ({max_turns}), stopping", YELLOW) should_stop = True respond_signal.set() break # Inject turn status so the bot knows where it stands turn_note = "" if binding_turns: remaining = binding_turns - turn_count if remaining <= 0: turn_note = "\n[System: Binding deadline reached. Ruling incoming.]" elif remaining == 1: turn_note = f"\n[System: This is your LAST turn ({turn_count + 1}/{binding_turns}) before the binding ruling.]" elif remaining <= 2: turn_note = f"\n[System: Turn {turn_count + 1}/{binding_turns} — {remaining} turns left. Wrap up your position.]" else: turn_note = f"\n[System: Turn {turn_count + 1}/{binding_turns}]" conversation.append( {"role": "user", "content": f"[Mediator]: {full_text}{turn_note}"} ) has_pending_mediator_msg = True if can_speak: respond_signal.set() else: log(name, "Not my turn — buffering mediator message", DIM) elif sender != name: conversation.append( {"role": "user", "content": f"[{sender}]: {full_text}"} ) elif event_type == "message": content = data.get("content", "") sender = data.get("sender_name", data.get("sender", "unknown")) preview = content[:120].replace("\n", " ") log(name, f"MSG [{sender}]: {preview}", DIM) if sender != name: conversation.append( {"role": "user", "content": f"[{sender}]: {content}"} ) elif event_type == "draft_proposed": draft = data.get("draft", data.get("principles", data)) log(name, "Draft agreement proposed! Evaluating...", YELLOW) draft_text = ( json.dumps(draft, indent=2) if isinstance(draft, (dict, list)) else str(draft) ) conversation.append( { "role": "user", "content": ( f"[System]: A draft agreement has been proposed:\n" f"{draft_text}\n\n" "Evaluate this draft against your core needs. " "Reply with ONLY 'APPROVE' if acceptable, or " "'REJECT: ' if changes are needed." ), } ) response = await asyncio.to_thread( anthropic.messages.create, model="claude-haiku-4-5-20251001", max_tokens=200, system=system_prompt, messages=conversation, ) decision = response.content[0].text.strip() conversation.append({"role": "assistant", "content": decision}) if "APPROVE" in decision.upper() and "REJECT" not in decision.upper(): log(name, "Approving draft", GREEN) await ws.send(json.dumps({"action": "approve_draft"})) else: feedback = decision if "REJECT:" in decision.upper(): feedback = decision.split(":", 1)[1].strip() log(name, f"Rejecting draft: {feedback[:80]}...", RED) await ws.send( json.dumps( {"action": "reject_draft", "feedback": feedback} ) ) elif event_type == "pending_confirmation_added": principle = data.get("principle", data) log(name, f"Principle pending confirmation: {str(principle)[:80]}", YELLOW) await ws.send(json.dumps({"action": "approve_draft"})) elif event_type == "binding_deadline_proposed": turns = data.get("turns_each", "?") log(name, f"Binding deadline proposed: {turns} turns each. Accepting...", YELLOW) await ws.send(json.dumps({"action": "accept_binding_deadline"})) elif event_type == "binding_deadline_accepted": who = data.get("name", data.get("party", "?")) log(name, f"Binding deadline accepted by {who}", DIM) elif event_type == "binding_deadline_active": turns = data.get("turns_each", "?") log(name, f"Binding deadline ACTIVE — ruling after {turns} turns each", GREEN) elif event_type == "binding_deadline_rejected": who = data.get("name", data.get("party", "?")) log(name, f"Binding deadline rejected by {who}", YELLOW) elif event_type == "binding_deadline_reached": log(name, "Binding deadline reached — ruling incoming!", YELLOW) elif event_type == "ruling_stream_start": current_stream = {"sender": "Arbiter", "chunks": []} log(name, "Ruling stream started...", YELLOW) elif event_type == "ruling_stream_chunk": chunk = data.get("chunk", "") current_stream["chunks"].append(chunk) elif event_type == "ruling_stream_end": full_ruling = "".join(current_stream["chunks"]) log(name, f"Ruling delivered ({len(full_ruling)} chars):", GREEN) # Print first ~300 chars of the ruling preview = full_ruling[:300].replace("\n", "\n ") log(name, f" {preview}{'...' if len(full_ruling) > 300 else ''}", DIM) current_stream = {"sender": None, "chunks": []} elif event_type == "agreement_finalized": log(name, "Agreement finalized!", GREEN) should_stop = True respond_signal.set() break elif event_type == "session_closed": session_result = data outcome = data.get("outcome", "unknown") summary = data.get("summary", "") next_steps = data.get("next_steps", "") log(name, f"Session closed — outcome: {outcome}", GREEN) if summary: log(name, f" Summary: {summary}", DIM) if next_steps: log(name, f" Next steps: {next_steps}", DIM) should_stop = True respond_signal.set() break elif event_type in ("turn_update", "turn_state"): allowed = data.get("allowed_speakers", []) mediator_responding = data.get("mediator_responding", False) can_speak = ("all" in allowed or my_role in allowed) and not mediator_responding log(name, f"Turn: allowed={allowed}, can_speak={can_speak}", DIM) # Key fix: if we now can speak and have a buffered message, signal if can_speak and has_pending_mediator_msg: respond_signal.set() elif event_type == "turn_rejected": log(name, f"Turn rejected: {data.get('message', '')}", YELLOW) # Re-queue: we still have something to say, wait for our turn can_speak = False elif event_type == "presence_update": roles = data.get("live_roles", []) log(name, f"Presence: {roles}", DIM) elif event_type == "error": log(name, f"Error from server: {data}", RED) elif event_type in ( "stream_cancelled", "tool_status", "topics_generating", "topics_generated", "status_changed", "draft_approved", "draft_created", "draft_complete", "pending_confirmations_list", "agreement_started", "principle_added", "principle_updated", "principle_removed", "title_changed", "ruling_suggested", ): pass else: log(name, f"Unhandled event: {event_type} — {str(data)[:80]}", DIM) # WS closed — wake up responder so it can exit should_stop = True respond_signal.set() async def responder(): """Wait for signal, then generate and send a response.""" nonlocal turn_count, has_pending_mediator_msg while not should_stop: await respond_signal.wait() respond_signal.clear() if should_stop or not has_pending_mediator_msg: continue # Small delay to feel natural await asyncio.sleep(random.uniform(1.0, 2.0)) # Re-check after delay — turn may have changed if not can_speak: log(name, "Turn changed during delay — keeping message queued", DIM) # Don't clear has_pending_mediator_msg — retry when turn comes back continue if not has_pending_mediator_msg: continue # Generate response via Haiku (in thread so receiver keeps running) response = await asyncio.to_thread( anthropic.messages.create, model="claude-haiku-4-5-20251001", max_tokens=300, system=system_prompt, messages=list(conversation), # snapshot to avoid mutation ) reply = response.content[0].text # Final check — turn may have changed during generation if not can_speak: log(name, "Turn changed during generation — keeping message queued", DIM) # Don't clear has_pending_mediator_msg — retry when turn comes back continue conversation.append({"role": "assistant", "content": reply}) has_pending_mediator_msg = False log(name, f"{'>' * 3} {reply[:120]}{'...' if len(reply) > 120 else ''}", color) await ws.send( json.dumps({"action": "send_message", "content": reply}) ) turn_count += 1 if turn_count >= max_turns: log(name, f"Max turns reached ({max_turns}), stopping", YELLOW) break # Run both tasks; if either finishes, cancel the other recv_task = asyncio.create_task(receiver()) resp_task = asyncio.create_task(responder()) done, pending = await asyncio.wait( [recv_task, resp_task], return_when=asyncio.FIRST_COMPLETED ) for t in pending: t.cancel() try: await t except asyncio.CancelledError: pass # Re-raise any exceptions from the completed task for t in done: if t.exception(): raise t.exception() log(name, f"Disconnected after {turn_count} turns", color) except websockets.exceptions.ConnectionClosed as e: log(name, f"WebSocket closed: {e}", RED) except Exception as e: log(name, f"Error: {e}", RED) raise return session_result # ── Main Orchestration ───────────────────────────────────────────────────── async def main(): parser = argparse.ArgumentParser(description="E2E Bot Mediation Test") parser.add_argument( "--base-url", default="http://localhost:8000", help="Server base URL (default: http://localhost:8000)", ) parser.add_argument( "--max-turns", type=int, default=15, help="Max turns per agent before stopping (default: 15)", ) parser.add_argument( "--mode", choices=["agreement", "resolution"], default="resolution", help="Session mode: 'resolution' (default) for dispute resolution, 'agreement' for open-ended negotiation", ) parser.add_argument( "--binding-turns", type=int, default=None, help="Enable binding deadline: after N turns per party, a binding ruling is auto-delivered (resolution mode only)", ) parser.add_argument( "--case", type=str, default=None, help="Load a benchmark case by ID from scripts/benchmark_cases.json (e.g., 'conjoined-twins-010'). Overrides --mode to resolution.", ) args = parser.parse_args() base_url = args.base_url.rstrip("/") ws_base = base_url.replace("http://", "ws://").replace("https://", "wss://") mode = args.mode # Select personas: from benchmark case file or built-in if args.case: import pathlib cases_file = pathlib.Path(__file__).parent / "benchmark_cases.json" with open(cases_file) as f: all_cases = json.load(f)["cases"] case = next((c for c in all_cases if c["id"] == args.case), None) if not case: # Try partial match case = next((c for c in all_cases if args.case.lower() in c["id"].lower() or args.case.lower() in c["title"].lower()), None) if not case: print(f"{RED}{BOLD}[Error]{RESET} Case '{args.case}' not found. Available cases:") for c in all_cases: print(f" {c['id']}: {c['title']}") sys.exit(1) mode = "resolution" alex_persona = {"name": case["side_a"]["label"], "stance": case["side_a"]["stance_prompt"]} jordan_persona = {"name": case["side_b"]["label"], "stance": case["side_b"]["stance_prompt"]} session_title = case["title"] session_desc = case["description"] print(f" Case: {case['id']} ({case['difficulty']})") elif mode == "resolution": alex_persona = ALEX_RESOLUTION jordan_persona = JORDAN_RESOLUTION session_title = "Quiet Hours Dispute" session_desc = ( "Jordan broke the quiet hours agreement 3 times this week " "claiming gig emergencies. Alex wants enforcement with exceptions." ) else: alex_persona = ALEX_AGREEMENT jordan_persona = JORDAN_AGREEMENT session_title = "Home Office Noise Policy" session_desc = ( "Negotiating quiet hours, soundproofing investment, " "and practice schedules for a shared living space." ) binding_turns = args.binding_turns print(f"\n{BOLD}{'=' * 60}{RESET}") print(f"{BOLD} Servanda E2E Bot Mediation Test{RESET}") print(f"{BOLD}{'=' * 60}{RESET}") print(f" Server: {base_url}") print(f" Mode: {mode}") print(f" Max turns: {args.max_turns}") if binding_turns: print(f" Binding deadline: {binding_turns} turns per party") async with httpx.AsyncClient(timeout=30) as client: # ── Step 1: Discover API ──────────────────────────────────────── print(f"\n{CYAN}{BOLD}[Step 1] Discovering API...{RESET}") try: api_docs = await discover_api(client, base_url) log("Discovery", f"Fetched /llms-full.txt ({len(api_docs)} chars)", GREEN) except httpx.HTTPStatusError as e: log("Discovery", f"FAILED: {e.response.status_code} — is the server running?", RED) sys.exit(1) # ── Step 2: Register both bots ────────────────────────────────── print(f"\n{CYAN}{BOLD}[Step 2] Registering bots...{RESET}") alex_reg = await register_bot(client, base_url, alex_persona["name"]) log(alex_persona["name"], f"Registered — participant_id={alex_reg['participant_id']}", BLUE) jordan_reg = await register_bot(client, base_url, jordan_persona["name"]) log(jordan_persona["name"], f"Registered — participant_id={jordan_reg['participant_id']}", MAGENTA) # ── Step 3: Alex creates session ──────────────────────────────── print(f"\n{CYAN}{BOLD}[Step 3] Alex creating {mode} session...{RESET}") session = await create_session( client, base_url, alex_reg["token"], title=session_title, description=session_desc, mode=mode, binding_turns=binding_turns, ) session_id = session["session_id"] invite_url = session["invite_url"] log("Session", f"Created: {session_id} (mode={mode})", GREEN) log("Session", f"Invite URL: {invite_url}", DIM) # ── Step 4: Jordan claims the invite ──────────────────────────── print(f"\n{CYAN}{BOLD}[Step 4] Jordan claiming invite...{RESET}") # Extract invite token from URL (format: /join/{token}) invite_token = invite_url.rstrip("/").split("/")[-1] log("Jordan", f"Claiming invite token: {invite_token}", MAGENTA) claim_result = await claim_invite( client, base_url, jordan_reg["token"], invite_token ) log("Jordan", f"Joined session: {claim_result}", GREEN) # ── Step 5: Alex starts the session ───────────────────────────── print(f"\n{CYAN}{BOLD}[Step 5] Alex starting session...{RESET}") start_result = await start_session( client, base_url, alex_reg["token"], session_id ) log("Session", f"Started: {start_result}", GREEN) # ── Step 6: Both agents negotiate via WebSocket ───────────────── print(f"\n{CYAN}{BOLD}[Step 6] Starting mediation ({mode} mode)...{RESET}") print(f"{DIM}{'-' * 60}{RESET}\n") ws_url = f"{ws_base}/ws/agreement/{session_id}" # Timeout: 60s per turn + 30s buffer timeout_secs = args.max_turns * 60 + 30 results = [None, None] try: results = await asyncio.wait_for( asyncio.gather( run_agent( name=alex_persona["name"], token=alex_reg["token"], session_id=session_id, stance=alex_persona["stance"], ws_url=ws_url, api_docs=api_docs, max_turns=args.max_turns, my_role="party_0", mode=mode, binding_turns=binding_turns, benchmark_mode=bool(args.case), ), run_agent( name=jordan_persona["name"], token=jordan_reg["token"], session_id=session_id, stance=jordan_persona["stance"], ws_url=ws_url, api_docs=api_docs, max_turns=args.max_turns, my_role="party_1", mode=mode, binding_turns=binding_turns, benchmark_mode=bool(args.case), ), ), timeout=timeout_secs, ) except asyncio.TimeoutError: log("Session", f"Timed out after {timeout_secs}s", RED) # ── Done ──────────────────────────────────────────────────────────── print(f"\n{BOLD}{'=' * 60}{RESET}") # Show session outcome if available (resolution mode) session_data = None if results: for r in results: if r and isinstance(r, dict) and r.get("outcome"): session_data = r break if session_data: outcome = session_data.get("outcome", "unknown") summary = session_data.get("summary", "") next_steps = session_data.get("next_steps", "") outcome_color = GREEN if outcome == "resolved" else YELLOW print(f"{outcome_color}{BOLD} Outcome: {outcome}{RESET}") if summary: print(f" Summary: {summary}") if next_steps: print(f" Next steps: {next_steps}") else: print(f"{YELLOW}{BOLD} No session_closed event received{RESET}") if mode == "resolution": print(f" (mediator may not have called close_session within max turns)") print(f"\n View session: {base_url}/admin/conversations") print(f"{BOLD}{'=' * 60}{RESET}\n") if __name__ == "__main__": asyncio.run(main())