🏑/lib/

Effect

⚑

Effect

A powerful TypeScript framework for building robust, type-safe applications with comprehensive effect management, concurrency, and error handling

What is Effect?

Effect represents computations as values that describe what to do, not as imperative code that does it. This enables composition, testing, and control that's impossible with Promises or async/await.

🎯

Effect Management

Structured handling of side effects with complete type safety

⚑

Concurrency

Lightweight fiber-based concurrency (not OS threads)

πŸ›‘οΈ

Error Handling

Lossless, typed error capture via Causes

πŸ’‰

Dependency Injection

Type-safe service management through Contexts and Layers

The Effect Type

Effect<A, E = never, R = never>
A

Success Type

The value returned when the effect succeeds

E

Error Type

Typed, checked errors (not exceptions)

R

Requirements

Dependencies/environment needed to run

Effect vs Promise

AspectEffectPromise
ExecutionLazy (described, not executed)Eager (executes immediately)
Error TypeTyped (part of signature)any (untyped exceptions)
CancellationFirst-class, resource-safeNot native, requires AbortSignal
DependenciesImplicit via R typeMust pass manually
TestingIntrospectable as valuesMust execute
CompositionTrivial, typesafeComplex, runtime errors

Basic Example

Effect (lazy, composable, typed)
const effect: Effect<number, never, never> =
  Effect.succeed(1).pipe(
    Effect.flatMap(n => Effect.succeed(n + 1))
  )

// Nothing executed yet!
// Run explicitly:
Effect.runPromise(effect) // => 2
Promise (eager, less composable)
const promise: Promise<number> =
  Promise.resolve(1)
    .then(n => Promise.resolve(n + 1))

// Already executing!
// Can't inspect or transform
// without running

Error Handling: Causes & Typed Errors

Effect captures all failure information in a lossless Cause<E> data type:

Cause Types

  • Fail<E> β€” Typed application error
  • Die β€” Unexpected error/defect (exceptions)
  • Interrupt β€” Fiber interruption
  • Sequential β€” Sequential combination of causes
  • Parallel β€” Parallel combination of causes

Error Handling Operators

  • Effect.catch β€” Handle specific typed errors
  • Effect.catchAll β€” Handle all errors uniformly
  • Effect.catchTags β€” Handle discriminated union errors
  • Effect.mapError β€” Transform error type
  • Effect.either β€” Convert to Either<E, A>
Error Handling Pattern
Effect.gen(function*() {
  const user = yield* getUser(id)
  return user
}).pipe(
  Effect.catchTags({
    NotFoundError: (e) => Effect.succeed(null),
    ValidationError: (e) => Effect.fail(new AppError(e.message))
  })
)

Dependency Injection & Context

A Context is a lightweight, immutable map of service implementations keyed by Tags:

Type-Safe Service Management
// Define a service tag
const Logger = Context.Tag<Logger>()

// Service implementation
const loggerLive = Layer.succeed(Logger, {
  log: (msg: string) => Effect.sync(() => console.log(msg))
})

// Use in an effect
const program = Effect.gen(function*() {
  const logger = yield* Logger
  yield* logger.log("Hello")
})

// Provide the implementation
const runnable = program.pipe(
  Effect.provide(loggerLive)
)

Layers: Service Recipes

Layer<ROut, E, RIn>
ROut

Services this layer provides

E

Errors building services

RIn

Dependencies required

Layer Composition
// Layer defining a database service
const Database = Context.Tag<Database>()

const databaseLive = Layer.scoped(
  Database,
  Effect.gen(function*() {
    const conn = yield* Effect.sync(() => pool.getConnection())
    yield* Effect.addFinalizer(() =>
      Effect.sync(() => conn.close())
    )
    return conn
  })
)

// Layer defining repository that needs database
const UserRepository = Context.Tag<UserRepository>()

const userRepositoryLive = Layer.effect(
  UserRepository,
  Effect.gen(function*() {
    const db = yield* Database
    return new UserRepository(db)
  })
)

