
Last Updated: 3/20/2026


Streaming Results

When a query returns a large result set, loading all rows into memory at once can cause performance problems or even crash your application. Kysely provides the .stream() method to process rows incrementally as they arrive from the database.

Basic Streaming

Call .stream() instead of .execute() on any select query. The method returns an AsyncIterableIterator that yields rows one at a time:

```ts
const stream = db
  .selectFrom('person')
  .selectAll()
  .where('age', '>', 18)
  .stream()

for await (const person of stream) {
  console.log(person.first_name)

  // Process each row as it arrives
  await processRow(person)
}
```

The stream() method is defined in the Streamable interface (see src/util/streamable.ts). It’s implemented by SelectQueryBuilder, UpdateQueryBuilder, DeleteQueryBuilder, and MergeQueryBuilder.

Unlike .execute(), which waits for all rows and returns an array, .stream() begins yielding rows as soon as the database sends them. This keeps memory usage constant regardless of result set size.

Chunk Size Parameter

The .stream() method accepts an optional chunkSize parameter that controls how many rows the database fetches at a time. This parameter is only effective on dialects that support cursor-based streaming:

```ts
const stream = db
  .selectFrom('person')
  .selectAll()
  .stream(100) // Fetch 100 rows at a time

for await (const person of stream) {
  console.log(person.first_name)
}
```

PostgreSQL supports chunk size through its cursor mechanism. The pg driver fetches rows in batches of the specified size, reducing the number of round trips to the database while still streaming results.

Other dialects may ignore the chunk size parameter if they don’t support cursor-based streaming. Check your dialect’s documentation for specific behavior.

Early Termination

Breaking out of the for await loop or returning early releases the database connection and invalidates the stream:

```ts
const stream = db
  .selectFrom('person')
  .selectAll()
  .stream()

for await (const person of stream) {
  if (person.last_name === 'Smith') {
    // Breaking releases the connection
    break
  }
  console.log(person.first_name)
}

// Connection is released, stream is no longer usable
```

This is important for connection pool management. If you don’t consume the entire stream, make sure to break or return so the connection is released back to the pool.
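The automatic cleanup on `break` works because `for await...of` calls the iterator's `return()` method when the loop exits early. If you consume the stream manually with `next()` instead, you must close it yourself. Below is a minimal sketch of that pattern; the `mockStream` generator and `released` flag are illustrative stand-ins for a real `.stream()` and its connection release, not part of Kysely's API:

```ts
// Sketch: consuming a stream manually instead of with for-await.
// When you stop early you must close the iterator yourself so the
// connection goes back to the pool; for await...of does this for you
// when you break or return.
let released = false

async function* mockStream(): AsyncIterableIterator<string> {
  try {
    yield 'Jennifer'
    yield 'Arnold'
    yield 'Sylvester'
  } finally {
    // In a real dialect, this is where the connection is released.
    released = true
  }
}

const stream = mockStream()
const first = await stream.next()
console.log(first.value) // Jennifer

// Done early: explicitly close the iterator so cleanup runs.
await stream.return?.(undefined)
console.log(released) // true
```

Prefer `for await...of` where possible; manual iteration is only needed when you interleave the stream with other control flow.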

Error Handling

Wrap streaming operations in try-catch blocks to handle database errors:

```ts
const stream = db
  .selectFrom('person')
  .selectAll()
  .stream()

try {
  for await (const person of stream) {
    await processRow(person)
  }
} catch (error) {
  console.error('Streaming error:', error)
  // Connection is automatically released on error
}
```

If an error occurs during streaming, the connection is released and the stream is terminated. You cannot resume streaming after an error.

Streaming vs Pagination

Streaming is ideal for processing large result sets sequentially, but it’s not always the best choice. Consider pagination when:

  • Users need random access: Pagination lets users jump to any page. Streaming processes rows in order.
  • Results need to be displayed in a UI: Pagination provides a known total count and page boundaries.
  • Processing can be parallelized: Pagination lets you process multiple pages concurrently.

Use streaming when:

  • Memory is constrained: Streaming keeps memory usage constant.
  • Processing is sequential: You need to process every row in order.
  • Result set is very large: Millions of rows are impractical to paginate.
  • You’re exporting data: Streaming is perfect for generating CSV files or data dumps.
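The export use case can be sketched with a small, self-contained example. The row shape and the `people()` generator below are assumptions for illustration; in a real application the stream would come from `db.selectFrom('person').stream()` and each line would be written to a file or HTTP response rather than accumulated:

```ts
// Sketch: streaming rows into CSV output, one line per row.
interface PersonRow {
  first_name: string
  last_name: string
}

// Mock stream standing in for db.selectFrom('person').stream()
async function* people(): AsyncIterableIterator<PersonRow> {
  yield { first_name: 'Jennifer', last_name: 'Aniston' }
  yield { first_name: 'Arnold', last_name: 'Schwarzenegger' }
}

async function toCsv(stream: AsyncIterable<PersonRow>): Promise<string> {
  const lines = ['first_name,last_name']
  for await (const row of stream) {
    // Each row is formatted as it arrives; a real export would write
    // each line out immediately instead of collecting them in memory.
    lines.push(`${row.first_name},${row.last_name}`)
  }
  return lines.join('\n')
}

const csv = await toCsv(people())
console.log(csv)
```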

Implementation Details

The stream() method delegates to the QueryExecutor interface (defined in src/query-executor/query-executor.ts), which calls the driver’s streamQuery() method. The driver’s DatabaseConnection interface (see src/driver/database-connection.ts) defines:

```ts
interface DatabaseConnection {
  streamQuery<R>(
    compiledQuery: CompiledQuery,
    chunkSize?: number,
  ): AsyncIterableIterator<QueryResult<R>>
}
```

The driver yields QueryResult objects, each containing a batch of rows. Kysely unwraps these results and yields individual rows to your code.
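That unwrapping step can be sketched as an async generator that flattens batches into individual rows. The types here are simplified stand-ins for Kysely's internals, and `driverBatches` is a mock, not real driver output:

```ts
// Sketch: re-yielding individual rows from batched QueryResult objects.
interface QueryResult<R> {
  rows: R[]
}

// Mock driver output: two batches of rows.
async function* driverBatches(): AsyncIterableIterator<QueryResult<number>> {
  yield { rows: [1, 2, 3] }
  yield { rows: [4, 5] }
}

async function* unwrap<R>(
  results: AsyncIterable<QueryResult<R>>,
): AsyncIterableIterator<R> {
  for await (const result of results) {
    // One row at a time, regardless of how the driver batched them
    yield* result.rows
  }
}

const seen: number[] = []
for await (const row of unwrap(driverBatches())) {
  seen.push(row)
}
console.log(seen) // [1, 2, 3, 4, 5]
```

This is why your loop body never sees batch boundaries: the chunk size affects round trips to the database, not the shape of your iteration.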

Dialect Support

Streaming support varies by dialect:

PostgreSQL: Full support with configurable chunk size via cursors. The pg driver uses pg-cursor internally to fetch rows in batches.

MySQL: Full support. The mysql2 driver streams results natively. Chunk size is not configurable.

SQLite: Limited support. SQLite loads the entire result set into memory before returning it, so streaming provides no memory benefit. The stream interface still works, but it’s equivalent to calling .execute() and iterating over the array.

MS SQL Server: Full support via the tedious driver. Chunk size is not configurable.

Streaming with Joins

Streaming works with complex queries including joins, subqueries, and CTEs:

const stream = db .selectFrom('person') .innerJoin('pet', 'pet.owner_id', 'person.id') .select(['person.first_name', 'pet.name as pet_name']) .stream() for await (const row of stream) { console.log(`${row.first_name} owns ${row.pet_name}`) }

Be aware that joins can produce duplicate rows if the relationship is one-to-many. Each person-pet combination appears as a separate row in the stream.
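If you want one object per person rather than one row per person-pet pair, you can collapse consecutive rows while still streaming. The sketch below assumes rows arrive ordered by person id (e.g. via `.orderBy('person.id')`); the row shape and helper names are illustrative, not Kysely APIs:

```ts
// Sketch: grouping one-to-many join rows into one object per person.
interface JoinRow {
  person_id: number
  first_name: string
  pet_name: string
}

// Mock join output, ordered by person_id.
async function* joinRows(): AsyncIterableIterator<JoinRow> {
  yield { person_id: 1, first_name: 'Jennifer', pet_name: 'Catto' }
  yield { person_id: 1, first_name: 'Jennifer', pet_name: 'Doggo' }
  yield { person_id: 2, first_name: 'Arnold', pet_name: 'Hammo' }
}

interface PersonWithPets {
  first_name: string
  pets: string[]
}

async function* groupByPerson(
  rows: AsyncIterable<JoinRow>,
): AsyncIterableIterator<PersonWithPets> {
  let current: { id: number; value: PersonWithPets } | undefined
  for await (const row of rows) {
    if (current && current.id !== row.person_id) {
      // A new person started: emit the completed one.
      yield current.value
      current = undefined
    }
    if (!current) {
      current = {
        id: row.person_id,
        value: { first_name: row.first_name, pets: [] },
      }
    }
    current.value.pets.push(row.pet_name)
  }
  if (current) yield current.value
}

const grouped: PersonWithPets[] = []
for await (const person of groupByPerson(joinRows())) {
  grouped.push(person)
}
console.log(grouped)
```

Because only the current person's rows are buffered, memory stays bounded no matter how many people the query returns.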

Streaming Updates and Deletes

Some dialects support streaming results from update and delete queries when combined with RETURNING clauses:

```ts
const stream = db
  .updateTable('person')
  .set({ processed: true })
  .where('processed', '=', false)
  .returningAll()
  .stream()

for await (const person of stream) {
  console.log(`Processed ${person.first_name}`)
}
```

This is useful for processing large batch updates while tracking which rows were affected.

Memory Considerations

While streaming keeps memory usage constant, be aware of memory leaks in your processing code:

```ts
const results = [] // Don't do this!

const stream = db
  .selectFrom('person')
  .selectAll()
  .stream()

for await (const person of stream) {
  // This defeats the purpose of streaming
  results.push(person)
}
```

If you accumulate rows in an array, you’re back to loading everything into memory. Process and discard rows as you go:

```ts
const stream = db
  .selectFrom('person')
  .selectAll()
  .stream()

for await (const person of stream) {
  await sendToQueue(person) // Process and discard
}
```
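When per-row processing is too slow (e.g. one network call per row), a middle ground is to buffer a small, fixed number of rows, flush them together, and discard the buffer. The sketch below uses a mock number stream and a hypothetical `flush` sink; neither is a Kysely API:

```ts
// Sketch: bounded batching between row-at-a-time and full accumulation.
// Memory stays capped at BATCH_SIZE rows.
async function* numbers(): AsyncIterableIterator<number> {
  for (let i = 1; i <= 5; i++) yield i
}

const flushed: number[][] = []
async function flush(batch: number[]): Promise<void> {
  // e.g. a bulk insert or a queue publish in a real application
  flushed.push([...batch])
}

const BATCH_SIZE = 2
let buffer: number[] = []

for await (const row of numbers()) {
  buffer.push(row)
  if (buffer.length === BATCH_SIZE) {
    await flush(buffer)
    buffer = [] // discard processed rows to keep memory bounded
  }
}
// Flush any remainder smaller than a full batch.
if (buffer.length > 0) await flush(buffer)

console.log(flushed) // [[1, 2], [3, 4], [5]]
```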

What’s Next