Events
AI Computer emits events during streaming execution so that callers can monitor code as it runs, react to output immediately, and build interactive applications on top of it.
Event Structure
Every event has the same four fields: a type identifier, a data payload, a timestamp, and a sequence number.
from dataclasses import dataclass

@dataclass
class StreamEvent:
    type: str         # Event type identifier
    data: str         # Event payload
    timestamp: float  # Unix timestamp of event
    sequence: int     # Event sequence number

# Example event handling (inside an async function, with `client` already set up)
async for event in client.execute_code_stream(code):
    print(f"Event {event.sequence} at {event.timestamp}: {event.type}")
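Because every event carries a `sequence` number, a consumer can notice dropped or reordered events. The sketch below assumes that sequence numbers increase by exactly one per event, which this page does not state. `find_sequence_gaps` is a hypothetical helper, and `StreamEvent` is re-declared locally only so that the snippet runs on its own.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass
class StreamEvent:
    type: str         # Event type identifier
    data: str         # Event payload
    timestamp: float  # Unix timestamp of event
    sequence: int     # Event sequence number


def find_sequence_gaps(events: Iterable[StreamEvent]) -> list[tuple[int, int]]:
    """Return (expected, received) pairs wherever the sequence jumps.

    Assumption: consecutive events differ by exactly 1 in `sequence`.
    """
    gaps = []
    previous = None
    for event in events:
        if previous is not None and event.sequence != previous + 1:
            gaps.append((previous + 1, event.sequence))
        previous = event.sequence
    return gaps


# Hand-written sample events; sequence 2 is deliberately missing.
events = [
    StreamEvent("started", "", 1700000000.0, 0),
    StreamEvent("stdout", "hello\n", 1700000000.1, 1),
    StreamEvent("completed", "", 1700000000.4, 3),
]
print(find_sequence_gaps(events))
```

If the real stream numbers events differently (for example, starting at 1 or skipping values by design), the comparison against `previous + 1` would need to change accordingly.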
Event Types
Output Events
stdout
Standard output from the code execution.
if event.type == 'stdout':
    print(f"Output: {event.data}")
stderr
Standard error and warning messages.
if event.type == 'stderr':
    print(f"Error: {event.data}")
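Since `stdout` and `stderr` arrive as separate event types, the two streams can be kept apart while consuming a run. The following is a minimal sketch rather than the library's own method: `fake_stream` is a hypothetical stand-in for `client.execute_code_stream(code)`, `collect_output` is an illustrative name, and the payloads are invented.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator


@dataclass
class StreamEvent:
    type: str
    data: str
    timestamp: float
    sequence: int


async def fake_stream() -> AsyncIterator[StreamEvent]:
    """Hypothetical stand-in for client.execute_code_stream(code)."""
    yield StreamEvent("stdout", "step 1\n", 0.0, 0)
    yield StreamEvent("stderr", "warning: slow path\n", 0.1, 1)
    yield StreamEvent("stdout", "step 2\n", 0.2, 2)
    yield StreamEvent("completed", "", 0.3, 3)


async def collect_output(stream: AsyncIterator[StreamEvent]) -> tuple[str, str]:
    """Accumulate stdout and stderr payloads into two separate strings."""
    out, err = [], []
    async for event in stream:
        if event.type == "stdout":
            out.append(event.data)
        elif event.type == "stderr":
            err.append(event.data)
    return "".join(out), "".join(err)


stdout_text, stderr_text = asyncio.run(collect_output(fake_stream()))
print(stdout_text, end="")
```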
Status Events
started
Emitted when code execution begins.
if event.type == 'started':
    print("Execution started")
completed
Signals successful completion.
if event.type == 'completed':
    print("Execution finished")
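The `started` and `completed` events bracket a run, so their timestamps give its duration. The helper below is a sketch under that reading: `execution_duration` is a hypothetical name, and the sample timestamps are invented.

```python
from dataclasses import dataclass
from typing import Iterable, Optional


@dataclass
class StreamEvent:
    type: str
    data: str
    timestamp: float
    sequence: int


def execution_duration(events: Iterable[StreamEvent]) -> Optional[float]:
    """Seconds between the first 'started' and the last 'completed' event.

    Returns None when either event is missing, e.g. when the run failed.
    """
    started = completed = None
    for event in events:
        if event.type == "started" and started is None:
            started = event.timestamp
        elif event.type == "completed":
            completed = event.timestamp
    if started is None or completed is None:
        return None
    return completed - started


# Invented sample run lasting 2.5 seconds.
run = [
    StreamEvent("started", "", 100.0, 0),
    StreamEvent("stdout", "working\n", 101.0, 1),
    StreamEvent("completed", "", 102.5, 2),
]
print(execution_duration(run))
```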
Error Events
error
Execution or system errors.
if event.type == 'error':
    print(f"Error occurred: {event.data}")
resource_limit
Resource limit violations.
if event.type == 'resource_limit':
    print(f"Resource limit exceeded: {event.data}")
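Applications that prefer exceptions to printed messages can translate the two failure events into Python errors. This is one possible design, not something the library is known to provide: the exception classes and `raise_for_event` are hypothetical names, and the `"memory"` payload is invented because the format of `event.data` for these events is not documented here.

```python
from dataclasses import dataclass


@dataclass
class StreamEvent:
    type: str
    data: str
    timestamp: float
    sequence: int


class SandboxEventError(RuntimeError):
    """Hypothetical base class for failures reported through events."""


class ExecutionFailed(SandboxEventError):
    """Raised for 'error' events."""


class ResourceLimitExceeded(SandboxEventError):
    """Raised for 'resource_limit' events."""


def raise_for_event(event: StreamEvent) -> None:
    """Raise for failure events; return None for every other event type."""
    if event.type == "error":
        raise ExecutionFailed(event.data)
    if event.type == "resource_limit":
        raise ResourceLimitExceeded(event.data)


try:
    # Invented payload; the real content of event.data may differ.
    raise_for_event(StreamEvent("resource_limit", "memory", 0.0, 7))
except SandboxEventError as exc:
    message = f"{type(exc).__name__}: {exc}"
    print(message)
```

Sharing a base class lets callers catch both failure kinds with one `except` clause while still being able to tell them apart.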
Event Handling Patterns
Basic Event Loop
Simple pattern for handling streaming execution events.
# Requires Python 3.10+ for match/case
async def handle_execution(code: str):
    client = SandboxClient()
    await client.setup()

    try:
        async for event in client.execute_code_stream(code):
            match event.type:
                case 'stdout':
                    print(f"Output: {event.data}")
                case 'stderr':
                    print(f"Error: {event.data}")
                case 'error':
                    print(f"Execution error: {event.data}")
                case 'completed':
                    print("Execution completed")
    finally:
        # Always release the client, even if streaming raises
        await client.cleanup()
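The loop above covers four of the six event types listed earlier. One way to cover all of them, with a fallback for anything unexpected, is a dispatch table. The sketch below is an alternative to `match`, not the library's API: `HANDLERS`, `describe_events`, and `fake_stream` are hypothetical names, and the `heartbeat` event is made up purely to exercise the fallback.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Callable


@dataclass
class StreamEvent:
    type: str
    data: str
    timestamp: float
    sequence: int


# One formatter per event type documented on this page.
HANDLERS: dict[str, Callable[[StreamEvent], str]] = {
    "started": lambda e: "Execution started",
    "stdout": lambda e: f"Output: {e.data}",
    "stderr": lambda e: f"Error: {e.data}",
    "error": lambda e: f"Execution error: {e.data}",
    "resource_limit": lambda e: f"Resource limit exceeded: {e.data}",
    "completed": lambda e: "Execution completed",
}


async def describe_events(stream: AsyncIterator[StreamEvent]) -> list[str]:
    """Format each event, reporting unknown types instead of dropping them."""
    lines = []
    async for event in stream:
        handler = HANDLERS.get(event.type)
        if handler is None:
            lines.append(f"Unhandled event type: {event.type}")
        else:
            lines.append(handler(event))
    return lines


async def fake_stream() -> AsyncIterator[StreamEvent]:
    """Hypothetical stand-in for client.execute_code_stream(code)."""
    yield StreamEvent("started", "", 0.0, 0)
    yield StreamEvent("stdout", "42", 0.1, 1)
    yield StreamEvent("heartbeat", "", 0.2, 2)  # made-up type, hits the fallback
    yield StreamEvent("completed", "", 0.3, 3)


lines = asyncio.run(describe_events(fake_stream()))
print("\n".join(lines))
```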
Progress Tracking
Track execution progress using event timestamps and sequences.
import time

async def track_progress(client, code: str):
    # `client` is a SandboxClient that has already been set up
    start_time = time.time()
    event_count = 0

    async for event in client.execute_code_stream(code):
        event_count += 1
        # Event timestamps are Unix times, so they can be compared with
        # time.time() as long as both clocks agree
        elapsed = event.timestamp - start_time

        print(f"Event {event_count} at {elapsed:.2f}s: {event.type}")

        if event.type == 'completed':
            print(f"Execution completed in {elapsed:.2f} seconds")
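If event timestamps are generated on a different machine than the caller, comparing them with a local `time.time()` reading can be skewed by clock differences. Whether that applies here is not stated, so treat the following as an optional variant: it measures every event against the first event's own timestamp. `relative_progress` and `fake_stream` are hypothetical names, and the timestamps are invented.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator


@dataclass
class StreamEvent:
    type: str
    data: str
    timestamp: float
    sequence: int


async def relative_progress(
    stream: AsyncIterator[StreamEvent],
) -> list[tuple[int, float, str]]:
    """Return (sequence, seconds since first event, type) for each event."""
    first_timestamp = None
    rows = []
    async for event in stream:
        if first_timestamp is None:
            first_timestamp = event.timestamp  # baseline from the stream itself
        rows.append((event.sequence, event.timestamp - first_timestamp, event.type))
    return rows


async def fake_stream() -> AsyncIterator[StreamEvent]:
    """Hypothetical stand-in for client.execute_code_stream(code)."""
    yield StreamEvent("started", "", 1000.0, 0)
    yield StreamEvent("stdout", "tick\n", 1000.25, 1)
    yield StreamEvent("completed", "", 1001.5, 2)


rows = asyncio.run(relative_progress(fake_stream()))
for sequence, elapsed, event_type in rows:
    print(f"Event {sequence} at {elapsed:.2f}s: {event_type}")
```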
Output Buffering
Buffer and process output events in chunks.
async def buffer_output(client, code: str, buffer_size: int = 1024):
    # `client` is a SandboxClient that has already been set up
    buffer = []
    buffer_bytes = 0

    async for event in client.execute_code_stream(code):
        if event.type in ('stdout', 'stderr'):
            buffer.append(event.data)
            # Count encoded bytes, not characters
            buffer_bytes += len(event.data.encode('utf-8'))

            if buffer_bytes >= buffer_size:
                print("".join(buffer))
                buffer = []
                buffer_bytes = 0

    # Flush whatever is left once the stream ends, even if it ended
    # with an error event rather than 'completed'
    if buffer:
        print("".join(buffer))
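The same buffering logic can be pulled into a small reusable object, which makes it easy to test without a running sandbox and to flush from a `finally` block. This is a sketch only: `OutputBuffer` and its `sink` callback are hypothetical names, not part of the client library.

```python
from typing import Callable


class OutputBuffer:
    """Collects text and hands it to `sink` once `max_bytes` is reached."""

    def __init__(self, sink: Callable[[str], None], max_bytes: int = 1024):
        self._sink = sink
        self._max_bytes = max_bytes
        self._chunks: list[str] = []
        self._size = 0

    def add(self, text: str) -> None:
        """Store a chunk and flush automatically when the limit is hit."""
        self._chunks.append(text)
        self._size += len(text.encode("utf-8"))  # bytes, not characters
        if self._size >= self._max_bytes:
            self.flush()

    def flush(self) -> None:
        """Emit buffered text, if any, and reset the buffer."""
        if self._chunks:
            self._sink("".join(self._chunks))
            self._chunks = []
            self._size = 0


flushed: list[str] = []
buffer = OutputBuffer(flushed.append, max_bytes=8)
for piece in ["abcd", "efgh", "ij"]:
    buffer.add(piece)
buffer.flush()  # final flush, e.g. from a `finally` block
print(flushed)
```

In a streaming loop, `buffer.add(event.data)` would be called for each `stdout` or `stderr` event, with `print` or a logger passed as the `sink`.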
Best Practices
- Handle All Events: Always handle all possible event types to ensure robust execution monitoring.
- Error Recovery: Handle error and resource_limit events explicitly so that a failed execution does not destabilize the application.
- Resource Management: Wrap streaming in try/finally and call client.cleanup() in the finally block so the client is released even when execution fails.
- Buffer Management: Consider buffering output for better performance with high-volume output.
- Progress Monitoring: Use event timestamps and sequences to track execution progress and timing.