hw-kafka-streamly
Streamly integration for hw-kafka-client, the Haskell binding to Apache
Kafka via librdkafka.
hw-kafka-streamly exposes Kafka consumers as composable Streamly Streams
and Kafka producers as Streamly Folds, so message processing can be expressed
in the same vocabulary as the rest of a Streamly pipeline. Resource handling
(creating and closing consumers and producers, flushing producers) is layered,
so the caller chooses how much bracketing the library does on their behalf.
Installation
Add to your .cabal file:
```cabal
build-depends:
    , hw-kafka-streamly >=0.1 && <0.2
    , hw-kafka-client   >=5.3 && <6
    , streamly-core     >=0.4 && <0.5
```
Note: streamly-core 0.4 is not yet published to Hackage at the time of
writing. Until it is, add a source-repository-package stanza pointing at
upstream Streamly to your cabal.project. The plan to land streamly-core 0.4
on Hackage is tracked in the project's master plan
(docs/masterplans/1-release-hw-kafka-streamly-0.1.0.0.md, EP-5/EP-6).
Modules
- Kafka.Streamly.Source: consumer streams, error predicates and filters, and
  value-mapping helpers built on Bifunctor/Bitraversable.
- Kafka.Streamly.Sink: producer folds and a withKafkaProducer bracket helper.
- Kafka.Streamly.Combinators: batching combinators and helpers that throw
  Left values as exceptions.
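The Source and Combinators entries rely on two standard Haskell patterns: mapping or decoding both sides of a two-parameter record through Bifunctor/Bitraversable, and rethrowing a Left as an exception. The base-only sketch below shows both so it runs without a broker. It is not the hw-kafka-streamly API: `Record k v` is a hypothetical stand-in for hw-kafka-client's `ConsumerRecord k v`, and `decodeValue` and `throwLeft` are hypothetical names chosen for illustration.

```haskell
import Control.Exception (ErrorCall, Exception, throwIO)
import Data.Bifoldable (Bifoldable (..))
import Data.Bifunctor (Bifunctor (..))
import Data.Bitraversable (Bitraversable (..))
import Text.Read (readMaybe)

-- Hypothetical stand-in for ConsumerRecord k v: only a key and a value.
data Record k v = Record {recKey :: k, recValue :: v}
  deriving (Show, Eq)

instance Bifunctor Record where
  bimap f g (Record k v) = Record (f k) (g v)

instance Bifoldable Record where
  bifoldMap f g (Record k v) = f k <> g v

instance Bitraversable Record where
  bitraverse f g (Record k v) = Record <$> f k <*> g v

-- Hypothetical decoder: keep the key, parse the value as an Int.
-- Returns Nothing if the value does not parse.
decodeValue :: Record String String -> Maybe (Record String Int)
decodeValue = bitraverse Just readMaybe

-- Hypothetical helper: rethrow a Left as an exception, pass a Right through.
throwLeft :: Exception e => Either e a -> IO a
throwLeft = either throwIO pure

main :: IO ()
main = do
  let r = Record "user-1" "42"
  -- Bifunctor: map only the value side.
  print (second length r) -- Record {recKey = "user-1", recValue = 2}
  -- Bitraversable: effectful decoding that fails as a whole.
  print (decodeValue r) -- Just (Record {recKey = "user-1", recValue = 42})
  print (decodeValue (Record "user-2" "not-a-number")) -- Nothing
  -- A Right passes through; a Left would be thrown as an ErrorCall.
  n <- throwLeft (Right 7 :: Either ErrorCall Int)
  print n -- 7
```

In a real pipeline the same functions would be applied per element, for example mapping over the Right side of each polled result.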
Consuming
The source module ships three variants that differ only in how they manage
the underlying KafkaConsumer:
- kafkaSource: creates the consumer from ConsumerProperties and a
  Subscription, and closes it when the stream ends. Use this when the stream
  fully owns the consumer's lifecycle.
- kafkaSourceAutoClose: wraps a caller-supplied KafkaConsumer and closes it
  when the stream ends. Use this when the consumer is created elsewhere but its
  lifetime matches the stream's.
- kafkaSourceNoClose: wraps a caller-supplied KafkaConsumer and leaves it
  open. Use this when the consumer outlives the stream.
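The three variants differ only in who owns the consumer. As a hedged illustration, the base-only model below replaces the consumer with a hypothetical `Handle` (an IORef flag that is True while open) and the stream with a plain `Handle -> IO a` action. The functions `ownedRun`, `autoCloseRun` and `noCloseRun` are hypothetical; they show when each ownership policy closes the handle, not how hw-kafka-streamly implements its sources.

```haskell
import Control.Exception (bracket, finally)
import Data.IORef (IORef, newIORef, readIORef, writeIORef)

-- Hypothetical stand-in for a KafkaConsumer: True while open.
newtype Handle = Handle (IORef Bool)

openHandle :: IO Handle
openHandle = Handle <$> newIORef True

closeHandle :: Handle -> IO ()
closeHandle (Handle ref) = writeIORef ref False

isOpen :: Handle -> IO Bool
isOpen (Handle ref) = readIORef ref

-- Like kafkaSource: the run creates the handle and always closes it.
ownedRun :: (Handle -> IO a) -> IO a
ownedRun = bracket openHandle closeHandle

-- Like kafkaSourceAutoClose: the caller supplies the handle, the run closes it.
autoCloseRun :: Handle -> (Handle -> IO a) -> IO a
autoCloseRun h body = body h `finally` closeHandle h

-- Like kafkaSourceNoClose: the caller supplies the handle and keeps ownership.
noCloseRun :: Handle -> (Handle -> IO a) -> IO a
noCloseRun h body = body h

main :: IO ()
main = do
  h0 <- ownedRun pure
  isOpen h0 >>= print -- False: closed when the run ended

  h1 <- openHandle
  autoCloseRun h1 (const (pure ()))
  isOpen h1 >>= print -- False: closed by autoCloseRun

  h2 <- openHandle
  noCloseRun h2 (const (pure ()))
  isOpen h2 >>= print -- True: the caller still owns it
  closeHandle h2 -- the caller closes it when it is done
```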
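```haskell
{-# LANGUAGE ImportQualifiedPost #-}
{-# LANGUAGE OverloadedStrings #-}

import Kafka.Consumer
import Kafka.Streamly.Source (kafkaSource, skipNonFatal)
import Streamly.Data.Fold qualified as Fold
import Streamly.Data.Stream qualified as Stream

main :: IO ()
main = do
  let props = brokersList ["localhost:9092"]
           <> groupId "my-group"
           <> noAutoCommit -- this example never commits offsets
      sub = topics ["events"] <> offsetReset Earliest
  -- Poll with a 1000 ms timeout, drop non-fatal errors, print the rest.
  Stream.fold (Fold.drainMapM print)
    . skipNonFatal
    $ kafkaSource props sub (Timeout 1000)
```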
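In hw-kafka-client, a poll that finds no message within the timeout is reported as an error value rather than a record, which is why the consumer example filters errors before printing. Assuming skipNonFatal does what its name says (drop errors that are not fatal, keep records and fatal errors), its effect can be modelled on a plain list. The `PollError` type, the `isFatal` classification and `skipNonFatalL` are all hypothetical; the real predicate works on hw-kafka-client's `KafkaError` over a Streamly Stream.

```haskell
-- Hypothetical error type standing in for KafkaError.
data PollError
  = PollTimeout -- no message arrived within the poll timeout
  | PartitionEOF -- reached the current end of a partition
  | BadConfiguration String
  deriving (Show, Eq)

-- Hypothetical classification: only configuration problems are fatal here.
isFatal :: PollError -> Bool
isFatal (BadConfiguration _) = True
isFatal _ = False

-- List model of a "skip non-fatal" filter:
-- keep every Right, keep a Left only if it is fatal.
skipNonFatalL :: [Either PollError a] -> [Either PollError a]
skipNonFatalL = filter (either isFatal (const True))

main :: IO ()
main = do
  let polled =
        [ Right "msg-1"
        , Left PollTimeout
        , Right "msg-2"
        , Left PartitionEOF
        , Left (BadConfiguration "bootstrap.servers")
        ]
  -- Prints Right "msg-1", Right "msg-2", then the fatal Left.
  mapM_ print (skipNonFatalL polled)
```

Keeping fatal errors in the stream leaves the decision to stop (or to throw) with the caller.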
Producing
Producer sinks are Streamly Folds:
```haskell
{-# LANGUAGE ImportQualifiedPost #-}
{-# LANGUAGE OverloadedStrings #-}

import Kafka.Producer
import Kafka.Streamly.Sink (kafkaSink, withKafkaProducer)
import Streamly.Data.Stream qualified as Stream

main :: IO ()
main = do
  let props = brokersList ["localhost:9092"]
  -- withKafkaProducer brackets producer creation and cleanup around the body.
  result <- withKafkaProducer props $ \producer ->
    Stream.fold (kafkaSink producer)
      . fmap mkRecord
      $ Stream.fromList ["a", "b", "c"]
  print result
  where
    -- Build an unkeyed record for the "events" topic; the broker
    -- picks the partition.
    mkRecord v =
      ProducerRecord
        { prTopic = TopicName "events"
        , prPartition = UnassignedPartition
        , prKey = Nothing
        , prValue = Just v
        , prHeaders = mempty
        }
```
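Because a producer sink is a Fold, it consumes a whole stream of records and yields one final result. The base-only sketch below models that shape with a hypothetical `SinkFold` type (initial state, step, extract), loosely following the structure of a Streamly Fold but much simpler. `runSinkFold` plays the role of `Stream.fold` over a list, and the hypothetical `outboxSink` "sends" each record by appending it to an in-memory outbox. It does not contact Kafka and is not kafkaSink's implementation.

```haskell
{-# LANGUAGE ExistentialQuantification #-}

import Control.Monad (foldM)
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- Hypothetical, simplified fold: initial state, step, and final extraction.
data SinkFold a b = forall s. SinkFold (IO s) (s -> a -> IO s) (s -> IO b)

-- Drive a fold over a list (a stand-in for Stream.fold over a Stream).
runSinkFold :: SinkFold a b -> [a] -> IO b
runSinkFold (SinkFold initial step extract) xs = do
  s0 <- initial
  s <- foldM step s0 xs
  extract s

-- Hypothetical sink: "sends" each record into an outbox and
-- returns how many records were sent once the input ends.
outboxSink :: IORef [String] -> SinkFold String Int
outboxSink outbox = SinkFold (pure 0) step pure
  where
    step n record = do
      modifyIORef' outbox (++ [record])
      pure (n + 1)

main :: IO ()
main = do
  outbox <- newIORef []
  sent <- runSinkFold (outboxSink outbox) ["a", "b", "c"]
  print sent -- 3
  readIORef outbox >>= print -- ["a","b","c"]
```

The fold keeps its state private and only exposes the final result, which is the property that lets a producer sink be dropped into `Stream.fold` like any other Fold.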
Cookbook
For end-to-end runnable examples (concurrent consumers, batching producers,
error handling, and transform pipelines), see the companion
hw-kafka-streamly-jitsurei package in this repository. It is not
published to Hackage; clone the repository and run its executables locally.
Design notes
The original design plan for the bindings lives at
docs/plans/1-streamly-bindings-for-hw-kafka-client.md.
License
MIT — see LICENSE.