#!/usr/bin/env python3
"""
Minimal E2E bot mediation test.

Two bots join a resolution session, respond to the mediator, and exit when done.
Watch the conversation live in the web UI at the URL printed on start.

Usage:
    uv run python scripts/e2e_bot_simple.py
    uv run python scripts/e2e_bot_simple.py --base-url https://servanda.ai --binding-turns 3
    uv run python scripts/e2e_bot_simple.py --arbiter --binding-turns 3
"""

import argparse
import asyncio
import json
from urllib.parse import quote

from dotenv import load_dotenv

# Called before the remaining imports, matching the original statement order.
# NOTE(review): presumably so values from .env are in the environment before
# the API clients are imported/constructed — confirm.
load_dotenv()

import httpx
import websockets
from anthropic import Anthropic

# ── Minimal personas ──────────────────────────────────────────────────────

ALEX = (
    "You are Alex. You share a flat with Jordan and had a quiet hours agreement "
    "(10 PM - 8 AM). Jordan broke it 3 times this week for gig emergencies. "
    "You want acknowledgment and enforcement going forward, with limited exceptions. "
    "Be concise (2-3 sentences). Don't repeat yourself."
)

JORDAN = (
    "You are Jordan. You share a flat with Alex with quiet hours (10 PM - 8 AM). "
    "You had genuine gig emergencies this week — packing gear, midnight rehearsal, 6 AM venue call. "
    "You want reasonable emergency exceptions defined. You respect the agreement. "
    "Be concise (2-3 sentences). Don't repeat yourself."
)

# ── HTTP helpers ──────────────────────────────────────────────────────────


async def api(client, method, url, token=None, **kwargs):
    """Send one HTTP request and return its decoded JSON body.

    Args:
        client: Object exposing HTTP verbs as awaitable attributes
            (``main`` passes an ``httpx.AsyncClient``).
        method: Name of the verb attribute to call, e.g. ``"post"`` or ``"get"``.
        url: Request URL.
        token: Optional bearer token. When falsy, no Authorization header is sent.
        **kwargs: Forwarded unchanged to the verb call (e.g. ``json=...``).

    Returns:
        The parsed JSON body, or ``{}`` when the response has no content.

    Raises:
        Whatever ``resp.raise_for_status()`` raises for an error response.
    """
    headers = {"Authorization": f"Bearer {token}"} if token else {}
    resp = await getattr(client, method)(url, headers=headers, **kwargs)
    resp.raise_for_status()
    return resp.json() if resp.content else {}


# ── Bot WebSocket loop ────────────────────────────────────────────────────


