Skip to content

Latest commit

 

History

History
282 lines (227 loc) · 7.26 KB

File metadata and controls

282 lines (227 loc) · 7.26 KB
---
title: Test Streaming Effects
id: testing-streams
skillLevel: advanced
applicationPatternId: testing
summary: Write tests for Stream operations, transformations, and error handling.
tags:
  - testing
  - streams
  - data-pipelines
rule:
  description: Use Stream.runCollect and assertions to verify stream behavior.
author: PaulJPhilp
related:
  - testing-hello-world
  - testing-concurrent-code
lessonOrder: 4
---

Guideline

Test streams by collecting results and verifying transformations, error handling, and resource management.


Rationale

Stream tests verify:

  1. Transformations - map, filter, flatMap work correctly
  2. Error handling - Failures are caught and handled
  3. Resource safety - Resources are released
  4. Backpressure - Data flow is controlled

Good Example

import { describe, it, expect } from "vitest"
import { Effect, Stream, Chunk, Ref } from "effect"

describe("Stream Testing", () => {
  // ============================================
  // 1. Test basic stream operations
  // ============================================

  // Doubles every element with Stream.map and checks the collected output,
  // order included.
  it("should transform stream elements", async () => {
    const doubled = Stream.fromIterable([1, 2, 3, 4, 5]).pipe(
      Stream.map((n) => n * 2)
    )

    const collected = await Effect.runPromise(Stream.runCollect(doubled))

    expect(Chunk.toReadonlyArray(collected)).toEqual([2, 4, 6, 8, 10])
  })

  // Keeps only the even numbers and checks that the odd ones are gone.
  it("should filter stream elements", async () => {
    const evens = Stream.fromIterable([1, 2, 3, 4, 5, 6]).pipe(
      Stream.filter((n) => n % 2 === 0)
    )

    const collected = await Effect.runPromise(Stream.runCollect(evens))

    expect(Chunk.toReadonlyArray(collected)).toEqual([2, 4, 6])
  })

  // ============================================
  // 2. Test stream aggregation
  // ============================================

  // Sums the elements with Stream.runFold, starting from an accumulator of 0.
  it("should fold stream to single value", async () => {
    const numbers = Stream.fromIterable([1, 2, 3, 4, 5])
    const sum = numbers.pipe(
      Stream.runFold(0, (total, value) => total + value)
    )

    const result = await Effect.runPromise(sum)

    expect(result).toBe(15)
  })

  // Stream.runCount yields the number of elements instead of the elements.
  it("should count stream elements", async () => {
    const letters = Stream.fromIterable(["a", "b", "c", "d"])

    const total = await Effect.runPromise(Stream.runCount(letters))

    expect(total).toBe(4)
  })

  // ============================================
  // 3. Test error handling in streams
  // ============================================

  // The failure raised for element 2 ends the mapped stream, so element 3 is
  // never mapped; Stream.catchAll then continues with a one-element fallback.
  it("should catch errors in stream", async () => {
    const result = await Effect.runPromise(
      Stream.fromIterable([1, 2, 3]).pipe(
        Stream.mapEffect((n) =>
          n === 2
            ? Effect.fail(new Error("Failed on 2"))
            : Effect.succeed(n * 10)
        ),
        // The error value is not needed to build the fallback, so the
        // callback takes no parameter (avoids an unused binding).
        Stream.catchAll(() => Stream.succeed(-1)),
        Stream.runCollect
      )
    )

    // 10 was emitted before the failure; -1 is the sentinel from the fallback.
    expect(Chunk.toReadonlyArray(result)).toEqual([10, -1])
  })

  // A stream that fails immediately is swapped for a fallback stream, and
  // only the fallback's elements are collected.
  it("should handle errors and continue with orElse", async () => {
    const recovered = Stream.fail(new Error("Primary failed")).pipe(
      Stream.orElse(() => Stream.fromIterable([1, 2, 3]))
    )

    const collected = await Effect.runPromise(Stream.runCollect(recovered))

    expect(Chunk.toReadonlyArray(collected)).toEqual([1, 2, 3])
  })

  // ============================================
  // 4. Test stream chunking
  // ============================================

  // Groups five elements in twos; the last group holds the single leftover.
  it("should chunk stream elements", async () => {
    const pairs = Stream.fromIterable([1, 2, 3, 4, 5]).pipe(Stream.grouped(2))

    const groups = await Effect.runPromise(Stream.runCollect(pairs))

    // Both the outer result and each group are Chunks, so convert both levels
    // to plain arrays before comparing.
    const asArrays = Chunk.toReadonlyArray(groups).map((group) =>
      Chunk.toReadonlyArray(group)
    )
    expect(asArrays).toEqual([[1, 2], [3, 4], [5]])
  })

  // ============================================
  // 5. Test stream with effects
  // ============================================

  // Records every element seen by Stream.tap in an external array, then
  // drains the stream purely for those side effects.
  it("should run effects for each element", async () => {
    const seen: number[] = []

    const tapped = Stream.fromIterable([1, 2, 3]).pipe(
      Stream.tap((n) =>
        Effect.sync(() => {
          seen.push(n)
        })
      )
    )
    await Effect.runPromise(Stream.runDrain(tapped))

    expect(seen).toEqual([1, 2, 3])
  })

  // ============================================
  // 6. Test stream resource management
  // ============================================

  // Tracks acquire and release calls in external arrays and checks that each
  // ran exactly once after the stream has been drained to completion.
  it("should release resources on completion", async () => {
    const acquired: string[] = []
    const released: string[] = []

    const managedStream = Stream.acquireRelease(
      // Effect.sync wraps the synchronous side effect directly; a generator
      // passed to Effect.gen that never yields adds nothing here.
      Effect.sync(() => {
        acquired.push("resource")
        return "resource"
      }),
      () =>
        Effect.sync(() => {
          released.push("resource")
        })
    ).pipe(
      // The acquired value itself is not used; it only scopes the inner stream.
      Stream.flatMap(() => Stream.fromIterable([1, 2, 3]))
    )

    await Effect.runPromise(Stream.runDrain(managedStream))

    expect(acquired).toEqual(["resource"])
    expect(released).toEqual(["resource"])
  })

  // Checks that the release action runs when the stream fails, and that the
  // stream really did fail with the expected error.
  it("should release resources on error", async () => {
    const released: string[] = []

    const managedStream = Stream.acquireRelease(
      Effect.succeed("resource"),
      () => Effect.sync(() => { released.push("released") })
    ).pipe(
      Stream.flatMap(() =>
        Stream.fromEffect(Effect.fail(new Error("Oops")))
      )
    )

    // Effect.flip swaps the error and success channels, so the expected
    // failure becomes the resolved value. If the stream unexpectedly
    // succeeded, the flipped effect fails and runPromise rejects, failing the
    // test instead of silently passing.
    const failure = await Effect.runPromise(
      Stream.runDrain(managedStream).pipe(Effect.flip)
    )

    expect(failure).toBeInstanceOf(Error)
    expect(failure.message).toBe("Oops")
    expect(released).toEqual(["released"])
  })

  // ============================================
  // 7. Test stream slicing with take/drop
  // ============================================

  // Stream.take(3) keeps the first three elements and discards the rest.
  it("should take first N elements", async () => {
    const firstThree = Stream.fromIterable([1, 2, 3, 4, 5]).pipe(
      Stream.take(3)
    )

    const collected = await Effect.runPromise(Stream.runCollect(firstThree))

    expect(Chunk.toReadonlyArray(collected)).toEqual([1, 2, 3])
  })

  // Stream.drop(2) skips the first two elements and keeps the remainder.
  it("should drop first N elements", async () => {
    const afterTwo = Stream.fromIterable([1, 2, 3, 4, 5]).pipe(
      Stream.drop(2)
    )

    const collected = await Effect.runPromise(Stream.runCollect(afterTwo))

    expect(Chunk.toReadonlyArray(collected)).toEqual([3, 4, 5])
  })

  // ============================================
  // 8. Test stream merging
  // ============================================

  // Merges two streams and checks that every element of both inputs appears
  // exactly once in the output.
  it("should merge streams", async () => {
    const stream1 = Stream.fromIterable([1, 3, 5])
    const stream2 = Stream.fromIterable([2, 4, 6])

    const result = await Effect.runPromise(
      Stream.merge(stream1, stream2).pipe(
        Stream.runCollect
      )
    )

    // The test deliberately does not depend on how the two inputs are
    // interleaved, so a sorted copy is compared instead. Unlike a length
    // check plus spot checks, this also rejects missing or duplicated elements.
    const sorted = [...Chunk.toReadonlyArray(result)].sort((a, b) => a - b)
    expect(sorted).toEqual([1, 2, 3, 4, 5, 6])
  })
})

Test Patterns

| Pattern | How to Test |
|---------|-------------|
| Transformations | `runCollect` + compare array |
| Aggregation | `runFold`, `runCount` |
| Errors | `catchAll` + verify recovery |
| Resources | Track acquire/release calls |
| Side effects | Use `tap` + external tracking |

Key Assertions

| Function | Returns |
|----------|---------|
| `Stream.runCollect` | Chunk of all elements |
| `Stream.runFold` | Single aggregated value |
| `Stream.runCount` | Number of elements |
| `Stream.runDrain` | void (side effects) |
| `Stream.runHead` | First element (Option) |