Event Sourcing Guide
Overview
MonoTask's event sourcing system provides a complete audit trail of all state changes, actions, and system events. This document explains the architecture, usage patterns, and performance considerations.
Architecture
Core Components
┌─────────────────────────────────────────────────────────────────┐
│ Event Sourcing System │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Event Store │───▶│ Snapshots │───▶│ Projections │ │
│ │ (Append-Only)│ │ (Performance)│ │ (Queries) │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │ │ │ │
│ └────────────────────┴───────────────────┘ │
│ │ │
│ ┌─────────▼─────────┐ │
│ │ State Reconstruction│ │
│ └──────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
Database Schema
1. Event Store Table
The central append-only log of all events.
sql
CREATE TABLE event_store (
event_id INTEGER PRIMARY KEY AUTOINCREMENT,
event_uuid TEXT NOT NULL UNIQUE,
event_type TEXT NOT NULL,
event_version INTEGER NOT NULL DEFAULT 1,
-- Aggregate information
aggregate_type TEXT NOT NULL,
aggregate_id TEXT NOT NULL,
aggregate_version INTEGER NOT NULL,
-- Event metadata
event_timestamp DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
correlation_id TEXT,
causation_id TEXT,
-- Actor information
actor_type TEXT NOT NULL,
actor_id TEXT,
-- Event payload
event_data JSON NOT NULL,
metadata JSON DEFAULT '{}',
-- State snapshots (optional)
state_before JSON,
state_after JSON
);
Key Design Decisions:
- Append-only: Events are never updated or deleted (immutability)
- Aggregate versioning: Each event increments the aggregate version
- Correlation/causation tracking: Link related events across the system
- State snapshots: Optional before/after state for quick queries
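The append-only and aggregate-versioning decisions above can be sketched with a small in-memory store. This is a minimal illustration, not MonoTask's repository code: `InMemoryEventStore`, `append`, `expectedVersion`, and `byCorrelation` are hypothetical names, and the array stands in for the `event_store` table.

```typescript
// Hypothetical in-memory stand-in for the event_store table (illustration only).
interface StoredEvent {
  eventId: number; // mirrors event_id (auto-increment)
  eventUuid: string;
  eventType: string;
  aggregateType: string;
  aggregateId: string;
  aggregateVersion: number;
  correlationId?: string;
  causationId?: string;
  eventData: Record<string, unknown>;
}

type NewEvent = Omit<StoredEvent, "eventId" | "aggregateVersion">;

class InMemoryEventStore {
  private readonly events: StoredEvent[] = [];

  // Highest version recorded for an aggregate, or 0 if it has no events yet.
  currentVersion(aggregateType: string, aggregateId: string): number {
    let version = 0;
    for (const e of this.events) {
      if (e.aggregateType === aggregateType && e.aggregateId === aggregateId) {
        version = Math.max(version, e.aggregateVersion);
      }
    }
    return version;
  }

  // Append only: the caller states the version it last saw. A mismatch means
  // another writer appended first, so the write is rejected instead of merged.
  append(event: NewEvent, expectedVersion: number): StoredEvent {
    const actual = this.currentVersion(event.aggregateType, event.aggregateId);
    if (actual !== expectedVersion) {
      throw new Error(`Version conflict: expected ${expectedVersion}, found ${actual}`);
    }
    const stored: StoredEvent = {
      ...event,
      eventId: this.events.length + 1,
      aggregateVersion: actual + 1, // each event increments the aggregate version
    };
    this.events.push(Object.freeze(stored)); // frozen: never updated after append
    return stored;
  }

  // All events that belong to one workflow, in append order.
  byCorrelation(correlationId: string): StoredEvent[] {
    return this.events.filter((e) => e.correlationId === correlationId);
  }
}

const demoStore = new InMemoryEventStore();
const firstEvent = demoStore.append(
  {
    eventUuid: "evt-1",
    eventType: "test_added",
    aggregateType: "task",
    aggregateId: "task-001",
    correlationId: "workflow-123",
    eventData: { testCount: 5 },
  },
  0,
);
const secondEvent = demoStore.append(
  {
    eventUuid: "evt-2",
    eventType: "state_transitioned",
    aggregateType: "task",
    aggregateId: "task-001",
    correlationId: "workflow-123",
    causationId: firstEvent.eventUuid, // this event was caused by the first one
    eventData: { from_state: "WRITING_TESTS", to_state: "TESTS_COMPLETE" },
  },
  1,
);
console.log(secondEvent.aggregateVersion, demoStore.byCorrelation("workflow-123").length);
```

Passing the expected version on every append is one common way to keep `aggregate_version` gap-free under concurrent writers; whether MonoTask enforces it this way is not stated in this guide.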
2. Specialized Event Tables
State Machine Events:
sql
CREATE TABLE state_machine_events (
event_uuid TEXT NOT NULL UNIQUE,
task_id TEXT NOT NULL,
from_state TEXT NOT NULL,
to_state TEXT NOT NULL,
transition_type TEXT NOT NULL,
triggered_by TEXT NOT NULL,
trigger_type TEXT NOT NULL,
reason TEXT,
conditions_checked JSON,
conditions_passed JSON,
conditions_failed JSON,
event_timestamp DATETIME NOT NULL,
event_store_id INTEGER -- Links to event_store
);
Task Action Events:
sql
CREATE TABLE task_action_events (
action_uuid TEXT NOT NULL UNIQUE,
task_id TEXT NOT NULL,
action_type TEXT NOT NULL,
action_category TEXT NOT NULL,
actor_type TEXT NOT NULL,
actor_id TEXT,
action_data JSON NOT NULL,
changes JSON,
action_timestamp DATETIME NOT NULL,
event_store_id INTEGER
);
System Events:
sql
CREATE TABLE system_events (
event_uuid TEXT NOT NULL UNIQUE,
event_type TEXT NOT NULL,
event_category TEXT NOT NULL,
severity TEXT NOT NULL,
event_data JSON NOT NULL,
error_message TEXT,
stack_trace TEXT,
event_timestamp DATETIME NOT NULL,
event_store_id INTEGER
);
3. Aggregate Snapshots
Snapshots optimize performance by storing aggregate state at specific versions.
sql
CREATE TABLE aggregate_snapshots (
snapshot_id INTEGER PRIMARY KEY AUTOINCREMENT,
snapshot_uuid TEXT NOT NULL UNIQUE,
aggregate_type TEXT NOT NULL,
aggregate_id TEXT NOT NULL,
aggregate_version INTEGER NOT NULL,
snapshot_data JSON NOT NULL,
snapshot_timestamp DATETIME NOT NULL,
first_event_id INTEGER NOT NULL,
last_event_id INTEGER NOT NULL,
event_count INTEGER NOT NULL,
created_by TEXT NOT NULL DEFAULT 'system'
);
Snapshot Strategy:
- Create a snapshot every 50 events (configurable via snapshotThreshold)
- Store complete aggregate state at that point
- Enable fast state reconstruction without replaying all events
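The snapshot strategy above amounts to "start from the latest snapshot, then replay only the events recorded after it". A minimal sketch, assuming events are ordered with contiguous versions starting at 1; `ReplayEvent`, `ReplayState`, `rebuildState`, and `latestSnapshot` are hypothetical names, not the service's API:

```typescript
// Illustrative types; the real snapshot_data and event payloads are richer.
type ReplayEvent = { version: number; toState: string };
type ReplayState = { currentState: string; version: number };
type ReplaySnapshot = { version: number; state: ReplayState };

const SNAPSHOT_THRESHOLD = 50; // mirrors the configurable default described above

// Applying one event yields the next state (a pure reducer).
function applyEvent(state: ReplayState, event: ReplayEvent): ReplayState {
  return { currentState: event.toState, version: event.version };
}

// Rebuild from a snapshot if one exists, otherwise from the initial state.
// Only events newer than the starting version are replayed.
function rebuildState(
  events: ReplayEvent[],
  snapshot?: ReplaySnapshot,
): { state: ReplayState; replayed: number } {
  const start: ReplayState = snapshot
    ? snapshot.state
    : { currentState: "PENDING", version: 0 };
  const tail = events.filter((e) => e.version > start.version);
  return { state: tail.reduce(applyEvent, start), replayed: tail.length };
}

// Snapshot at the most recent multiple of the threshold, if any.
function latestSnapshot(events: ReplayEvent[]): ReplaySnapshot | undefined {
  const snapVersion =
    Math.floor(events.length / SNAPSHOT_THRESHOLD) * SNAPSHOT_THRESHOLD;
  if (snapVersion === 0) return undefined;
  const state = rebuildState(events.slice(0, snapVersion)).state;
  return { version: snapVersion, state };
}

// 120 synthetic events alternating between two states.
const replayHistory: ReplayEvent[] = Array.from({ length: 120 }, (_, i) => ({
  version: i + 1,
  toState: i % 2 === 0 ? "WRITING_TESTS" : "TESTS_COMPLETE",
}));

const fullReplay = rebuildState(replayHistory);
const fromSnapshot = rebuildState(replayHistory, latestSnapshot(replayHistory));

console.log(fullReplay.replayed, fromSnapshot.replayed);
console.log(fromSnapshot.state);
```

Both paths end in the same state; the snapshot path replays only the events after version 100 instead of all 120.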
Usage Patterns
1. Recording State Transitions
Automatic (via EventSourcedStateMachine):
typescript
import { createEventSourcedStateMachine } from '@monotask/core/state-machine/event-sourcing-integration';
import { createSQLiteAdapter } from '@monotask/shared/database/sqlite-adapter';
import { Database } from 'bun:sqlite';
const db = new Database('.monotask/project.db');
const adapter = createSQLiteAdapter(db);
const stateMachine = createEventSourcedStateMachine(db, adapter);
// State transition automatically records event
await stateMachine.transition(
taskId,
TaskState.IMPLEMENTING,
{
triggeredBy: 'agent-001',
reason: 'All tests passing, starting implementation',
metadata: {
correlationId: 'workflow-123',
testResults: { passed: 10, failed: 0 }
}
}
);
Manual (via EventSourcingService):
typescript
import { createEventSourcingService } from '@monotask/core/event-sourcing/event-sourcing-service';
const eventService = createEventSourcingService(adapter);
await eventService.recordStateTransition({
taskId: 'task-001',
fromState: 'WRITING_TESTS',
toState: 'TESTS_COMPLETE',
transitionType: 'automatic',
triggerType: 'condition',
actorType: 'system',
reason: 'All test files created and passing',
conditionsChecked: ['test_files_exist', 'tests_passing'],
conditionsPassed: ['test_files_exist', 'tests_passing'],
conditionsFailed: []
});
2. Recording Task Actions
typescript
await stateMachine.recordTaskAction(
taskId,
'test_added',
'testing',
{
testFile: 'packages/core/src/validation.test.ts',
testCount: 5,
testSuite: 'ValidationEngine'
},
'agent',
'agent-001',
{
filesAdded: ['packages/core/src/validation.test.ts'],
linesAdded: 120
}
);
3. Recording System Events
typescript
await eventService.recordSystemEvent(
'worker_failed',
'worker',
'error',
{
workerId: 'worker-validation-001',
taskId: 'task-001',
errorType: 'ValidationError',
attemptCount: 3
},
{
message: 'Validation worker exceeded timeout',
stack: error.stack
}
);
4. Retrieving Event History
Get complete audit trail:
typescript
const auditTrail = await stateMachine.getTaskAuditTrail(taskId);
// Returns:
// [
// {
// timestamp: Date,
// eventType: 'state_transition',
// actor: 'agent-001',
// description: 'State changed from PENDING to ELICITATION',
// details: { transitionType: 'automatic', reason: '...' }
// },
// {
// timestamp: Date,
// eventType: 'action',
// actor: 'agent-001',
// description: 'Action: test_added',
// details: { category: 'testing', actionData: {...} }
// }
// ]
Get specific event types:
typescript
const history = await stateMachine.getTaskEventHistory(taskId);
console.log('State Transitions:', history.stateTransitions);
console.log('Actions:', history.actions);
console.log('All Events:', history.allEvents);
5. Event Replay and State Reconstruction
Rebuild current state from events:
typescript
const rebuilt = await stateMachine.rebuildTaskFromEvents(taskId);
console.log('Current State:', rebuilt.currentState);
console.log('Version:', rebuilt.version);
console.log('Event Count:', rebuilt.eventCount);
Get state at a specific timestamp:
typescript
const pastTimestamp = new Date('2025-10-01T12:00:00Z');
const pastState = await stateMachine.getTaskStateAtTimestamp(taskId, pastTimestamp);
console.log('State at', pastTimestamp, ':', pastState);
6. Manual Snapshot Creation
typescript
// Manually create a snapshot (useful before major operations)
await stateMachine.createTaskSnapshot(taskId);
Performance Optimizations
1. Indexes for D1
The schema includes optimized indexes for common query patterns:
sql
-- Aggregate queries (most common)
CREATE INDEX idx_event_store_aggregate
ON event_store(aggregate_type, aggregate_id, aggregate_version);
-- Temporal queries
CREATE INDEX idx_event_store_timestamp
ON event_store(event_timestamp DESC);
-- Event type queries
CREATE INDEX idx_event_store_type
ON event_store(event_type);
-- Correlation tracking
CREATE INDEX idx_event_store_correlation
ON event_store(correlation_id)
WHERE correlation_id IS NOT NULL;
-- Actor activity queries
CREATE INDEX idx_event_store_actor
ON event_store(actor_type, actor_id);
2. Snapshot Strategy
Configuration:
typescript
const eventService = createEventSourcingService(adapter, {
snapshotThreshold: 50, // Snapshot every 50 events
enableAutomaticSnapshots: true, // Auto-create snapshots
enableProjectionUpdates: true, // Update projections
batchSize: 1000, // Batch size for replay
retentionDays: 365 // Keep events for 1 year
});
Benefits:
- Fast state reconstruction: Load snapshot + recent events (vs. all events)
- Reduced query time: Avoid replaying 1000+ events
- Configurable trade-offs: More snapshots = faster reads, more storage
Example Performance:
- Without snapshots: Rebuild from 1000 events = ~500ms
- With snapshots (every 50): Load snapshot + 50 events = ~50ms
- 10x faster for long-running tasks
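The storage-versus-read-speed trade-off can be made concrete with a little arithmetic. The sketch below assumes a snapshot is written exactly when the aggregate version reaches a multiple of the threshold and that old snapshots are kept; `snapshotCost` is a hypothetical helper, and it counts events rather than predicting milliseconds.

```typescript
interface SnapshotCost {
  threshold: number;
  snapshotsStored: number; // how many snapshots exist after totalEvents events
  worstCaseReplay: number; // most events ever replayed on top of a snapshot
}

// With a snapshot at every multiple of `threshold`, a rebuild replays
// (version % threshold) events, which is at most threshold - 1.
function snapshotCost(totalEvents: number, threshold: number): SnapshotCost {
  if (!Number.isInteger(threshold) || threshold < 1) {
    throw new Error("threshold must be a positive integer");
  }
  return {
    threshold,
    snapshotsStored: Math.floor(totalEvents / threshold),
    worstCaseReplay: Math.min(totalEvents, threshold - 1),
  };
}

// Compare three thresholds for an aggregate with 1000 events.
const costTable = [10, 50, 200].map((t) => snapshotCost(1000, t));
for (const row of costTable) {
  console.log(
    `threshold=${row.threshold} snapshots=${row.snapshotsStored} worstCaseReplay=${row.worstCaseReplay}`,
  );
}
// threshold=10 snapshots=100 worstCaseReplay=9
// threshold=50 snapshots=20 worstCaseReplay=49
// threshold=200 snapshots=5 worstCaseReplay=199
```

A smaller threshold bounds replay more tightly but stores more snapshot rows, which is the trade-off the Benefits list describes.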
3. D1-Specific Optimizations
Batch Operations:
typescript
// Instead of N individual inserts
const events = [event1, event2, event3 /* , ... */];
await repository.appendEventsBatch(events); // Single atomic batch
Projection Materialization: Use the event_projections table to cache expensive aggregations:
typescript
// Materialized view of task state summary
const projection = await repository.getProjection('task_state_summary');
// Projection data is pre-computed and cached
console.log(projection.projection_data);
Integration with State Machine
Automatic Event Recording
The EventSourcedStateMachine extends the base StateMachine to automatically record events:
typescript
class EventSourcedStateMachine extends StateMachine {
async transition(taskId, toState, context) {
const fromState = this.getCurrentState(taskId);
// Execute transition
const result = await super.transition(taskId, toState, context);
// If successful, record event
if (result.success) {
await this.recordTransitionEvent(taskId, fromState, toState, context, result);
}
return result;
}
}
Benefits:
- No code changes required in existing transition logic
- Events recorded automatically for all state changes
- Complete audit trail without manual instrumentation
Triggers for Event Propagation
Database triggers automatically propagate events to event_store:
sql
CREATE TRIGGER tr_sm_event_to_event_store
AFTER INSERT ON state_machine_events
BEGIN
INSERT INTO event_store (...) VALUES (...);
UPDATE state_machine_events SET event_store_id = last_insert_rowid() ...;
END;
Flow:
- State transition creates state_machine_events row
- Trigger fires and creates event_store row
- Foreign key links the two
- Projection rebuild flags are set
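The four steps of this flow can be simulated in memory to show the order of operations. This is only a sketch of what the trigger does: the arrays stand in for the two tables, and `insertStateMachineEvent`, the reuse of the same UUID in both rows, the `state_transitioned` event type, and the way the rebuild flag is stored are all assumptions made for illustration.

```typescript
interface EventStoreEntry {
  eventId: number;
  eventUuid: string;
  eventType: string;
  aggregateType: string;
  aggregateId: string;
}

interface StateMachineEventEntry {
  eventUuid: string;
  taskId: string;
  fromState: string;
  toState: string;
  eventStoreId: number | null; // filled in once the event_store row exists
}

const eventStoreTable: EventStoreEntry[] = [];
const stateMachineEventsTable: StateMachineEventEntry[] = [];
const projectionsToRebuild = new Set<string>();

function insertStateMachineEvent(
  input: Omit<StateMachineEventEntry, "eventStoreId">,
): StateMachineEventEntry {
  // 1. The state transition creates a state_machine_events row.
  const row: StateMachineEventEntry = { ...input, eventStoreId: null };
  stateMachineEventsTable.push(row);

  // 2. The "trigger" creates the matching event_store row.
  const storeEntry: EventStoreEntry = {
    eventId: eventStoreTable.length + 1, // stands in for last_insert_rowid()
    eventUuid: input.eventUuid, // assumption: both rows share the UUID
    eventType: "state_transitioned", // assumed event type name
    aggregateType: "task",
    aggregateId: input.taskId,
  };
  eventStoreTable.push(storeEntry);

  // 3. The specialized row is linked back to the event_store row.
  row.eventStoreId = storeEntry.eventId;

  // 4. Projections that depend on task state are flagged for rebuild.
  projectionsToRebuild.add("task_state_summary");

  return row;
}

const linkedRow = insertStateMachineEvent({
  eventUuid: "evt-100",
  taskId: "task-001",
  fromState: "WRITING_TESTS",
  toState: "TESTS_COMPLETE",
});
console.log(linkedRow.eventStoreId, eventStoreTable.length, [...projectionsToRebuild]);
```

In the real schema the database performs steps 2 and 3 inside the trigger, so application code only ever issues the first insert.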
Best Practices
1. Event Naming Conventions
typescript
// Good: Descriptive, past tense
'task_created'
'state_transitioned'
'test_added'
'implementation_completed'
// Bad: Vague or present tense
'task'
'transition'
'add_test'
2. Event Data Structure
Include enough context for replay:
typescript
{
event_data: {
// What changed
from_state: 'WRITING_TESTS',
to_state: 'TESTS_COMPLETE',
// Why it changed
reason: 'All 10 tests passing',
// Who triggered it
triggered_by: 'agent-001',
// Context for replay
test_results: { passed: 10, failed: 0 },
test_files: ['test1.ts', 'test2.ts']
}
}
3. Correlation and Causation
Link related events:
typescript
// Workflow-level correlation
const correlationId = EventSourcingService.generateCorrelationId();
// Event 1: Test writing started
await recordTaskAction({ ..., correlationId });
// Event 2: Test completed (same correlation)
await recordTaskAction({ ..., correlationId });
// Event 3: State transition caused by Event 2
await recordStateTransition({
...,
correlationId,
causationId: testCompletedEventId
});
4. Error Handling
typescript
try {
await stateMachine.transition(taskId, newState, context);
} catch (error) {
// Record failure as system event
await eventService.recordSystemEvent(
'state_transition_failed',
'worker',
'error',
{
taskId,
attemptedTransition: { from: oldState, to: newState },
context
},
{
message: error.message,
stack: error.stack
}
);
throw error;
}
Querying Events
Views for Common Queries
Latest state for each aggregate:
sql
SELECT * FROM v_aggregate_latest_state
WHERE aggregate_type = 'task';
Task state timeline:
sql
SELECT * FROM v_task_state_timeline
WHERE task_id = 'task-001'
ORDER BY event_timestamp;
Event statistics:
sql
SELECT * FROM v_event_statistics;
Migration Notes
Applying the Migration
bash
# Run the event sourcing migration
bun run db:migrate
# Verify tables created
sqlite3 .monotask/project.db "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE '%event%'"
Rollback Procedure
bash
# Rollback if needed (drops all event sourcing tables)
# WARNING: This will lose all event history
bun run db:migrate:rollback
D1 Deployment
Migration to D1
bash
# Create D1 database
wrangler d1 create monotask-production
# Apply migrations
wrangler d1 migrations apply monotask-production
# Import existing SQLite events (if migrating)
wrangler d1 execute monotask-production --file=events_export.sql
D1 Limitations and Workarounds
Limitation 1: Query Timeout (30s)
- Workaround: Use snapshots to avoid long event replays
- Workaround: Batch large queries into smaller chunks
Limitation 2: 100 Bind Parameters
- Workaround: Use JSON columns for complex data
- Workaround: Batch large inserts into multiple statements
Limitation 3: No Full-Text Search
- Workaround: Use LIKE queries with indexes
- Workaround: Consider external search service for event search
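For Limitation 2, "batch large inserts into multiple statements" can be sketched as a helper that splits rows so that no single statement exceeds the bind-parameter limit. `buildInsertBatches` and `BatchStatement` are hypothetical names; the helper only builds SQL text and parameter arrays, and how the statements are executed (for example through a prepared-statement batch) is left to the caller.

```typescript
const D1_MAX_BIND_PARAMS = 100; // the limit named in Limitation 2

interface BatchStatement {
  sql: string;
  params: unknown[];
}

// Split rows into multi-row INSERT statements, each using at most `maxParams`
// placeholders. Table and column names are interpolated into the SQL, so they
// must come from trusted code, never from user input.
function buildInsertBatches(
  table: string,
  columns: string[],
  rows: unknown[][],
  maxParams: number = D1_MAX_BIND_PARAMS,
): BatchStatement[] {
  if (columns.length === 0 || columns.length > maxParams) {
    throw new Error(`Column count must be between 1 and ${maxParams}`);
  }
  const rowsPerStatement = Math.floor(maxParams / columns.length);
  const rowPlaceholder = `(${columns.map(() => "?").join(", ")})`;
  const statements: BatchStatement[] = [];

  for (let i = 0; i < rows.length; i += rowsPerStatement) {
    const chunk = rows.slice(i, i + rowsPerStatement);
    const params: unknown[] = [];
    for (const row of chunk) {
      if (row.length !== columns.length) {
        throw new Error("Every row must supply one value per column");
      }
      params.push(...row);
    }
    statements.push({
      sql: `INSERT INTO ${table} (${columns.join(", ")}) VALUES ${chunk
        .map(() => rowPlaceholder)
        .join(", ")}`,
      params,
    });
  }
  return statements;
}

// 60 rows with 4 columns each: 25 rows fit per statement (25 * 4 = 100).
const sampleRows: unknown[][] = Array.from({ length: 60 }, (_, i) => [
  `uuid-${i}`,
  "state_transitioned",
  "task",
  `task-${i}`,
]);
const batches = buildInsertBatches(
  "event_store",
  ["event_uuid", "event_type", "aggregate_type", "aggregate_id"],
  sampleRows,
);
console.log(batches.map((b) => b.params.length)); // [ 100, 100, 40 ]
```

The column list here is shortened for readability; a real `event_store` insert would include every NOT NULL column from the schema.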
Monitoring and Observability
Key Metrics
sql
-- Event throughput
SELECT
DATE(event_timestamp) as date,
COUNT(*) as event_count,
COUNT(DISTINCT aggregate_id) as aggregate_count
FROM event_store
GROUP BY DATE(event_timestamp)
ORDER BY date DESC;
-- Snapshot efficiency
-- Note: compression_ratio does not appear in the aggregate_snapshots schema shown earlier; omit that column if your table does not define it
SELECT
aggregate_type,
COUNT(*) as snapshot_count,
AVG(event_count) as avg_events_per_snapshot,
AVG(compression_ratio) as avg_compression
FROM aggregate_snapshots
GROUP BY aggregate_type;
-- Actor activity
SELECT
actor_type,
actor_id,
COUNT(*) as event_count,
MAX(event_timestamp) as last_activity
FROM event_store
GROUP BY actor_type, actor_id
ORDER BY event_count DESC;
Testing
Unit Testing Event Sourcing
typescript
import { describe, it, expect } from 'bun:test';
import { createEventSourcingService } from '@monotask/core/event-sourcing/event-sourcing-service';
describe('Event Sourcing', () => {
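// Shared by both tests (the second test also calls service); testAdapter is assumed to come from test setup
const service = createEventSourcingService(testAdapter);
it('should record state transitions', async () => {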
await service.recordStateTransition({
taskId: 'test-001',
fromState: 'PENDING',
toState: 'IN_PROGRESS',
transitionType: 'manual',
triggerType: 'user',
actorType: 'user',
actorId: 'user-001'
});
const history = await service.getTaskEventHistory('test-001');
expect(history.stateTransitions).toHaveLength(1);
expect(history.stateTransitions[0].to_state).toBe('IN_PROGRESS');
});
it('should rebuild state from events', async () => {
// Record multiple transitions (recordMultipleTransitions is a test helper not shown here)
await recordMultipleTransitions();
// Rebuild state
const rebuilt = await service.rebuildTaskState('test-001');
expect(rebuilt.currentState.current_state).toBe('COMPLETED');
expect(rebuilt.version).toBeGreaterThan(0);
expect(rebuilt.eventCount).toBeGreaterThan(0);
});
});
Conclusion
MonoTask's event sourcing system provides:
✅ Complete audit trail of all state changes
✅ State reconstruction at any point in time
✅ Performance optimization via snapshots
✅ D1 compatibility with optimized indexes
✅ Integration with existing state machine
✅ Debugging capabilities via event replay
For advanced usage and customization, see the API documentation in:
- /packages/core/src/event-sourcing/event-sourcing-service.ts
- /packages/shared/src/repositories/event-store-repository.ts