bolty-streamly: Streamly streaming interface for bolty Neo4j driver


Streaming queries over Neo4j using streamly streams. Wraps bolty's query functions with streamly's Stream type for lazy record-by-record consumption.



Properties

Versions 0.1.0.0
Change log changelog.md
Dependencies base (>=4.18 && <5), bolty (>=0.1 && <0.2), packstream-bolt (>=0.1 && <0.2), streamly-core (>=0.2 && <0.3), text (>=2.0 && <2.2), unordered-containers (>=0.2 && <0.3), vector (>=0.13 && <0.14) [details]
License Apache-2.0
Copyright (c) 2023-2026 philippedev101
Author philippedev101
Maintainer philippedev101@gmail.com
Category Database
Home page https://github.com/philippedev101/bolty-streamly#readme
Bug tracker https://github.com/philippedev101/bolty-streamly/issues
Source repo head: git clone https://github.com/philippedev101/bolty-streamly
Uploaded by philippedev101 at 2026-03-07T13:13:29Z


Flags

Manual Flags

Name  Description                                Default
dev   Turn on -Werror and other developer flags  Disabled

Use -f <flag> to enable a flag, or -f -<flag> to disable it.



Readme for bolty-streamly-0.1.0.0


bolty-streamly

Streaming interface for the bolty Neo4j driver, built on streamly.

Why streaming?

bolty's standard query functions buffer the entire result set into a Vector before returning. For large result sets (millions of rows, graph traversals, data exports), memory usage grows linearly with the number of records and can exhaust the heap.

bolty-streamly yields records one at a time as they arrive from the server, giving you constant-memory consumption regardless of result set size. Records are pulled from Neo4j in batches via the BOLT protocol's PULL mechanism, but exposed as a single Stream IO — you never have to think about batching.
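To make the contrast concrete, here is a sketch. It assumes bolty's buffered entry point is named Bolt.query; the exact name may differ, but the shape of the difference is the point:

```haskell
-- Buffered (bolty): the whole result set materializes as a Vector at once.
rows <- Bolt.query conn "MATCH (n:Person) RETURN n"   -- Vector Record, O(n) memory

-- Streaming (bolty-streamly): records are consumed one at a time.
s <- BoltS.queryStream conn "MATCH (n:Person) RETURN n"
n <- Stream.fold Fold.length s                        -- O(1) memory
```

The fold walks the stream as records arrive over the wire, so peak memory stays flat no matter how many rows the query returns.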

Quick start

import qualified Database.Bolty          as Bolt
import qualified Database.Bolty.Streamly as BoltS
import qualified Streamly.Data.Stream    as Stream
import qualified Streamly.Data.Fold      as Fold
import           Data.Default            (def)

main :: IO ()
main = do
  let cfg = def{ Bolt.scheme = Bolt.Basic "neo4j" "password", Bolt.use_tls = False }
  case Bolt.validateConfig cfg of
    Failure _ -> error "bad config"
    Success vc -> do
      conn <- Bolt.connect vc
      s <- BoltS.queryStream conn "MATCH (n:Person) RETURN n.name AS name, n.age AS age"
      count <- Stream.fold Fold.length s
      putStrLn $ "Processed " <> show count <> " records"
      Bolt.close conn

API overview

The module exposes four levels of streaming — direct connection, connection pool, routing pool, and session — plus a low-level pullStream. Each level has variants for parameters (P) and typed decoding (As):

Direct connection

Use when you manage the connection yourself:

-- Raw records
queryStream   :: Connection -> Text -> IO (Stream IO Record)
queryStreamP  :: Connection -> Text -> HashMap Text Ps -> IO (Stream IO Record)

-- Decoded records (throws DecodeError on failure)
queryStreamAs  :: RowDecoder a -> Connection -> Text -> IO (Stream IO a)
queryStreamPAs :: RowDecoder a -> Connection -> Text -> HashMap Text Ps -> IO (Stream IO a)
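A parameterized query passes its values as a HashMap Text Ps. How Ps values are constructed comes from packstream-bolt and is not shown in this README; the Bolt.toPs conversion below is an assumption and the real name may differ:

```haskell
import qualified Data.HashMap.Strict as HM

-- Hypothetical Ps construction; substitute packstream-bolt's actual API.
let params = HM.fromList [("minAge", Bolt.toPs (30 :: Int))]

s <- BoltS.queryStreamP conn
       "MATCH (p:Person) WHERE p.age >= $minAge RETURN p.name AS name"
       params
Stream.mapM_ print s
```

Using `$minAge` in the query text and supplying the value via the parameter map avoids string interpolation and lets the server cache the query plan.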

Connection pool

Acquires a connection, streams the query, and releases the connection when the callback returns. The stream must be fully consumed inside the callback, because the connection goes back to the pool as soon as the callback finishes:

withPoolStream   :: BoltPool -> Text -> (Stream IO Record -> IO a) -> IO a
withPoolStreamAs :: RowDecoder a -> BoltPool -> Text -> (Stream IO a -> IO b) -> IO b
-- + P variants for parameters

Example:

pool <- Bolt.createPool vc Bolt.defaultPoolConfig

withPoolStreamAs personDecoder pool "MATCH (p:Person) RETURN p.name, p.age" $ \stream ->
  Stream.mapM_ print stream

Bolt.destroyPool pool

Routing pool (clusters)

Routes queries to the appropriate cluster member based on access mode:

withRoutingStream   :: RoutingPool -> AccessMode -> Text -> (Stream IO Record -> IO a) -> IO a
withRoutingStreamAs :: RowDecoder a -> RoutingPool -> AccessMode -> Text -> (Stream IO a -> IO b) -> IO b
-- + P variants for parameters

Example:

withRoutingStreamAs decoder routingPool ReadAccess "MATCH (n) RETURN n" $ \stream ->
  Stream.fold Fold.toList stream
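Writes against a cluster take the same shape with the write-side access mode. The source only names ReadAccess, so WriteAccess below is an assumed constructor of AccessMode:

```haskell
-- Assumes WriteAccess is the write-side AccessMode constructor.
withRoutingStream routingPool WriteAccess
  "CREATE (e:Event {name: 'deploy'})" $ \stream ->
    Stream.fold Fold.drain stream   -- CREATE returns no rows; drain the stream
```

Draining the stream still matters: the PULL phase must complete so the connection is returned in a clean state.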

Session (causal consistency)

Runs streaming queries inside managed transactions with automatic bookmark tracking, retries on transient errors, and read/write routing:

sessionReadStream   :: Session -> Text -> (Stream IO Record -> IO a) -> IO a
sessionWriteStream  :: Session -> Text -> (Stream IO Record -> IO a) -> IO a
sessionReadStreamAs :: RowDecoder a -> Session -> Text -> (Stream IO a -> IO b) -> IO b
-- + P and Write variants

Example:

session <- Bolt.createSession pool Bolt.defaultSessionConfig

-- Write some data
sessionWriteStream session "CREATE (p:Person {name: 'Alice'})" $ \s ->
  Stream.fold Fold.drain s

-- Read it back (guaranteed to see Alice via bookmarks)
sessionReadStreamAs personDecoder session "MATCH (p:Person) RETURN p.name, p.age" $ \stream ->
  Stream.mapM_ print stream

Low-level: pullStream

If you need to run a query with custom RUN parameters and then stream the PULL phase yourself:

pullStream :: Connection -> IO (Stream IO Record)

This expects the connection to already be in Streaming or TXstreaming state (after a RUN has been acknowledged). It handles PULL batching and state transitions automatically.
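A sketch of the intended flow, with the RUN-sending step left hypothetical since this README does not name bolty's low-level RUN function (Bolt.sendRun below is an assumption):

```haskell
-- Hypothetical: send RUN with custom metadata via whatever low-level
-- function bolty exposes, putting the connection into Streaming state.
Bolt.sendRun conn "MATCH (n) RETURN n" myParams myRunMetadata

-- pullStream then takes over the PULL phase, batching and state
-- transitions included.
s <- BoltS.pullStream conn
Stream.mapM_ print s
```

This split is useful when you need RUN-level options (bookmarks, timeouts, database selection) that the higher-level helpers do not surface.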

Important: stream lifetime

With the pool, routing, and session variants, the Stream is only valid inside the callback. The connection is released when the callback returns, so you cannot store the stream or consume it later:

-- WRONG: stream escapes the callback
stream <- withPoolStream pool "MATCH (n) RETURN n" pure  -- connection released!
Stream.mapM_ print stream  -- BOOM: connection already returned to pool

-- RIGHT: consume inside the callback
withPoolStream pool "MATCH (n) RETURN n" $ \stream ->
  Stream.mapM_ print stream

With queryStream / queryStreamP on a bare connection, you manage the connection lifetime yourself, so the stream lives as long as the connection does.
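With a bare connection, the usual idiom is to scope the connection yourself with bracket, which guarantees Bolt.close runs even if consumption throws. This sketch reuses the validated config vc from the quick start:

```haskell
import Control.Exception (bracket)

countPeople :: Bolt.ValidConfig -> IO Int   -- result type name assumed
countPeople vc =
  bracket (Bolt.connect vc) Bolt.close $ \conn -> do
    s <- BoltS.queryStream conn "MATCH (p:Person) RETURN p"
    Stream.fold Fold.length s   -- consume fully before bracket closes conn
```

As with the pool variants, the stream must not escape the bracket; once Bolt.close runs, any unconsumed portion of the stream is invalid.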

Naming convention

Suffix Meaning
(none) No parameters, raw Record stream
P With parameters (HashMap Text Ps)
As Decoded via RowDecoder a, no parameters
PAs Decoded via RowDecoder a, with parameters

Supported GHC versions

9.6.7, 9.8.4, 9.10.3, 9.12.3

License

Apache-2.0