async def run_bot(name, token, _session_id, stance, ws_url, max_turns, my_role, binding_turns):
    """Run one bot on the session WebSocket until the session ends.

    Two cooperating tasks share the socket: ``recv`` consumes server events and
    maintains the conversation/turn state, and ``respond`` generates a reply
    with the Anthropic API whenever a mediator message is pending and this bot
    is allowed to speak.

    Args:
        name: Display name used in log output and to skip the bot's own
            messages when they are streamed back.
        token: Auth token appended to the WebSocket URL as ``token=``.
        _session_id: Unused; kept for call-site compatibility.
        stance: Persona text placed at the start of the system prompt.
        ws_url: WebSocket URL of the session (may already carry a query string).
        max_turns: Mediator messages are only queued for a reply while the
            number of replies sent is below this value.
        my_role: Role string compared against ``allowed_speakers`` in
            ``turn_update`` events.
        binding_turns: Turn budget quoted in the system prompt and in the
            per-message turn note (note omitted when falsy).

    Raises:
        Any non-cancellation exception raised by either internal task.
    """
    anthropic = Anthropic()
    system = (
        f"{stance}\n\n"
        f"You have {binding_turns} turns before a binding ruling. "
        "Make each turn count — state your position, engage, and converge."
    )
    conversation = []
    turns = 0
    can_speak = True
    pending = False  # a mediator message is waiting for our reply
    stop = False
    signal = asyncio.Event()  # wakes respond() when state may allow a reply
    stream_chunks = []
    stream_sender = None

    sep = "&" if "?" in ws_url else "?"
    # Percent-encode the token so characters such as '+', '/', '=' or '&'
    # cannot corrupt the query string.
    connect_url = f"{ws_url}{sep}token={quote(str(token), safe='')}"
    async with websockets.connect(connect_url) as ws:
        print(f" [{name}] connected")

        async def recv():
            """Consume server events and update the shared state until the session ends."""
            nonlocal can_speak, pending, stop, stream_chunks, stream_sender
            async for raw in ws:
                ev = json.loads(raw)
                # `or {}` also covers an explicit JSON null payload.
                t, d = ev.get("event"), ev.get("data") or {}

                if t == "stream_start":
                    stream_sender = d.get("sender_name", d.get("sender", ""))
                    stream_chunks = []
                elif t == "stream_chunk":
                    stream_chunks.append(d.get("content", d.get("chunk", "")))
                elif t == "stream_end":
                    text = "".join(stream_chunks)
                    sender = stream_sender or d.get("sender_name", "")
                    is_mediator = "mediator" in sender.lower() or sender.lower() == "ai"
                    if is_mediator and turns < max_turns:
                        note = f" [turn {turns+1}/{binding_turns}]" if binding_turns else ""
                        conversation.append({"role": "user", "content": f"[Mediator]: {text}{note}"})
                        pending = True
                        if can_speak:
                            signal.set()
                    elif sender != name:
                        # Other speakers (and the mediator once max_turns is
                        # reached) are recorded as context only.
                        conversation.append({"role": "user", "content": f"[{sender}]: {text}"})
                    stream_chunks, stream_sender = [], None
                elif t == "turn_update":
                    allowed = d.get("allowed_speakers") or []
                    can_speak = ("all" in allowed or my_role in allowed) and not d.get("mediator_responding")
                    if can_speak and pending:
                        signal.set()
                elif t == "turn_rejected":
                    # NOTE(review): `pending` is not re-armed here, so a rejected
                    # reply is not retried until the next mediator message —
                    # confirm this matches the server's expectations.
                    can_speak = False
                elif t == "binding_deadline_proposed":
                    await ws.send(json.dumps({"action": "accept_binding_deadline"}))
                elif t == "draft_proposed":
                    await ws.send(json.dumps({"action": "approve_draft"}))
                elif t == "pending_confirmation_added":
                    await ws.send(json.dumps({"action": "approve_draft"}))
                elif t == "ruling_stream_start":
                    stream_chunks, stream_sender = [], "Arbiter"
                elif t == "ruling_stream_chunk":
                    stream_chunks.append(d.get("chunk", ""))
                elif t == "ruling_stream_end":
                    ruling = "".join(stream_chunks)
                    print(f" [{name}] ruling received ({len(ruling)} chars)")
                    stream_chunks, stream_sender = [], None
                elif t in ("agreement_finalized", "session_closed"):
                    if t == "session_closed":
                        print(f" [{name}] session closed — {d.get('outcome', '?')}")
                    stop = True
                    signal.set()
                    return
            # Socket iteration ended without a terminal event: stop as well.
            stop = True
            signal.set()

        async def respond():
            """Generate and send a reply each time one is pending and allowed."""
            nonlocal turns, pending
            while not stop:
                await signal.wait()
                signal.clear()
                if stop or not pending or not can_speak:
                    continue
                # The Anthropic client is synchronous; run it off the event loop.
                resp = await asyncio.to_thread(
                    anthropic.messages.create,
                    model="claude-haiku-4-5-20251001",
                    max_tokens=200,
                    system=system,
                    messages=list(conversation),
                )
                reply = resp.content[0].text
                # Permission may have been withdrawn while the model was
                # generating; drop the reply and wait for the next signal.
                if not can_speak:
                    continue
                conversation.append({"role": "assistant", "content": reply})
                pending = False
                print(f" [{name}] turn {turns+1}: {reply[:100]}{'...' if len(reply)>100 else ''}")
                await ws.send(json.dumps({"action": "send_message", "content": reply}))
                turns += 1

        tasks = [asyncio.create_task(recv()), asyncio.create_task(respond())]
        try:
            await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        finally:
            # asyncio.wait() does not cancel its tasks when the waiter itself is
            # cancelled (e.g. by the wait_for timeout in main), so always cancel
            # and drain both here. cancel() is a no-op on finished tasks.
            for task in tasks:
                task.cancel()
            results = await asyncio.gather(*tasks, return_exceptions=True)
        # Surface a real failure from either task; CancelledError is a
        # BaseException and is therefore skipped.
        for result in results:
            if isinstance(result, Exception):
                raise result


# ── Main ──────────────────────────────────────────────────────────────────


