
Event Sourcing Guide

Overview

MonoTask's event sourcing system provides a complete audit trail of all state changes, actions, and system events. This document explains the architecture, usage patterns, and performance considerations.

Architecture

Core Components

┌─────────────────────────────────────────────────────────────────┐
│                      Event Sourcing System                      │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐       │
│  │ Event Store  │───▶│  Snapshots   │───▶│ Projections  │       │
│  │ (Append-Only)│    │ (Performance)│    │  (Queries)   │       │
│  └──────────────┘    └──────────────┘    └──────────────┘       │
│         │                    │                   │              │
│         └────────────────────┴───────────────────┘              │
│                              │                                  │
│                  ┌───────────▼───────────┐                      │
│                  │  State Reconstruction │                      │
│                  └───────────────────────┘                      │
└─────────────────────────────────────────────────────────────────┘

Database Schema

1. Event Store Table

The central append-only log of all events.

sql
CREATE TABLE event_store (
  event_id INTEGER PRIMARY KEY AUTOINCREMENT,
  event_uuid TEXT NOT NULL UNIQUE,
  event_type TEXT NOT NULL,
  event_version INTEGER NOT NULL DEFAULT 1,

  -- Aggregate information
  aggregate_type TEXT NOT NULL,
  aggregate_id TEXT NOT NULL,
  aggregate_version INTEGER NOT NULL,

  -- Event metadata
  event_timestamp DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
  correlation_id TEXT,
  causation_id TEXT,

  -- Actor information
  actor_type TEXT NOT NULL,
  actor_id TEXT,

  -- Event payload
  event_data JSON NOT NULL,
  metadata JSON DEFAULT '{}',

  -- State snapshots (optional)
  state_before JSON,
  state_after JSON
);

Key Design Decisions:

  • Append-only: Events are never updated or deleted (immutability)
  • Aggregate versioning: Each event increments the aggregate version
  • Correlation/causation tracking: Link related events across the system
  • State snapshots: Optional before/after state for quick queries
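To make the append-only and aggregate-versioning decisions concrete, here is a minimal in-memory sketch. It is not MonoTask's repository code: `InMemoryEventStore`, `StoredEvent`, and `append` are hypothetical names, and the sketch assumes the caller passes the version it expects the aggregate to be at, which is one common way to enforce "each event increments the aggregate version".

```typescript
// Hypothetical in-memory model of the event_store table (illustration only).
interface StoredEvent {
  eventId: number;
  eventType: string;
  aggregateType: string;
  aggregateId: string;
  aggregateVersion: number;
  eventData: Record<string, unknown>;
}

class InMemoryEventStore {
  private readonly events: StoredEvent[] = [];

  // Current version of an aggregate; 0 if it has no events yet.
  currentVersion(aggregateType: string, aggregateId: string): number {
    let version = 0;
    for (const e of this.events) {
      if (e.aggregateType === aggregateType && e.aggregateId === aggregateId) {
        version = Math.max(version, e.aggregateVersion);
      }
    }
    return version;
  }

  // Append-only: there is deliberately no update or delete method.
  append(
    aggregateType: string,
    aggregateId: string,
    expectedVersion: number,
    eventType: string,
    eventData: Record<string, unknown>
  ): StoredEvent {
    const actual = this.currentVersion(aggregateType, aggregateId);
    if (actual !== expectedVersion) {
      throw new Error(`Version conflict: expected ${expectedVersion}, found ${actual}`);
    }
    const event: StoredEvent = {
      eventId: this.events.length + 1,
      eventType,
      aggregateType,
      aggregateId,
      aggregateVersion: actual + 1, // each event increments the version
      eventData,
    };
    this.events.push(event);
    return event;
  }
}

const store = new InMemoryEventStore();
const first = store.append('task', 'task-001', 0, 'task_created', {});
const second = store.append('task', 'task-001', 1, 'state_transitioned', { to_state: 'ELICITATION' });
```

A writer that holds a stale version (for example, passing `1` again after the second append) gets a version conflict instead of silently overwriting history.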

2. Specialized Event Tables

State Machine Events:

sql
CREATE TABLE state_machine_events (
  event_uuid TEXT NOT NULL UNIQUE,
  task_id TEXT NOT NULL,
  from_state TEXT NOT NULL,
  to_state TEXT NOT NULL,
  transition_type TEXT NOT NULL,
  triggered_by TEXT NOT NULL,
  trigger_type TEXT NOT NULL,
  reason TEXT,
  conditions_checked JSON,
  conditions_passed JSON,
  conditions_failed JSON,
  event_timestamp DATETIME NOT NULL,
  event_store_id INTEGER -- Links to event_store
);

Task Action Events:

sql
CREATE TABLE task_action_events (
  action_uuid TEXT NOT NULL UNIQUE,
  task_id TEXT NOT NULL,
  action_type TEXT NOT NULL,
  action_category TEXT NOT NULL,
  actor_type TEXT NOT NULL,
  actor_id TEXT,
  action_data JSON NOT NULL,
  changes JSON,
  action_timestamp DATETIME NOT NULL,
  event_store_id INTEGER
);

System Events:

sql
CREATE TABLE system_events (
  event_uuid TEXT NOT NULL UNIQUE,
  event_type TEXT NOT NULL,
  event_category TEXT NOT NULL,
  severity TEXT NOT NULL,
  event_data JSON NOT NULL,
  error_message TEXT,
  stack_trace TEXT,
  event_timestamp DATETIME NOT NULL,
  event_store_id INTEGER
);

3. Aggregate Snapshots

Snapshots optimize performance by storing aggregate state at specific versions.

sql
CREATE TABLE aggregate_snapshots (
  snapshot_id INTEGER PRIMARY KEY AUTOINCREMENT,
  snapshot_uuid TEXT NOT NULL UNIQUE,
  aggregate_type TEXT NOT NULL,
  aggregate_id TEXT NOT NULL,
  aggregate_version INTEGER NOT NULL,
  snapshot_data JSON NOT NULL,
  snapshot_timestamp DATETIME NOT NULL,
  first_event_id INTEGER NOT NULL,
  last_event_id INTEGER NOT NULL,
  event_count INTEGER NOT NULL,
  created_by TEXT NOT NULL DEFAULT 'system'
);

Snapshot Strategy:

  • Create a snapshot every 50 events (configurable via snapshotThreshold)
  • Store complete aggregate state at that point
  • Enable fast state reconstruction without replaying all events
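The snapshot strategy can be sketched as "load the latest snapshot, then replay only newer events". The code below is an illustration under assumptions, not the service's implementation: `rebuildState`, `applyEvent`, and the interfaces are hypothetical, and the initial `PENDING` state is assumed for tasks without a snapshot.

```typescript
// Hypothetical shapes; field names loosely follow aggregate_snapshots and event_store.
interface TaskState { currentState: string; version: number; }
interface TransitionEvent { aggregateVersion: number; toState: string; }
interface Snapshot { aggregateVersion: number; data: TaskState; }

// Apply one event to the state (a pure reducer).
function applyEvent(state: TaskState, event: TransitionEvent): TaskState {
  return { currentState: event.toState, version: event.aggregateVersion };
}

// Start from the snapshot (if any) and replay only the events newer than it.
function rebuildState(
  snapshot: Snapshot | null,
  events: TransitionEvent[]
): { state: TaskState; replayed: number } {
  const initial: TaskState = snapshot ? snapshot.data : { currentState: 'PENDING', version: 0 };
  const fromVersion = snapshot ? snapshot.aggregateVersion : 0;
  const newer = events
    .filter((e) => e.aggregateVersion > fromVersion)
    .sort((a, b) => a.aggregateVersion - b.aggregateVersion);
  return { state: newer.reduce(applyEvent, initial), replayed: newer.length };
}

const events: TransitionEvent[] = [
  { aggregateVersion: 1, toState: 'ELICITATION' },
  { aggregateVersion: 2, toState: 'WRITING_TESTS' },
  { aggregateVersion: 3, toState: 'TESTS_COMPLETE' },
];
const snapshot: Snapshot = {
  aggregateVersion: 2,
  data: { currentState: 'WRITING_TESTS', version: 2 },
};

const withSnapshot = rebuildState(snapshot, events);   // replays only version 3
const withoutSnapshot = rebuildState(null, events);    // replays all three events
```

Both calls arrive at the same state; the snapshot only changes how many events have to be replayed to get there.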

Usage Patterns

1. Recording State Transitions

Automatic (via EventSourcedStateMachine):

typescript
import { createEventSourcedStateMachine } from '@monotask/core/state-machine/event-sourcing-integration';
import { createSQLiteAdapter } from '@monotask/shared/database/sqlite-adapter';
import { Database } from 'bun:sqlite';

const db = new Database('.monotask/project.db');
const adapter = createSQLiteAdapter(db);
const stateMachine = createEventSourcedStateMachine(db, adapter);

// taskId and TaskState are assumed to be in scope.
// The state transition automatically records an event.
await stateMachine.transition(
  taskId,
  TaskState.IMPLEMENTING,
  {
    triggeredBy: 'agent-001',
    reason: 'All tests passing, starting implementation',
    metadata: {
      correlationId: 'workflow-123',
      testResults: { passed: 10, failed: 0 }
    }
  }
);

Manual (via EventSourcingService):

typescript
import { createEventSourcingService } from '@monotask/core/event-sourcing/event-sourcing-service';

const eventService = createEventSourcingService(adapter);

await eventService.recordStateTransition({
  taskId: 'task-001',
  fromState: 'WRITING_TESTS',
  toState: 'TESTS_COMPLETE',
  transitionType: 'automatic',
  triggerType: 'condition',
  actorType: 'system',
  reason: 'All test files created and passing',
  conditionsChecked: ['test_files_exist', 'tests_passing'],
  conditionsPassed: ['test_files_exist', 'tests_passing'],
  conditionsFailed: []
});
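The three condition lists passed to recordStateTransition are related: every checked condition ends up in exactly one of the passed or failed lists. A small helper can derive all three from one source so they cannot drift apart. This is a sketch only; `evaluateConditions` and the `Condition` type are hypothetical and not part of the MonoTask API.

```typescript
// A condition is modelled here as a named predicate (an assumption for this sketch).
type Condition = () => boolean;

function evaluateConditions(conditions: Record<string, Condition>) {
  const conditionsChecked = Object.keys(conditions);
  // Each predicate is evaluated exactly once.
  const conditionsPassed = conditionsChecked.filter((name) => conditions[name]());
  const conditionsFailed = conditionsChecked.filter((name) => !conditionsPassed.includes(name));
  return { conditionsChecked, conditionsPassed, conditionsFailed };
}

const result = evaluateConditions({
  test_files_exist: () => true,
  tests_passing: () => false,
});
```

The returned object can be spread into the recordStateTransition payload alongside the other fields.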

2. Recording Task Actions

typescript
await stateMachine.recordTaskAction(
  taskId,
  'test_added',   // action type
  'testing',      // action category
  {               // action data
    testFile: 'packages/core/src/validation.test.ts',
    testCount: 5,
    testSuite: 'ValidationEngine'
  },
  'agent',        // actor type
  'agent-001',    // actor ID
  {               // changes produced by the action
    filesAdded: ['packages/core/src/validation.test.ts'],
    linesAdded: 120
  }
);

3. Recording System Events

typescript
await eventService.recordSystemEvent(
  'worker_failed',  // event type
  'worker',         // event category
  'error',          // severity
  {                 // event data
    workerId: 'worker-validation-001',
    taskId: 'task-001',
    errorType: 'ValidationError',
    attemptCount: 3
  },
  {                 // error details
    message: 'Validation worker exceeded timeout',
    stack: error.stack  // `error` is the caught exception
  }
);

4. Retrieving Event History

Get complete audit trail:

typescript
const auditTrail = await stateMachine.getTaskAuditTrail(taskId);

// Returns:
// [
//   {
//     timestamp: Date,
//     eventType: 'state_transition',
//     actor: 'agent-001',
//     description: 'State changed from PENDING to ELICITATION',
//     details: { transitionType: 'automatic', reason: '...' }
//   },
//   {
//     timestamp: Date,
//     eventType: 'action',
//     actor: 'agent-001',
//     description: 'Action: test_added',
//     details: { category: 'testing', actionData: {...} }
//   }
// ]
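The shape above suggests that the audit trail is a merge of state transitions and task actions, ordered by time. The following sketch shows one way such a merge could be built from rows of the two specialized tables. `buildAuditTrail` and the row interfaces are hypothetical; the actual getTaskAuditTrail implementation may differ.

```typescript
// Hypothetical row shapes, modelled on state_machine_events and task_action_events.
interface TransitionRow {
  from_state: string;
  to_state: string;
  triggered_by: string;
  transition_type: string;
  reason?: string;
  event_timestamp: string;
}
interface ActionRow {
  action_type: string;
  action_category: string;
  actor_id: string | null;
  action_data: Record<string, unknown>;
  action_timestamp: string;
}
interface AuditEntry {
  timestamp: Date;
  eventType: 'state_transition' | 'action';
  actor: string;
  description: string;
  details: Record<string, unknown>;
}

function buildAuditTrail(transitions: TransitionRow[], actions: ActionRow[]): AuditEntry[] {
  const fromTransitions = transitions.map((t): AuditEntry => ({
    timestamp: new Date(t.event_timestamp),
    eventType: 'state_transition',
    actor: t.triggered_by,
    description: `State changed from ${t.from_state} to ${t.to_state}`,
    details: { transitionType: t.transition_type, reason: t.reason },
  }));
  const fromActions = actions.map((a): AuditEntry => ({
    timestamp: new Date(a.action_timestamp),
    eventType: 'action',
    actor: a.actor_id ?? 'unknown',
    description: `Action: ${a.action_type}`,
    details: { category: a.action_category, actionData: a.action_data },
  }));
  // Oldest entry first.
  return [...fromTransitions, ...fromActions].sort(
    (x, y) => x.timestamp.getTime() - y.timestamp.getTime()
  );
}

const trail = buildAuditTrail(
  [{
    from_state: 'PENDING',
    to_state: 'ELICITATION',
    triggered_by: 'agent-001',
    transition_type: 'automatic',
    event_timestamp: '2025-10-01T12:00:00Z',
  }],
  [{
    action_type: 'test_added',
    action_category: 'testing',
    actor_id: 'agent-001',
    action_data: { testCount: 5 },
    action_timestamp: '2025-10-01T12:05:00Z',
  }]
);
```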

Get specific event types:

typescript
const history = await stateMachine.getTaskEventHistory(taskId);

console.log('State Transitions:', history.stateTransitions);
console.log('Actions:', history.actions);
console.log('All Events:', history.allEvents);

5. Event Replay and State Reconstruction

Rebuild current state from events:

typescript
const rebuilt = await stateMachine.rebuildTaskFromEvents(taskId);

console.log('Current State:', rebuilt.currentState);
console.log('Version:', rebuilt.version);
console.log('Event Count:', rebuilt.eventCount);

Get state at a specific timestamp:

typescript
const pastTimestamp = new Date('2025-10-01T12:00:00Z');
const pastState = await stateMachine.getTaskStateAtTimestamp(taskId, pastTimestamp);

console.log('State at', pastTimestamp, ':', pastState);
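Conceptually, a point-in-time lookup replays only the events recorded up to the requested timestamp. The sketch below illustrates the idea with in-memory data. `stateAt` and `TimedTransition` are hypothetical names, and the real getTaskStateAtTimestamp may work differently (for example, by starting from a snapshot).

```typescript
interface TimedTransition { toState: string; eventTimestamp: Date; }

// Returns the state in effect at `at`, or null if the task had no events by then.
function stateAt(events: TimedTransition[], at: Date): string | null {
  const ordered = [...events].sort(
    (a, b) => a.eventTimestamp.getTime() - b.eventTimestamp.getTime()
  );
  let state: string | null = null;
  for (const e of ordered) {
    if (e.eventTimestamp.getTime() > at.getTime()) break; // later events are ignored
    state = e.toState;
  }
  return state;
}

const timeline: TimedTransition[] = [
  { toState: 'ELICITATION', eventTimestamp: new Date('2025-10-01T09:00:00Z') },
  { toState: 'WRITING_TESTS', eventTimestamp: new Date('2025-10-01T11:00:00Z') },
  { toState: 'TESTS_COMPLETE', eventTimestamp: new Date('2025-10-01T15:00:00Z') },
];

const atNoon = stateAt(timeline, new Date('2025-10-01T12:00:00Z'));
const beforeAnyEvent = stateAt(timeline, new Date('2025-09-30T00:00:00Z'));
```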

6. Manual Snapshot Creation

typescript
// Manually create a snapshot (useful before major operations)
await stateMachine.createTaskSnapshot(taskId);

Performance Optimizations

1. Indexes for D1

The schema includes optimized indexes for common query patterns:

sql
-- Aggregate queries (most common)
CREATE INDEX idx_event_store_aggregate
  ON event_store(aggregate_type, aggregate_id, aggregate_version);

-- Temporal queries
CREATE INDEX idx_event_store_timestamp
  ON event_store(event_timestamp DESC);

-- Event type queries
CREATE INDEX idx_event_store_type
  ON event_store(event_type);

-- Correlation tracking
CREATE INDEX idx_event_store_correlation
  ON event_store(correlation_id)
  WHERE correlation_id IS NOT NULL;

-- Actor activity queries
CREATE INDEX idx_event_store_actor
  ON event_store(actor_type, actor_id);

2. Snapshot Strategy

Configuration:

typescript
const eventService = createEventSourcingService(adapter, {
  snapshotThreshold: 50,          // Snapshot every 50 events
  enableAutomaticSnapshots: true,  // Auto-create snapshots
  enableProjectionUpdates: true,   // Update projections
  batchSize: 1000,                 // Batch size for replay
  retentionDays: 365              // Keep events for 1 year
});

Benefits:

  • Fast state reconstruction: Load snapshot + recent events (vs. all events)
  • Reduced query time: Avoid replaying 1000+ events
  • Configurable trade-offs: More snapshots = faster reads, more storage

Example Performance:

  • Without snapshots: Rebuild from 1000 events = ~500ms
  • With snapshots (every 50): Load snapshot + at most 50 events = ~50ms
  • Roughly 10x faster for long-running tasks
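The replay cost behind these figures is simple arithmetic. Assuming a snapshot is written exactly when the event count reaches a multiple of the threshold (an assumption of this sketch, not a documented guarantee), the number of events left to replay is the remainder. `eventsToReplay` is a hypothetical helper.

```typescript
// Events that must be replayed after loading the latest snapshot,
// assuming one snapshot is written every `threshold` events.
function eventsToReplay(totalEvents: number, threshold: number): number {
  if (threshold <= 0) return totalEvents; // snapshots disabled: replay everything
  return totalEvents % threshold;
}

const noSnapshots = eventsToReplay(1000, 0);   // all 1000 events
const worstCase = eventsToReplay(1049, 50);    // 49 events since the snapshot at 1000
const justSnapshotted = eventsToReplay(1000, 50); // snapshot is current, nothing to replay
```

Under this assumption the replay cost is bounded by the threshold rather than by the age of the task, which is where the speed-up for long-running tasks comes from.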

3. D1-Specific Optimizations

Batch Operations:

typescript
// Instead of N individual inserts
const events = [event1, event2, event3 /* , ... */];
await repository.appendEventsBatch(events); // Single atomic batch

Projection Materialization:

Use the event_projections table to cache expensive aggregations:

typescript
// Materialized view of task state summary
const projection = await repository.getProjection('task_state_summary');

// Projection data is pre-computed and cached
console.log(projection.projection_data);

Integration with State Machine

Automatic Event Recording

The EventSourcedStateMachine extends the base StateMachine to automatically record events:

typescript
class EventSourcedStateMachine extends StateMachine {
  async transition(taskId, toState, context) {
    const fromState = this.getCurrentState(taskId);

    // Execute transition
    const result = await super.transition(taskId, toState, context);

    // If successful, record event
    if (result.success) {
      await this.recordTransitionEvent(taskId, fromState, toState, context, result);
    }

    return result;
  }
}

Benefits:

  • No code changes required in existing transition logic
  • Events recorded automatically for all state changes
  • Complete audit trail without manual instrumentation

Triggers for Event Propagation

Database triggers automatically propagate events to event_store:

sql
CREATE TRIGGER tr_sm_event_to_event_store
AFTER INSERT ON state_machine_events
BEGIN
  INSERT INTO event_store (...) VALUES (...);
  UPDATE state_machine_events SET event_store_id = last_insert_rowid() ...;
END;

Flow:

  1. State transition creates state_machine_events row
  2. Trigger fires and creates event_store row
  3. The trigger writes the new row's ID into event_store_id, linking the two rows
  4. Projection rebuild flags are set

Best Practices

1. Event Naming Conventions

typescript
// Good: Descriptive, past tense
'task_created'
'state_transitioned'
'test_added'
'implementation_completed'

// Bad: Vague, or imperative instead of past tense
'task'
'transition'
'add_test'
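A naming convention like this can be checked mechanically, at least roughly. The helper below is a naive heuristic and an assumption of this guide's editor, not a MonoTask rule: it accepts snake_case names of at least two words whose last word ends in "ed", so irregular past participles (for example "begun") would be rejected.

```typescript
// Naive check: snake_case, two or more words, last word ending in "ed".
function looksLikePastTenseEvent(name: string): boolean {
  return /^[a-z]+(_[a-z]+)*_[a-z]+ed$/.test(name);
}

const good = ['task_created', 'state_transitioned', 'test_added', 'implementation_completed'];
const bad = ['task', 'transition', 'add_test'];

const allGoodPass = good.every(looksLikePastTenseEvent);
const allBadFail = bad.every((name) => !looksLikePastTenseEvent(name));
```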

2. Event Data Structure

Include enough context for replay:

typescript
const event = {
  event_data: {
    // What changed
    from_state: 'WRITING_TESTS',
    to_state: 'TESTS_COMPLETE',

    // Why it changed
    reason: 'All 10 tests passing',

    // Who triggered it
    triggered_by: 'agent-001',

    // Context for replay
    test_results: { passed: 10, failed: 0 },
    test_files: ['test1.ts', 'test2.ts']
  }
}

3. Correlation and Causation

Link related events:

typescript
// Workflow-level correlation
const correlationId = EventSourcingService.generateCorrelationId();

// Event 1: Test writing started
await recordTaskAction({ /* ... */ correlationId });

// Event 2: Test completed (same correlation)
await recordTaskAction({ /* ... */ correlationId });

// Event 3: State transition caused by Event 2
// (testCompletedEventId is the ID of Event 2)
await recordStateTransition({
  /* ... */
  correlationId,
  causationId: testCompletedEventId
});
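Once events carry correlation and causation IDs, they can be queried in two ways: correlation groups everything in a workflow, and causation gives a chain from an effect back to its root cause. The sketch below walks the causation links over in-memory events. `causationChain` and `LinkedEvent` are hypothetical names used only for illustration.

```typescript
interface LinkedEvent {
  eventUuid: string;
  eventType: string;
  correlationId?: string;
  causationId?: string;
}

// Walk causation links backwards from one event to its root cause.
function causationChain(events: LinkedEvent[], startUuid: string): LinkedEvent[] {
  const byUuid = new Map(events.map((e) => [e.eventUuid, e] as const));
  const chain: LinkedEvent[] = [];
  const seen = new Set<string>(); // guards against accidental cycles
  let current = byUuid.get(startUuid);
  while (current && !seen.has(current.eventUuid)) {
    seen.add(current.eventUuid);
    chain.push(current);
    current = current.causationId ? byUuid.get(current.causationId) : undefined;
  }
  return chain.reverse(); // root cause first
}

const workflowEvents: LinkedEvent[] = [
  { eventUuid: 'e1', eventType: 'test_writing_started', correlationId: 'workflow-123' },
  { eventUuid: 'e2', eventType: 'test_completed', correlationId: 'workflow-123' },
  { eventUuid: 'e3', eventType: 'state_transitioned', correlationId: 'workflow-123', causationId: 'e2' },
];

const sameWorkflow = workflowEvents.filter((e) => e.correlationId === 'workflow-123');
const chain = causationChain(workflowEvents, 'e3');
```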

4. Error Handling

typescript
try {
  await stateMachine.transition(taskId, newState, context);
} catch (error) {
  // `error` is typed `unknown` in strict TypeScript, so normalize it first
  const err = error instanceof Error ? error : new Error(String(error));

  // Record failure as system event
  await eventService.recordSystemEvent(
    'state_transition_failed',
    'worker',
    'error',
    {
      taskId,
      attemptedTransition: { from: oldState, to: newState },
      context
    },
    {
      message: err.message,
      stack: err.stack
    }
  );

  throw error;
}

Querying Events

Views for Common Queries

Latest state for each aggregate:

sql
SELECT * FROM v_aggregate_latest_state
WHERE aggregate_type = 'task';

Task state timeline:

sql
SELECT * FROM v_task_state_timeline
WHERE task_id = 'task-001'
ORDER BY event_timestamp;

Event statistics:

sql
SELECT * FROM v_event_statistics;

Migration Notes

Applying the Migration

bash
# Run the event sourcing migration
bun run db:migrate

# Verify tables created
sqlite3 .monotask/project.db "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE '%event%'"

Rollback Procedure

bash
# Rollback if needed (drops all event sourcing tables)
# WARNING: This will lose all event history
bun run db:migrate:rollback

D1 Deployment

Migration to D1

bash
# Create D1 database
wrangler d1 create monotask-production

# Apply migrations (recent Wrangler versions target the local database by default; add --remote for the deployed one)
wrangler d1 migrations apply monotask-production

# Import existing SQLite events (if migrating)
wrangler d1 execute monotask-production --file=events_export.sql

D1 Limitations and Workarounds

Limitation 1: Query Timeout (30s)

  • Workaround: Use snapshots to avoid long event replays
  • Workaround: Batch large queries into smaller chunks
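One way to batch a large read is keyset pagination on event_id: each query fetches a bounded page and the last ID becomes the cursor for the next one. The sketch below uses an in-memory stand-in for the database and is synchronous for brevity; a D1-backed fetcher would be asynchronous. `readInPages`, `FetchPage`, and `fakeFetch` are hypothetical names.

```typescript
interface EventRow { eventId: number; eventType: string; }

// Hypothetical page fetcher: returns up to `limit` rows with eventId > afterId,
// ordered by eventId. A real one would run something like
// "WHERE event_id > ? ORDER BY event_id LIMIT ?".
type FetchPage = (afterId: number, limit: number) => EventRow[];

// Reads all events page by page and returns the number of pages fetched.
function readInPages(
  fetchPage: FetchPage,
  batchSize: number,
  onPage: (rows: EventRow[]) => void
): number {
  let cursor = 0;
  let pages = 0;
  for (;;) {
    const page = fetchPage(cursor, batchSize);
    if (page.length === 0) break;
    onPage(page);
    pages += 1;
    cursor = page[page.length - 1].eventId; // continue after the last row seen
    if (page.length < batchSize) break;     // short page means no more rows
  }
  return pages;
}

// In-memory stand-in for the event_store table.
const allRows: EventRow[] = Array.from({ length: 2500 }, (_, i) => ({
  eventId: i + 1,
  eventType: 'state_transitioned',
}));
const fakeFetch: FetchPage = (afterId, limit) =>
  allRows.filter((r) => r.eventId > afterId).slice(0, limit);

let total = 0;
const pageCount = readInPages(fakeFetch, 1000, (rows) => { total += rows.length; });
```

Each individual query stays small and predictable, so no single statement needs to scan the whole event log.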

Limitation 2: 100 Bind Parameters

  • Workaround: Use JSON columns for complex data
  • Workaround: Batch large inserts into multiple statements
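Splitting a large insert comes down to keeping rows-per-statement times columns-per-row within the parameter limit. A minimal sketch, assuming the 100-parameter limit stated above; `chunkForBindLimit` is a hypothetical helper, not part of the repository API.

```typescript
const MAX_BIND_PARAMS = 100; // limit cited above

// Split rows so that (rows per statement) * (columns per row) stays within the limit.
function chunkForBindLimit<T>(
  rows: T[],
  columnsPerRow: number,
  maxParams: number = MAX_BIND_PARAMS
): T[][] {
  if (columnsPerRow < 1 || columnsPerRow > maxParams) {
    throw new Error(`columnsPerRow must be between 1 and ${maxParams}`);
  }
  const rowsPerChunk = Math.floor(maxParams / columnsPerRow);
  const chunks: T[][] = [];
  for (let i = 0; i < rows.length; i += rowsPerChunk) {
    chunks.push(rows.slice(i, i + rowsPerChunk));
  }
  return chunks;
}

// 25 rows with 12 bound columns each: floor(100 / 12) = 8 rows per statement.
const rows = Array.from({ length: 25 }, (_, i) => ({ id: i }));
const chunks = chunkForBindLimit(rows, 12);
```

Each chunk can then be sent as one multi-row INSERT, with the chunks grouped into a single batch if atomicity matters.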

Limitation 3: No Full-Text Search

  • Workaround: Use prefix LIKE queries ('term%'); patterns with a leading wildcard ('%term%') cannot use an index
  • Workaround: Consider external search service for event search

Monitoring and Observability

Key Metrics

sql
-- Event throughput
SELECT
  DATE(event_timestamp) as date,
  COUNT(*) as event_count,
  COUNT(DISTINCT aggregate_id) as aggregate_count
FROM event_store
GROUP BY DATE(event_timestamp)
ORDER BY date DESC;

-- Snapshot efficiency
-- (compression_ratio is not in the aggregate_snapshots schema shown earlier;
-- omit that line if your table lacks the column)
SELECT
  aggregate_type,
  COUNT(*) as snapshot_count,
  AVG(event_count) as avg_events_per_snapshot,
  AVG(compression_ratio) as avg_compression
FROM aggregate_snapshots
GROUP BY aggregate_type;

-- Actor activity
SELECT
  actor_type,
  actor_id,
  COUNT(*) as event_count,
  MAX(event_timestamp) as last_activity
FROM event_store
GROUP BY actor_type, actor_id
ORDER BY event_count DESC;

Testing

Unit Testing Event Sourcing

typescript
import { describe, it, expect, beforeEach } from 'bun:test';
import { createEventSourcingService } from '@monotask/core/event-sourcing/event-sourcing-service';

// testAdapter and recordMultipleTransitions are test helpers assumed to be defined elsewhere.
describe('Event Sourcing', () => {
  // Shared by all tests; recreated before each one
  let service: ReturnType<typeof createEventSourcingService>;

  beforeEach(() => {
    service = createEventSourcingService(testAdapter);
  });

  it('should record state transitions', async () => {
    await service.recordStateTransition({
      taskId: 'test-001',
      fromState: 'PENDING',
      toState: 'IN_PROGRESS',
      transitionType: 'manual',
      triggerType: 'user',
      actorType: 'user',
      actorId: 'user-001'
    });

    const history = await service.getTaskEventHistory('test-001');
    expect(history.stateTransitions).toHaveLength(1);
    expect(history.stateTransitions[0].to_state).toBe('IN_PROGRESS');
  });

  it('should rebuild state from events', async () => {
    // Record multiple transitions
    await recordMultipleTransitions();

    // Rebuild state
    const rebuilt = await service.rebuildTaskState('test-001');

    expect(rebuilt.currentState.current_state).toBe('COMPLETED');
    expect(rebuilt.version).toBeGreaterThan(0);
    expect(rebuilt.eventCount).toBeGreaterThan(0);
  });
});

Conclusion

MonoTask's event sourcing system provides:

  ✅ Complete audit trail of all state changes
  ✅ State reconstruction at any point in time
  ✅ Performance optimization via snapshots
  ✅ D1 compatibility with optimized indexes
  ✅ Integration with existing state machine
  ✅ Debugging capabilities via event replay

For advanced usage and customization, see the API documentation in:

  • /packages/core/src/event-sourcing/event-sourcing-service.ts
  • /packages/shared/src/repositories/event-store-repository.ts
