🐘

Absurd

PostgreSQL-native durable execution workflows

It's entirely based on Postgres and nothing else... because it's absurd how much you can over-design such a simple thing.

PostgreSQL · TypeScript · Python
💡

Why Absurd?

Push complexity to the database

One Dependency

Only PostgreSQL. No Redis, no Kafka, no external coordinator. Deploy with your existing database.

Crash Resilient

Every step checkpointed. Workers can die and restart without losing progress or duplicating work.

Language Agnostic

TypeScript and Python SDKs. Any language with a Postgres driver can implement a worker.

🧠

Core Concepts

Tasks & Steps

  • Task - Logical unit of work (survives retries)
  • Run - Single execution attempt of a task
  • Step - Checkpointed operation within a task
  • Checkpoint - Cached step result (never re-executed)
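
The relationship between steps and checkpoints can be illustrated with a small in-memory sketch. It is not the SDK's implementation: `CheckpointStore`, `runStep`, and `attempt` are hypothetical names, the store is a `Map` standing in for the checkpoint table, and steps are synchronous to keep the example short.

```typescript
// Hypothetical in-memory stand-in for a task's checkpoints.
type CheckpointStore = Map<string, unknown>;

// Run `fn` once per step name; later attempts return the cached value.
function runStep<T>(store: CheckpointStore, stepName: string, fn: () => T): T {
  if (store.has(stepName)) {
    return store.get(stepName) as T; // checkpoint hit: fn is not re-executed
  }
  const value = fn();
  store.set(stepName, value); // persist the result before the next step
  return value;
}

// Simulate two runs of the same task: the first one crashes after step one.
const store: CheckpointStore = new Map();
let chargeCalls = 0;

function attempt(crashAfterCharge: boolean): string {
  const charge = runStep(store, "process-payment", () => {
    chargeCalls += 1;
    return "charge-1";
  });
  if (crashAfterCharge) throw new Error("worker died");
  return runStep(store, "send-notification", () => `notified:${charge}`);
}

let firstRunFailed = false;
try {
  attempt(true);
} catch {
  firstRunFailed = true;
}
const finalResult = attempt(false);
console.log(finalResult, chargeCalls); // notified:charge-1 1
```

The second run replays the task function from the top, but the payment step returns its cached result instead of charging again.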

Events & Suspension

  • sleepFor/sleepUntil - Suspend until time passes
  • awaitEvent - Suspend until named event fires
  • emitEvent - Wake sleeping tasks atomically
  • Queue - Isolated namespace for tasks
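
A rough sketch of how waiting and emitting could interact, using in-memory maps in place of the event and wait tables. `EventBoard` and its fields are hypothetical, and the sketch assumes that an event emitted before anyone waits is retained for later waiters; the source does not spell that out.

```typescript
type Payload = Record<string, unknown>;

class EventBoard {
  private emitted = new Map<string, Payload>();     // stands in for stored events
  private waiting = new Map<string, Set<string>>(); // stands in for wait registrations
  readonly woken: string[] = [];

  // Returns the payload if the event already fired; otherwise records the wait.
  awaitEvent(taskId: string, eventName: string): Payload | undefined {
    const payload = this.emitted.get(eventName);
    if (payload !== undefined) return payload;
    const tasks = this.waiting.get(eventName) ?? new Set<string>();
    tasks.add(taskId);
    this.waiting.set(eventName, tasks);
    return undefined; // the task suspends here
  }

  // Stores the payload and wakes every registered waiter in one operation.
  emitEvent(eventName: string, payload: Payload): void {
    this.emitted.set(eventName, payload);
    this.waiting.get(eventName)?.forEach((taskId) => this.woken.push(taskId));
    this.waiting.delete(eventName);
  }
}

const board = new EventBoard();
const early = board.awaitEvent("task-a", "shipment.packed:42"); // suspended
board.emitEvent("shipment.packed:42", { carrier: "USPS" });
const late = board.awaitEvent("task-b", "shipment.packed:42"); // payload at once
console.log(early, board.woken, late);
```

In the database both halves of `emitEvent` would run in a single transaction, which is what lets a wake-up never be lost between the check and the registration.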

Execution Flow

Task Spawned → Worker Claims → Steps Execute → Complete/Retry
🗄️

Database Schema

Per-queue table structure

Tables per Queue

  • t_{queue} (Tasks) - id, state, params, result
  • r_{queue} (Runs) - task_id, worker_id, claim_expires_at
  • c_{queue} (Checkpoints) - task_id, step_name, value
  • e_{queue} (Events) - name, payload, emitted_at
  • w_{queue} (Wait Registrations) - task_id, event_name, timeout_at
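
The naming convention above can be captured in a small helper. This is only a sketch: `queueTables` and the queue-name validation rule are assumptions made for illustration, not part of the project.

```typescript
// Prefix per table kind, taken from the naming scheme listed above.
const TABLE_PREFIXES = {
  tasks: "t",
  runs: "r",
  checkpoints: "c",
  events: "e",
  waits: "w",
} as const;

type TableKind = keyof typeof TABLE_PREFIXES;

// Derive the five table names for one queue.
function queueTables(queue: string): Record<TableKind, string> {
  // Assumed rule: only plain identifiers, since the name becomes part of a table name.
  if (!/^[a-z_][a-z0-9_]*$/.test(queue)) {
    throw new Error(`invalid queue name: ${queue}`);
  }
  const tables = {} as Record<TableKind, string>;
  for (const kind of Object.keys(TABLE_PREFIXES) as TableKind[]) {
    tables[kind] = `${TABLE_PREFIXES[kind]}_${queue}`;
  }
  return tables;
}

const defaultTables = queueTables("default");
console.log(defaultTables.tasks, defaultTables.waits); // t_default w_default
```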

Key Stored Procedures

  • spawn_task - Create task + initial run
  • claim_task - Reserve tasks with lease
  • complete_run - Mark successful completion
  • fail_run - Handle failure + schedule retry
  • set_task_checkpoint_state - Persist step result
  • get_task_checkpoint_states - Retrieve cached results
  • await_event - Register event wait
  • emit_event - Wake sleeping tasks
  • extend_claim - Heartbeat to keep lease
  • cancel_task - Graceful cancellation
📦

TypeScript SDK

Define and run durable tasks

Basic Task Definition

import { Absurd } from 'absurd-sdk'

const app = new Absurd()

app.registerTask({ name: 'order-fulfillment' }, async (params, ctx) => {
  // Checkpointed step - won't re-execute on retry
  const payment = await ctx.step('process-payment', async () => {
    return await stripe.charges.create({ amount: params.amount })
  })

  // Task suspends until event fires
  const shipment = await ctx.awaitEvent(`shipment.packed:${params.orderId}`)

  // Another checkpointed step
  await ctx.step('send-notification', async () => {
    return await sendEmail(params.email, shipment)
  })

  return { payment, shipment }
})

Spawning & Running

// Spawn a new task
const taskId = await app.spawn('order-fulfillment', {
  orderId: '42',
  amount: 9999,
  email: 'customer@example.com'
})

// Start worker (claims and executes tasks)
await app.startWorker({
  concurrency: 4,      // Parallel task execution
  claimTimeout: 120,   // Lease duration in seconds
  pollInterval: 1000   // How often to poll for tasks
})

Event Emission

// In webhook handler after warehouse packs order
await app.emitEvent(`shipment.packed:${orderId}`, {
  trackingNumber: 'TRACK123',
  carrier: 'USPS'
})

// The task waiting on this event will resume automatically
// Event payload is delivered to ctx.awaitEvent() caller
🔧

TaskContext API

Available within task handlers

Methods

ctx.step(name, fn)

Execute checkpointed step. Cached on retry.

ctx.sleepFor(name, seconds)

Suspend task for duration.

ctx.sleepUntil(name, date)

Suspend until specific time.

ctx.awaitEvent(name, timeout?)

Suspend until event fires.

ctx.heartbeat()

Extend lease for long operations.

Properties

ctx.taskId

Current task's unique ID.

ctx.runId

Current run attempt ID.

ctx.idempotencyKey(suffix)

Derive key for external calls.
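
The source does not say how the key is derived. One plausible scheme, sketched here as an assumption, builds it from the task ID rather than the run ID so that every retry of the same task sends the same key to the external service. `TaskIdentity` and this standalone `idempotencyKey` function are hypothetical.

```typescript
interface TaskIdentity {
  taskId: string; // stable across retries
  runId: string;  // changes on every attempt
}

// Assumed derivation: task ID plus a caller-chosen suffix.
function idempotencyKey(identity: TaskIdentity, suffix: string): string {
  return `${identity.taskId}:${suffix}`;
}

const firstAttempt: TaskIdentity = { taskId: "task-1", runId: "run-1" };
const secondAttempt: TaskIdentity = { taskId: "task-1", runId: "run-2" };

const keyOnFirstAttempt = idempotencyKey(firstAttempt, "charge");
const keyOnSecondAttempt = idempotencyKey(secondAttempt, "charge");
console.log(keyOnFirstAttempt === keyOnSecondAttempt); // true
```

A payment provider that deduplicates on this key would then treat a retried charge as the same request.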

Step Name Handling

Duplicate step names auto-increment: step, step#2, step#3
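
A minimal sketch of that numbering rule, assuming a per-run counter keyed by step name. `StepNamer` is a hypothetical helper, not an SDK class.

```typescript
class StepNamer {
  private seen = new Map<string, number>();

  // First use keeps the plain name; later uses get "#2", "#3", and so on.
  next(stepName: string): string {
    const count = (this.seen.get(stepName) ?? 0) + 1;
    this.seen.set(stepName, count);
    return count === 1 ? stepName : `${stepName}#${count}`;
  }
}

const namer = new StepNamer();
const stepNames = ["fetch", "fetch", "parse", "fetch"].map((n) => namer.next(n));
console.log(stepNames.join(", ")); // fetch, fetch#2, parse, fetch#3
```

Because the counter restarts with each run, a loop that calls the same step name in the same order maps to the same checkpoint names on every retry.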

Sleep & Events

Task suspension patterns

Sleep Example

app.registerTask({ name: 'delayed-followup' }, async (params, ctx) => {
  // Do initial work
  const result = await ctx.step('send-welcome', async () => {
    return await sendWelcomeEmail(params.email)
  })

  // Task suspends for 24 hours (no worker resources used)
  await ctx.sleepFor('wait-24h', 60 * 60 * 24)

  // Automatically resumes after sleep
  await ctx.step('send-followup', async () => {
    return await sendFollowupEmail(params.email)
  })
})

Event with Timeout

app.registerTask({ name: 'payment-confirmation' }, async (params, ctx) => {
  // Wait up to 1 hour for payment webhook
  const payment = await ctx.awaitEvent(
    `payment.confirmed:${params.invoiceId}`,
    60 * 60 // timeout in seconds
  )

  if (payment === null) {
    // Timeout expired - no event received
    await ctx.step('cancel-order', async () => {
      return await cancelOrder(params.orderId)
    })
    return { status: 'cancelled' }
  }

  // Event received with payload
  await ctx.step('fulfill-order', async () => {
    return await fulfillOrder(params.orderId, payment)
  })
  return { status: 'fulfilled', payment }
})
🔄

Retry & Cancellation

Retry Strategies

app.registerTask({
  name: 'flaky-api-call',
  retry: {
    strategy: 'exponential', // or 'fixed', 'none'
    maxAttempts: 5,
    initialDelay: 1000,      // ms
    maxDelay: 60000          // ms
  }
}, async (params, ctx) => {
  // Task will retry on failure with backoff
  return await unreliableApiCall(params)
})
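
How such a policy might translate into concrete delays can be sketched as follows. The doubling factor and the `nextRetryDelay` helper are assumptions; the source only names the strategies and their bounds.

```typescript
type RetryPolicy =
  | { strategy: "none" }
  | { strategy: "fixed"; maxAttempts: number; initialDelay: number }
  | { strategy: "exponential"; maxAttempts: number; initialDelay: number; maxDelay: number };

// Delay in ms before the next attempt, or null when no retry should happen.
// `attempt` is the 1-based number of the attempt that just failed.
function nextRetryDelay(policy: RetryPolicy, attempt: number): number | null {
  if (policy.strategy === "none" || attempt >= policy.maxAttempts) return null;
  if (policy.strategy === "fixed") return policy.initialDelay;
  // Assumed growth: double the delay after each failure, capped at maxDelay.
  return Math.min(policy.initialDelay * 2 ** (attempt - 1), policy.maxDelay);
}

const flakyPolicy: RetryPolicy = {
  strategy: "exponential",
  maxAttempts: 5,
  initialDelay: 1000,
  maxDelay: 60000,
};
const delays = [1, 2, 3, 4, 5].map((attempt) => nextRetryDelay(flakyPolicy, attempt));
console.log(delays); // [ 1000, 2000, 4000, 8000, null ]
```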

Cancellation Policy

app.registerTask({
  name: 'time-sensitive',
  cancellation: {
    maxDelay: 300,      // Cancel if pending > 5 min
    maxDuration: 3600   // Cancel if running > 1 hour
  }
}, async (params, ctx) => {
  // Task auto-cancelled if exceeds limits
  // Checked at claim time
})
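
A claim-time check along these lines could look like the sketch below. The field names `enqueuedAt` and `firstStartedAt` and the helper `shouldCancelAtClaim` are hypothetical; all times are in seconds, matching the policy values above.

```typescript
interface CancellationPolicy {
  maxDelay?: number;    // max seconds a task may wait before its first run
  maxDuration?: number; // max seconds since the task first started running
}

interface TaskTimes {
  enqueuedAt: number;      // epoch seconds
  firstStartedAt?: number; // epoch seconds, unset until the first claim
}

// Evaluated when a worker tries to claim the task.
function shouldCancelAtClaim(task: TaskTimes, policy: CancellationPolicy, now: number): boolean {
  if (task.firstStartedAt === undefined) {
    return policy.maxDelay !== undefined && now - task.enqueuedAt > policy.maxDelay;
  }
  return policy.maxDuration !== undefined && now - task.firstStartedAt > policy.maxDuration;
}

const timeSensitive: CancellationPolicy = { maxDelay: 300, maxDuration: 3600 };
console.log(shouldCancelAtClaim({ enqueuedAt: 0 }, timeSensitive, 301)); // true
console.log(shouldCancelAtClaim({ enqueuedAt: 0, firstStartedAt: 100 }, timeSensitive, 200)); // false
```

Under this reading a run that is already executing is not interrupted; the limits take effect the next time the task is claimed.
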
🤖

AI Agent Example

Durable agent loops with tool calls

Perfect for AI Agents

Each reasoning step checkpointed. Tool call results cached. Long conversations survive crashes.

Agent Loop Pattern

import { generateText, tool } from 'ai'
import { anthropic } from '@ai-sdk/anthropic'
import { z } from 'zod'

app.registerTask({ name: 'ai-agent' }, async (params, ctx) => {
  const messages = [{ role: 'user', content: params.prompt }]
  let stepCount = 0

  while (true) {
    // Each LLM call is checkpointed
    const response = await ctx.step(`llm-call-${stepCount++}`, async () => {
      return await generateText({
        model: anthropic('claude-sonnet-4-20250514'),
        messages,
        tools: {
          search: tool({
            description: 'Search the web',
            parameters: z.object({ query: z.string() }),
            execute: async ({ query }) => searchWeb(query)
          }),
          calculate: tool({
            description: 'Do math',
            parameters: z.object({ expression: z.string() }),
            // evaluateMath is a hypothetical sandboxed evaluator; never eval() model output
            execute: async ({ expression }) => evaluateMath(expression)
          })
        }
      })
    })

    messages.push({ role: 'assistant', content: response.text })

    // If no tool calls, agent is done
    if (!response.toolCalls?.length) {
      return { result: response.text, steps: stepCount }
    }

    // Tool results also checkpointed
    for (const call of response.toolCalls) {
      messages.push({
        role: 'tool',
        content: JSON.stringify(call.result),
        toolCallId: call.id
      })
    }
  }
})
🐍

Python SDK

from absurd_sdk import Absurd

app = Absurd("postgresql://localhost/absurd")

@app.register_task(name="order-fulfillment")
def process_order(params, ctx):
    # Checkpointed step
    payment = ctx.step("process-payment", lambda: stripe_charge(params["amount"]))

    # Wait for external event
    shipment = ctx.await_event(f"shipment.packed:{params['order_id']}")

    # Another step
    ctx.step("send-notification", lambda: send_email(params["email"], shipment))

    return {"payment": payment, "shipment": shipment}

# Emit event from external system
app.emit_event(f"shipment.packed:{order_id}", {"tracking": "TRACK123"})

# Start worker
app.start_worker()
🖥️

CLI & Operations

absurdctl Commands

# Initialize schema in database
./absurdctl init -d mydb

# Create a queue
./absurdctl create-queue -d mydb default

# List queues
./absurdctl list-queues -d mydb

# Cleanup old runs (keep last 7 days)
./absurdctl cleanup default 7

# Drop a queue
./absurdctl drop-queue -d mydb default

# Generate agent-friendly help
./absurdctl agent-help >> AGENTS.md

Habitat UI

Web dashboard for monitoring tasks, built with Go + SolidJS.

# Start Habitat UI
./bin/habitat run -db-name mydb

# Opens http://localhost:7890
# View tasks, runs, checkpoints, events
🏗️

Architecture

Project Structure

Core

  • sql/absurd.sql - Schema & procedures (1,337 lines)
  • absurdctl - CLI tool (Python)
  • tests/ - Core system tests

SDKs & UI

  • sdks/typescript/ - Main SDK (954 lines)
  • sdks/python/ - Alternative SDK (1,272 lines)
  • habitat/ - Go + SolidJS dashboard

Pull-Based Worker Model

No Coordinator

Workers poll claim_task independently. No central process needed.

Lease-Based Claims

Tasks locked with expiry. Crashed workers auto-release.

Natural Load Control

Workers claim only what they can process. Self-regulating.
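
The lease idea can be sketched without a database. `Run` and `tryClaim` are hypothetical names, and the real `claim_task` procedure does this inside Postgres; the sketch only shows why an expired lease makes a crashed worker's task claimable again.

```typescript
interface Run {
  taskId: string;
  workerId: string | null;
  claimExpiresAt: number; // epoch seconds
}

// Claim the run if nobody holds it or the previous lease has expired.
function tryClaim(run: Run, workerId: string, now: number, claimTimeout: number): boolean {
  const held = run.workerId !== null && run.claimExpiresAt > now;
  if (held) return false;
  run.workerId = workerId;
  run.claimExpiresAt = now + claimTimeout;
  return true;
}

const run: Run = { taskId: "task-1", workerId: null, claimExpiresAt: 0 };
const claimedByA = tryClaim(run, "worker-a", 0, 120);      // true: unclaimed
const stolenTooEarly = tryClaim(run, "worker-b", 60, 120); // false: lease still valid
// worker-a crashes and never extends its lease...
const reclaimedByB = tryClaim(run, "worker-b", 121, 120);  // true: lease expired
console.log(claimedByA, stolenTooEarly, reclaimedByB, run.workerId);
```

No process has to detect the crash; the passage of time alone releases the task.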

🔬

Notable Implementation Details

UUIDv7 Generation

Tasks and runs use UUIDv7 for time-ordered IDs. Custom portable_uuidv7() for Postgres <18.
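
For readers unfamiliar with the format, here is a TypeScript sketch of the UUIDv7 layout (48-bit millisecond timestamp, version nibble, variant bits, random remainder). It is not a port of `portable_uuidv7()`, and it uses `Math.random` for brevity where production code would want a cryptographic random source.

```typescript
// Random lowercase hex string of the given length (not cryptographically secure).
function randomHex(digits: number): string {
  let out = "";
  for (let i = 0; i < digits; i++) {
    out += Math.floor(Math.random() * 16).toString(16);
  }
  return out;
}

function uuidv7(nowMs: number = Date.now()): string {
  const ts = nowMs.toString(16).padStart(12, "0");                  // 48-bit timestamp
  const versionGroup = "7" + randomHex(3);                          // version 7 + 12 random bits
  const variant = (8 + Math.floor(Math.random() * 4)).toString(16); // binary 10xx
  const variantGroup = variant + randomHex(3);
  return `${ts.slice(0, 8)}-${ts.slice(8)}-${versionGroup}-${variantGroup}-${randomHex(12)}`;
}

const earlier = uuidv7(1000);
const later = uuidv7(2000);
console.log(earlier < later); // true: IDs sort by creation time
```

Because the timestamp occupies the leading bits, plain string or byte ordering of the IDs follows creation order at millisecond granularity.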

Dynamic SQL

Queue tables created via format() + execute. No static schema per queue.

Fake Time for Testing

Session variable absurd.fake_now enables deterministic time control in tests.
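
The same idea expressed in TypeScript, as an analogy only: a clock with an optional override that every time-dependent decision consults. `Clock` and `isDue` are hypothetical; the project itself does this in SQL through the session variable.

```typescript
class Clock {
  private fakeNow: Date | null = null;

  // Pass a date to freeze time, or null to return to the real clock.
  setFakeNow(at: Date | null): void {
    this.fakeNow = at;
  }

  now(): Date {
    return this.fakeNow ?? new Date();
  }
}

// A sleeping task becomes runnable once the clock reaches its wake time.
function isDue(wakeAt: Date, clock: Clock): boolean {
  return clock.now().getTime() >= wakeAt.getTime();
}

const clock = new Clock();
const startTime = new Date("2025-01-01T00:00:00Z");
const wakeAt = new Date(startTime.getTime() + 24 * 60 * 60 * 1000);

clock.setFakeNow(startTime);
const dueAtStart = isDue(wakeAt, clock);  // false
clock.setFakeNow(wakeAt);
const dueAfterDay = isDue(wakeAt, clock); // true
console.log(dueAtStart, dueAfterDay);
```

A test can thus exercise a 24-hour sleep without waiting, simply by moving the clock forward.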

Implicit Heartbeats

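Every checkpoint write extends the claim lease, so tasks built from short steps need no separate heartbeat. ctx.heartbeat() remains available for a single step that may outlast the lease.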

Claim-Time Cancellation

Cancellation policy checked in claim_task. No separate cancellation loop.

Race-Free Events

Events emitted + tasks woken atomically. Payload stored in run record.

💎

Key Takeaways

Complexity in DB, simplicity in SDK

SQL handles all distributed coordination. SDKs are thin wrappers (about 950 lines for TypeScript, about 1,270 for Python). Language-agnostic by design.

Checkpoints as idempotency

Unlike retry-only systems, each step's result is cached independently, so a retry skips steps that already completed. Code outside steps runs again on each attempt and can be non-deterministic.

No external dependencies

Only Postgres + language runtime. Deploy with your app or existing database. Self-hostable, no SaaS.

AI-friendly design

Perfect for agent loops. Persists reasoning traces naturally. Tool call results cached automatically.