// Compose layers - dependencies auto-wired!
const AppLayers = Layer.merge(databaseLive, userRepositoryLive)

Fiber: Lightweight Concurrency

A Fiber<A, E> is a lightweight thread of execution that never consumes a full OS thread.

Fiber Properties

  • β€’ Suspendable and resumable
  • β€’ Can be interrupted safely with resource cleanup
  • β€’ Can be joined to wait for results
  • β€’ Each gets a unique FiberId for tracing

Fiber Operations

  • Effect.fork β€” Spawn background fiber
  • fiber.join β€” Wait for result
  • fiber.interrupt β€” Cancel fiber
  • Effect.race β€” First to complete wins
Concurrent Processing
// Fork: spawn as background fiber
const fiber = yield* Effect.fork(longRunningTask)

// Do other work...

// Join: wait for result
const result = yield* fiber.join

// Race: first to complete wins
const first = yield* Effect.race(effect1, effect2)

// Process with limited parallelism
const results = yield* Effect.all(items.map(processItem), {
  concurrency: 4
})

Concurrency Primitives

Queue<A>

Bounded/unbounded queue for producer-consumer patterns

const queue = yield* Queue.bounded<Msg>(100)
yield* queue.offer(message)
const msg = yield* queue.take

Ref<A>

Mutable reference for shared state between fibers

const counter = yield* Ref.make(0)
yield* counter.update(n => n + 1)
const value = yield* counter.get

Deferred<A, E>

Promise-like synchronization, set once

const deferred = yield* Deferred.make<number>()
// In fiber 1:
yield* deferred.await
// In fiber 2:
yield* deferred.succeed(42)

Scope: Resource Management

A Scope manages resource lifecycles with finalizers that run even on failure.

Acquire-Release Pattern
const withConnection = Effect.scoped(
  Effect.gen(function*() {
    // Acquire resource
    const conn = yield* acquireConnection()

    // Register cleanup (runs on success, failure, or interruption)
    yield* Effect.addFinalizer(() =>
      releaseConnection(conn)
    )

    // Use resource
    return yield* doWorkWith(conn)
  })
)

// Resources automatically cleaned up!
const result = yield* withConnection

Stream: Pull-Based Async Data

A Stream<A, E, R> is a description of an asynchronous program that emits zero or more values with backpressure support.

Stream Characteristics

  • β€’ Pull-Based: Consumer pulls data (backpressure)
  • β€’ Lazy: Streaming is lazy and composable
  • β€’ Chunked: Emits arrays for efficiency
  • β€’ Typed Errors: Full error handling like Effect
  • β€’ Resource-Safe: Automatic cleanup

Sink: Stream Consumer

  • Sink.fold β€” fold/reduce
  • Sink.collect β€” collect all
  • Sink.drain β€” discard all
  • Sink.head β€” first element
  • Sink.take(n) β€” first n elements
Stream Processing
Stream.range(1, 1000).pipe(
  Stream.map(x => x * 2),
  Stream.filter(x => x % 3 === 0),
  Stream.take(100),
  Stream.grouped(10),  // group into chunks of 10
  Stream.run(Sink.collect()),
  Effect.runPromise
)

Schema: Validation & Serialization

Schema<A, I, R> enables type-safe data validation and transformation.

Schema Definition & Usage
import { Schema } from "@effect/schema"

// Define schema
const User = Schema.struct({
  id: Schema.number,
  name: Schema.string,
  email: Schema.string.pipe(
    Schema.filter(email => email.includes('@'), {
      message: () => 'Invalid email'
    })
  ),
  age: Schema.optional(Schema.number)
})

// Derive type from schema
type User = Schema.Type<typeof User>

// Parse (decode/validate)
const decoded = Schema.decode(User)(jsonData)
// Type: Effect<User, ParseError, never>

// Encode (serialize)
const encoded = Schema.encode(User)(userData)
// Type: Effect<unknown, ParseError, never>

