Test streams by collecting results and verifying transformations, error handling, and resource management.
import { describe, it, expect } from "vitest"
import { Effect, Stream, Chunk, Ref } from "effect"
describe("Stream Testing", () => {
// ============================================
// 1. Test basic stream operations
// ============================================
it("should transform stream elements", async () => {
const result = await Effect.runPromise(
Stream.fromIterable([1, 2, 3, 4, 5]).pipe(
Stream.map((n) => n * 2),
Stream.runCollect
)
)
expect(Chunk.toReadonlyArray(result)).toEqual([2, 4, 6, 8, 10])
})
it("should filter stream elements", async () => {
const result = await Effect.runPromise(
Stream.fromIterable([1, 2, 3, 4, 5, 6]).pipe(
Stream.filter((n) => n % 2 === 0),
Stream.runCollect
)
)
expect(Chunk.toReadonlyArray(result)).toEqual([2, 4, 6])
})
// ============================================
// 2. Test stream aggregation
// ============================================
it("should fold stream to single value", async () => {
const result = await Effect.runPromise(
Stream.fromIterable([1, 2, 3, 4, 5]).pipe(
Stream.runFold(0, (acc, n) => acc + n)
)
)
expect(result).toBe(15)
})
it("should count stream elements", async () => {
const count = await Effect.runPromise(
Stream.fromIterable(["a", "b", "c", "d"]).pipe(
Stream.runCount
)
)
expect(count).toBe(4)
})
// ============================================
// 3. Test error handling in streams
// ============================================
it("should catch errors in stream", async () => {
const result = await Effect.runPromise(
Stream.fromIterable([1, 2, 3]).pipe(
Stream.mapEffect((n) =>
n === 2
? Effect.fail(new Error("Failed on 2"))
: Effect.succeed(n * 10)
),
Stream.catchAll((error) =>
Stream.succeed(-1) // Replace error with sentinel
),
Stream.runCollect
)
)
expect(Chunk.toReadonlyArray(result)).toEqual([10, -1])
})
it("should handle errors and continue with orElse", async () => {
const failingStream = Stream.fail(new Error("Primary failed"))
const fallbackStream = Stream.fromIterable([1, 2, 3])
const result = await Effect.runPromise(
failingStream.pipe(
Stream.orElse(() => fallbackStream),
Stream.runCollect
)
)
expect(Chunk.toReadonlyArray(result)).toEqual([1, 2, 3])
})
// ============================================
// 4. Test stream chunking
// ============================================
it("should chunk stream elements", async () => {
const result = await Effect.runPromise(
Stream.fromIterable([1, 2, 3, 4, 5]).pipe(
Stream.grouped(2),
Stream.runCollect
)
)
const chunks = Chunk.toReadonlyArray(result).map(Chunk.toReadonlyArray)
expect(chunks).toEqual([[1, 2], [3, 4], [5]])
})
// ============================================
// 5. Test stream with effects
// ============================================
it("should run effects for each element", async () => {
const processed: number[] = []
await Effect.runPromise(
Stream.fromIterable([1, 2, 3]).pipe(
Stream.tap((n) =>
Effect.sync(() => {
processed.push(n)
})
),
Stream.runDrain
)
)
expect(processed).toEqual([1, 2, 3])
})
// ============================================
// 6. Test stream resource management
// ============================================
it("should release resources on completion", async () => {
const acquired: string[] = []
const released: string[] = []
const managedStream = Stream.acquireRelease(
Effect.gen(function* () {
acquired.push("resource")
return "resource"
}),
() =>
Effect.sync(() => {
released.push("resource")
})
).pipe(
Stream.flatMap(() => Stream.fromIterable([1, 2, 3]))
)
await Effect.runPromise(Stream.runDrain(managedStream))
expect(acquired).toEqual(["resource"])
expect(released).toEqual(["resource"])
})
it("should release resources on error", async () => {
const released: string[] = []
const managedStream = Stream.acquireRelease(
Effect.succeed("resource"),
() => Effect.sync(() => { released.push("released") })
).pipe(
Stream.flatMap(() =>
Stream.fromEffect(Effect.fail(new Error("Oops")))
)
)
await Effect.runPromise(
Stream.runDrain(managedStream).pipe(
Effect.catchAll(() => Effect.void)
)
)
expect(released).toEqual(["released"])
})
// ============================================
// 7. Test stream timing with take/drop
// ============================================
it("should take first N elements", async () => {
const result = await Effect.runPromise(
Stream.fromIterable([1, 2, 3, 4, 5]).pipe(
Stream.take(3),
Stream.runCollect
)
)
expect(Chunk.toReadonlyArray(result)).toEqual([1, 2, 3])
})
it("should drop first N elements", async () => {
const result = await Effect.runPromise(
Stream.fromIterable([1, 2, 3, 4, 5]).pipe(
Stream.drop(2),
Stream.runCollect
)
)
expect(Chunk.toReadonlyArray(result)).toEqual([3, 4, 5])
})
// ============================================
// 8. Test stream merging
// ============================================
it("should merge streams", async () => {
const stream1 = Stream.fromIterable([1, 3, 5])
const stream2 = Stream.fromIterable([2, 4, 6])
const result = await Effect.runPromise(
Stream.merge(stream1, stream2).pipe(
Stream.runCollect
)
)
const array = Chunk.toReadonlyArray(result)
expect(array).toHaveLength(6)
expect(array).toContain(1)
expect(array).toContain(6)
})
})