kafka-effectful: Effectful effects for hw-kafka-client

Tags: library, messaging, mit, network

Effectful effects and interpreters for hw-kafka-client, a Haskell binding to Apache Kafka via librdkafka. Provides typed, composable KafkaProducer and KafkaConsumer effects.

Versions: 0.1.0.0, 0.2.0.0
Change log: CHANGELOG.md
Dependencies: base (>=4.21 && <5), bytestring (>=0.11 && <0.13), containers (>=0.6 && <0.8), effectful-core (>=2.5 && <2.7), hw-kafka-client (>=5.3 && <6), text (>=2.0 && <2.2)
License: MIT
Author: Nadeem Bitar
Maintainer: Nadeem Bitar
Uploaded: by shinzui at 2026-04-17T04:02:00Z
Category: Network, Messaging
Source repo (head): git clone https://github.com/shinzui/kafka-effectful.git
Reverse Dependencies: 1 direct, 0 indirect
Downloads: 11 total (8 in the last 30 days)
Rating: no votes yet
Status: Docs uploaded by user
Build status: unknown (no reports yet)

Readme for kafka-effectful-0.1.0.0

kafka-effectful

Effectful effects and interpreters for hw-kafka-client, a Haskell binding to Apache Kafka via librdkafka.

Provides typed, composable KafkaProducer and KafkaConsumer effects for the effectful ecosystem.

Status: experimental. This package is on its first release. The API may change in breaking ways in subsequent 0.x versions. Pin to an exact version in production until 1.0 is tagged.

Features

  • KafkaProducer -- send messages and flush the producer queue
  • KafkaConsumer -- poll messages (single or batch), manage offsets, assign/pause/resume/seek partitions, and query committed offsets, positions, assignments, and subscriptions
  • Resource-safe interpreters that acquire and release Kafka handles via bracket
  • Errors surfaced through Effectful.Error.Static (Error KafkaError)
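To illustrate the last point, here is a minimal sketch of how an error raised through Effectful.Error.Static can either be handled locally or left to propagate to the handler. It uses only effectful-core. DemoError, sendDemo, sendOrZero, and runDemo are hypothetical names invented for this sketch; DemoError stands in for KafkaError, and none of them are part of kafka-effectful.

```haskell
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE TypeOperators #-}

import Effectful (Eff, runPureEff, (:>))
import Effectful.Error.Static (Error, catchError, runErrorNoCallStack, throwError)

-- Hypothetical stand-in for KafkaError, so the sketch needs no broker.
newtype DemoError = DemoError String
  deriving (Eq, Show)

-- An operation that reports failure through the Error effect
-- instead of returning an Either or Maybe.
sendDemo :: (Error DemoError :> es) => String -> Eff es Int
sendDemo payload
  | null payload = throwError (DemoError "empty payload")
  | otherwise    = pure (length payload)

-- Handle the error locally: fall back to 0 when sending fails.
sendOrZero :: (Error DemoError :> es) => String -> Eff es Int
sendOrZero payload =
  sendDemo payload `catchError` \_callStack _err -> pure 0

-- Let the error propagate to the handler, which turns it into an Either.
-- runErrorNoCallStack drops the CallStack that runError would also return.
runDemo :: Eff '[Error DemoError] a -> Either DemoError a
runDemo = runPureEff . runErrorNoCallStack
```

With the real effects the same pattern should apply with Error KafkaError in place of Error DemoError; whether a given failure is worth catching locally depends on the application.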

Usage

Producer

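import Effectful (Eff, IOE, (:>))
import Effectful.Error.Static (Error)
import Kafka.Effectful

-- producerProps and record are placeholders for your own values.
example :: (IOE :> es, Error KafkaError :> es) => Eff es ()
example =
  runKafkaProducer producerProps $ do
    produceMessage record
    -- Flush the producer queue before the handle is released.
    flushProducer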

Consumer

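import Effectful (Eff, IOE, (:>))
import Effectful.Error.Static (Error)
import Kafka.Effectful

-- consumerProps and subscription are placeholders for your own values.
example :: (IOE :> es, Error KafkaError :> es) => Eff es ()
example =
  runKafkaConsumer consumerProps subscription loop
  where
    loop = do
      -- Wait up to 1000 ms for the next message.
      mbMsg <- pollMessage (Timeout 1000)
      case mbMsg of
        Nothing  -> loop
        Just msg -> do
          -- Process msg here, then commit its offset.
          commitOffsetMessage OffsetCommit msg
          loop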

pollMessage returns Nothing when the timeout elapses without a message arriving; non-timeout failures are thrown via the Error KafkaError effect.
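The consumer example leaves consumerProps and subscription undefined. The following is a minimal sketch of both. It assumes that they are ordinary hw-kafka-client ConsumerProperties and Subscription values and that the builders can be imported from hw-kafka-client's Kafka.Consumer module. The broker address, group id, and topic name are placeholders.

```haskell
{-# LANGUAGE OverloadedStrings #-}

-- Assumption: the builders are taken directly from hw-kafka-client.
import Kafka.Consumer
  ( BrokerAddress (..)
  , ConsumerGroupId (..)
  , ConsumerProperties
  , OffsetReset (..)
  , Subscription
  , TopicName (..)
  , brokersList
  , groupId
  , noAutoCommit
  , offsetReset
  , topics
  )

-- Placeholder topic name; replace with your own.
demoTopic :: TopicName
demoTopic = TopicName "demo-topic"

-- Connect to a local broker as part of a consumer group.
-- noAutoCommit turns off automatic commits, so offsets are only
-- committed by the explicit commitOffsetMessage call in the loop.
consumerProps :: ConsumerProperties
consumerProps =
     brokersList [BrokerAddress "localhost:9092"]
  <> groupId (ConsumerGroupId "demo-group")
  <> noAutoCommit

-- Subscribe to one topic, starting from the earliest offset
-- when the group has no committed position yet.
subscription :: Subscription
subscription = topics [demoTopic] <> offsetReset Earliest
```

Disabling auto-commit is a design choice for this sketch, not a requirement of the package; it keeps the commit in the loop as the only place where offsets advance.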

Running it

The effect handlers runKafkaProducer and runKafkaConsumer require IOE and Error KafkaError in the effect stack. A complete program wires them with runEff and runError:

{-# LANGUAGE TypeApplications #-}

import Effectful
import Effectful.Error.Static (runError)
import Kafka.Effectful

main :: IO ()
main = do
  result <- runEff . runError @KafkaError $ runProgram
  case result of
    Left (_, err) -> putStrLn ("Kafka error: " <> show err)
    Right ()      -> pure ()
  where
    runProgram =
      runKafkaProducer producerProps $ do
        produceMessage record
        flushProducer

Replace producerProps and record with your own ProducerProperties and ProducerRecord values (see the Kafka.Effectful.Producer module for the available builders).
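As a starting point, producerProps and record could look like the sketch below. It assumes the builders are imported from hw-kafka-client's Kafka.Producer module; the README does not list which of these names Kafka.Effectful.Producer re-exports. The broker address, topic name, and payload are placeholders.

```haskell
{-# LANGUAGE OverloadedStrings #-}

-- Assumption: the builders are taken directly from hw-kafka-client.
import Kafka.Producer
  ( BrokerAddress (..)
  , ProducePartition (..)
  , ProducerProperties
  , ProducerRecord (..)
  , TopicName (..)
  , brokersList
  )

-- Connect to a single local broker; further properties can be
-- appended with (<>).
producerProps :: ProducerProperties
producerProps = brokersList [BrokerAddress "localhost:9092"]

-- One message with no key and no headers. UnassignedPartition
-- leaves the choice of partition to the partitioner.
record :: ProducerRecord
record =
  ProducerRecord
    { prTopic     = TopicName "demo-topic"
    , prPartition = UnassignedPartition
    , prKey       = Nothing
    , prValue     = Just "hello from kafka-effectful"
    , prHeaders   = mempty
    }
```

If these definitions live in the same module as the program above, the explicit import list keeps them from clashing with the names exported by Kafka.Effectful.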

Module Structure

Module                                Description
Kafka.Effectful                       Convenience re-export of both effects and common types
Kafka.Effectful.Producer              Producer effect, interpreter, and types
Kafka.Effectful.Consumer              Consumer effect, interpreter, and types
Kafka.Effectful.Producer.Effect       KafkaProducer effect definition and operations
Kafka.Effectful.Producer.Interpreter  runKafkaProducer interpreter
Kafka.Effectful.Consumer.Effect       KafkaConsumer effect definition and operations
Kafka.Effectful.Consumer.Interpreter  runKafkaConsumer interpreter

Requirements

  • GHC >= 9.12
  • librdkafka (system dependency)

License

MIT