Events

AI Computer provides a rich event system for real-time monitoring and control of code execution. Events are emitted during streaming execution to provide immediate feedback and enable interactive applications.

Event Structure

All events follow a consistent structure with type information, data payload, and timing details.

1@dataclass
2class StreamEvent:
3    type: str      # Event type identifier
4    data: str      # Event payload
5    timestamp: float  # Unix timestamp of event
6    sequence: int  # Event sequence number
7
8# Example event handling
9async for event in client.execute_code_stream(code):
10    print(f"Event {event.sequence} at {event.timestamp}: {event.type}")

Event Types

Output Events

stdout

Standard output from the code execution.

1if event.type == 'stdout':
2    print(f"Output: {event.data}")

stderr

Standard error and warning messages.

1if event.type == 'stderr':
2    print(f"Error: {event.data}")

Status Events

started

Emitted when code execution begins.

1if event.type == 'started':
2    print("Execution started")

completed

Signals successful completion.

1if event.type == 'completed':
2    print("Execution finished")

Error Events

error

Execution or system errors.

1if event.type == 'error':
2    print(f"Error occurred: {event.data}")

resource_limit

Resource limit violations.

1if event.type == 'resource_limit':
2    print(f"Resource limit exceeded: {event.data}")

Event Handling Patterns

Basic Event Loop

Simple pattern for handling streaming execution events.

1async def handle_execution(code: str):
2    client = SandboxClient()
3    await client.setup()
4    
5    try:
6        async for event in client.execute_code_stream(code):
7            match event.type:
8                case 'stdout':
9                    print(f"Output: {event.data}")
10                case 'stderr':
11                    print(f"Error: {event.data}")
12                case 'error':
13                    print(f"Execution error: {event.data}")
14                case 'completed':
15                    print("Execution completed")
16    finally:
17        await client.cleanup()

Progress Tracking

Track execution progress using event timestamps and sequences.

1async def track_progress(code: str):
2    start_time = time.time()
3    event_count = 0
4    
5    async for event in client.execute_code_stream(code):
6        event_count += 1
7        elapsed = event.timestamp - start_time
8        
9        print(f"Event {event_count} at {elapsed:.2f}s: {event.type}")
10        
11        if event.type == 'completed':
12            print(f"Execution completed in {elapsed:.2f} seconds")

Output Buffering

Buffer and process output events in chunks.

1async def buffer_output(code: str, buffer_size: int = 1024):
2    buffer = []
3    buffer_bytes = 0
4    
5    async for event in client.execute_code_stream(code):
6        if event.type in ('stdout', 'stderr'):
7            buffer.append(event.data)
8            buffer_bytes += len(event.data)
9            
10            if buffer_bytes >= buffer_size:
11                print("".join(buffer))
12                buffer = []
13                buffer_bytes = 0
14        
15        elif event.type == 'completed':
16            if buffer:  # Flush remaining buffer
17                print("".join(buffer))

Best Practices

  • Handle All Events: Always handle all possible event types to ensure robust execution monitoring.
  • Error Recovery: Implement appropriate error handling for error events to maintain application stability.
  • Resource Management: Use the cleanup pattern with try/finally to ensure proper resource cleanup.
  • Buffer Management: Consider buffering output for better performance with high-volume output.
  • Progress Monitoring: Use event timestamps and sequences to track execution progress and timing.