async def main():
    """Register two bots, create and start a session, then run both bots.

    With ``--arbiter`` Alex first creates a custom arbiter and Jordan opens the
    session under it; otherwise Alex creates the session directly. The other
    bot then claims the invite, the creator starts the session, and both bots
    run concurrently under an overall timeout of ``max_turns * 60 + 30`` seconds.
    """
    p = argparse.ArgumentParser()
    p.add_argument("--base-url", default="http://localhost:8000")
    p.add_argument("--binding-turns", type=int, default=5)
    p.add_argument("--max-turns", type=int, default=15)
    p.add_argument("--arbiter", action="store_true", help="Create and use a custom arbiter")
    args = p.parse_args()

    base = args.base_url.rstrip("/")
    ws_base = base.replace("http://", "ws://").replace("https://", "wss://")

    async with httpx.AsyncClient(timeout=30) as c:
        # Register
        alex = await api(c, "post", f"{base}/api/bot/register", json={"name": "Alex"})
        jordan = await api(c, "post", f"{base}/api/bot/register", json={"name": "Jordan"})

        if args.arbiter:
            # ── Arbiter flow ──────────────────────────────────────────
            # Alex creates an arbiter (requires Plus/Pro — use admin override for testing)
            arbiter_slug = f"quiet-hours-{alex['participant_id'][:8]}"
            print(f"\n Creating arbiter: {arbiter_slug}")
            arbiter = await api(c, "post", f"{base}/api/arbiters", token=alex["token"], json={
                "slug": arbiter_slug,
                "name": "Quiet Hours Arbiter",
                "description": "Specialized in flatmate noise disputes",
                "custom_instructions": (
                    "You specialize in noise and quiet hours disputes between flatmates. "
                    "Always ask about: time windows, emergency definitions, and notification "
                    "expectations. Aim for a concrete schedule with exception rules."
                ),
                "mediator_style": "collaborative",
                "default_mode": "resolution",
                "default_binding_turns": args.binding_turns,
                "is_public": True,
            })
            print(f" Arbiter created: {arbiter.get('id', '?')}")

            # Verify it shows up in the public directory
            directory = await api(c, "get", f"{base}/api/bot/arbiters")
            print(f" Public directory: {len(directory)} arbiter(s)")

            # Jordan creates a session under the arbiter
            session = await api(c, "post", f"{base}/api/bot/arbiters/{arbiter_slug}/sessions", token=jordan["token"], json={
                "title": "Quiet Hours Dispute",
                "description": "Flatmate noise dispute over gig emergencies.",
            })
        else:
            # ── Direct session flow (existing) ────────────────────────
            session = await api(c, "post", f"{base}/api/bot/sessions", token=alex["token"], json={
                "title": "Quiet Hours Dispute",
                "description": "Flatmate noise dispute over gig emergencies.",
                "mediator_style": "collaborative",
                "mode": "resolution",
                "binding_turns": args.binding_turns,
            })

        sid = session["session_id"]
        # The invite token is the last path segment of the invite URL.
        invite_token = session["invite_url"].rstrip("/").split("/")[-1]

        # Join + start (creator differs: jordan in arbiter mode, alex otherwise)
        if args.arbiter:
            await api(c, "post", f"{base}/api/invites/{invite_token}/claim", token=alex["token"])
            await api(c, "post", f"{base}/api/bot/sessions/{sid}/start", token=jordan["token"])
        else:
            await api(c, "post", f"{base}/api/invites/{invite_token}/claim", token=jordan["token"])
            await api(c, "post", f"{base}/api/bot/sessions/{sid}/start", token=alex["token"])

        print(f"\n Session: {sid}")
        print(f" Watch: {base}/agreement/{sid}?observe=1")
        print(f" Binding: {args.binding_turns} turns each")
        if args.arbiter:
            print(f" Arbiter: {arbiter_slug}")
        print()

        # Run both bots
        ws_url = f"{ws_base}/ws/agreement/{sid}"
        try:
            await asyncio.wait_for(
                asyncio.gather(
                    run_bot("Alex", alex["token"], sid, ALEX, ws_url, args.max_turns,
                            "party_0" if not args.arbiter else "party_1", args.binding_turns),
                    run_bot("Jordan", jordan["token"], sid, JORDAN, ws_url, args.max_turns,
                            "party_1" if not args.arbiter else "party_0", args.binding_turns),
                ),
                timeout=args.max_turns * 60 + 30,
            )
        except asyncio.TimeoutError:
            print(" Timed out")

        # If arbiter mode, verify owner can list sessions
        if args.arbiter:
            sessions = await api(c, "get", f"{base}/api/arbiters/{arbiter['id']}/sessions", token=alex["token"])
            print(f"\n Arbiter sessions: {len(sessions)}")

    print("\n Done.\n")


if __name__ == "__main__":
    asyncio.run(main())