Schedule: Retry & Repeat Policies

Retry with Exponential Backoff
// Define a schedule
const exponentialBackoff = Schedule.exponential(
  Duration.millis(10),  // initial delay
  2.0                    // factor
).pipe(
  Schedule.either(Schedule.upTo(Duration.seconds(10)))  // max duration
)

// Retry on failure
const result = yield* Effect.retry(fetchData, exponentialBackoff)

// Common schedules
Schedule.recurs(5)              // retry 5 times
Schedule.linear(Duration.seconds(1))   // fixed delay
Schedule.fibonacci(Duration.millis(100)) // fibonacci delays
Schedule.forever                 // infinite retries

STM: Software Transactional Memory

STM<A, E, R> enables atomic, composable transactions without locks.

STM Components

  • TRef<A> β€” Transactional variable
  • TMap<K, V> β€” Transactional map
  • TQueue<A> β€” Transactional queue
  • TSemaphore β€” Transactional semaphore
Atomic Transaction
const transfer = (from: TRef<number>,
                 to: TRef<number>,
                 amount: number) =>
  STM.gen(function*() {
    const balance = yield* from.get
    if (balance < amount) {
      yield* STM.fail("Insufficient funds")
    }
    yield* from.set(balance - amount)
    yield* to.update(n => n + amount)
  }).pipe(STM.commit)

Key Modules

Effect
Core computation and composition
Context
Type-safe dependency tagging
Layer
Service composition and lifecycle
Fiber
Lightweight concurrency primitives
Stream
Pull-based data streaming
Sink
Stream consumption and reduction
Queue
Message passing and buffering
Ref
Mutable shared state
Deferred
One-time async completion
Scope
Resource lifecycle management
Cause
Comprehensive error information
Schema
Data validation and serialization
Schedule
Retry and repeat policies
STM
Transactional memory (lock-free)
Pool
Resource pooling and reuse
Request
Batching and deduplication

Complete Example: Real-World Pattern

Production-Ready Service
// Define services
const Database = Context.Tag<Database>()
const Logger = Context.Tag<Logger>()
const UserRepo = Context.Tag<UserRepo>()

// Define errors
class UserNotFound extends Data.TaggedError("UserNotFound")<{
  id: string
}> {}

class ValidationError extends Data.TaggedError("ValidationError")<{
  message: string
}> {}

// Business logic with typed errors and dependencies
const getUser = (id: string) =>
  Effect.gen(function*() {
    const repo = yield* UserRepo
    const logger = yield* Logger

    yield* logger.info(`Fetching user ${id}`)

    const user = yield* repo.findById(id).pipe(
      Effect.flatMap(Option.match({
        onNone: () => Effect.fail(new UserNotFound({ id })),
        onSome: Effect.succeed
      }))
    )

    return user
  })

// Compose layers
const AppLayer = Layer.merge(
  Layer.succeed(Logger, ConsoleLogger),
  Layer.effect(UserRepo, Effect.gen(function*() {
    const db = yield* Database
    return new PgUserRepo(db)
  }))
).pipe(
  Layer.provide(Layer.scoped(Database, acquireDbPool))
)

// Run with all dependencies
const main = getUser("123").pipe(
  Effect.provide(AppLayer),
  Effect.catchTags({
    UserNotFound: (e) => Effect.succeed(null),
    ValidationError: (e) => Effect.die(e)
  }),
  Effect.runPromise
)

Core Strengths

βœ“Type Safety: Full compilation-time error checking
βœ“Composability: Effects naturally compose through monadic operations
βœ“Resource Safety: Automatic, exception-safe cleanup
βœ“Observability: Effects are valuesβ€”inspect before running
βœ“Concurrency: Fibers without OS thread overhead
βœ“Error Information: No lost context; complete cause chains
βœ“Testing: Effects can be inspected without execution
βœ“Performance: Chunked streaming, efficient batching