Copyright | (c) 2023 Sean Hess |
---|---|
License | BSD3 |
Maintainer | Sean Hess <seanhess@gmail.com> |
Stability | experimental |
Portability | portable |
Safe Haskell | None |
Language | Haskell2010 |
Network.AMQP.Worker
Description
Type safe and simplified message queues with AMQP
Synopsis
- newtype Key a msg = Key [Bind]
- data Bind
- data Route
- word :: Text -> Key a msg -> Key a msg
- key :: Text -> Key Route msg
- any1 :: Key a msg -> Key Bind msg
- many :: Key a msg -> Key Bind msg
- connect :: MonadIO m => ConnectionOpts -> m Connection
- fromURI :: String -> Either String ConnectionOpts
- data Connection
- publish :: (RequireRoute a, ToJSON msg, MonadIO m) => Connection -> Key a msg -> msg -> m ()
- queue :: MonadIO m => Connection -> QueuePrefix -> Key a msg -> m (Queue msg)
- queueNamed :: MonadIO m => Connection -> QueueName -> Key a msg -> m (Queue msg)
- data Queue msg = Queue (Key Bind msg) QueueName
- queueName :: QueuePrefix -> Key a msg -> QueueName
- type QueueName = Text
- newtype QueuePrefix = QueuePrefix Text
- data ParseError = ParseError String ByteString
- data Message a = Message {
- body :: ByteString
- value :: a
- takeMessage :: (MonadIO m, FromJSON a) => Connection -> Queue a -> m (Message a)
- worker :: (FromJSON a, MonadIO m) => Connection -> Queue a -> (Message a -> m ()) -> m ()
- def :: Default a => a
How to use this library
Define keys to identify how messages will be published and what the message type is
import Network.AMQP.Worker as Worker data Greeting = Greeting { message :: Text } deriving (Generic, Show, Eq) instance FromJSON Greeting instance ToJSON Greeting newGreetings :: Key Routing Greeting newGreetings = key "greetings" & word "new"
Connect to AMQP and publish a message
conn <- Worker.connect (fromURI "amqp://guest:guest@localhost:5672") Worker.publish conn newGreetings $ Greeting "hello"
Create a queue to receive messages. You can bind direclty to the Routing Key to ensure it is delivered once
q <- Worker.queue conn "new" newMessages :: IO (Queue Greeting) m <- Worker.takeMessage conn q print (value m)
Define dynamic Routing Keys to receive many kinds of messages
let newMessages = key "messages" & any1 & word "new" q <- Worker.queue conn def newMessages :: IO (Queue Greeting) m <- Worker.takeMessage conn q print (value m)
Create a worker to conintually process messages
forkIO $ Worker.worker conn q $ \m -> do print (value m)
Binding and Routing Keys
Messages are published with a specific identifier called a Routing key. Queues can use Binding Keys to control which messages are delivered to them.
Routing keys have no dynamic component and can be used to publish messages
commentsKey :: Key Route Comment commentsKey = key "posts" & word "new"
Binding keys can contain wildcards, only used for matching messages
commentsKey :: Key Bind Comment commentsKey = key "posts" & any1 & word "comments" & many
word :: Text -> Key a msg -> Key a msg Source #
A specific word. Can be used to chain Routing keys or Binding keys
any1 :: Key a msg -> Key Bind msg Source #
Match any one word. Equivalent to *
. Converts to a Binding key and can no longer be used to publish messaages
many :: Key a msg -> Key Bind msg Source #
Match zero or more words. Equivalient to #
. Converts to a Binding key and can no longer be used to publish messages
Connecting
connect :: MonadIO m => ConnectionOpts -> m Connection Source #
Connect to the AMQP server.
conn <- connect (fromURI "amqp://guest:guest@localhost:5672")
fromURI :: String -> Either String ConnectionOpts #
Parses an AMQP standard URI of the form amqp://user:password@host:port/vhost
and returns a ConnectionOpts
for use with openConnection''
.
To pass multiple servers, separate them by comma, like: amqp://user:password@host:port,host2:port2/vhost
Any of these fields may be empty and will be replaced with defaults from amqp://guest:guest@localhost:5672/
When parsing fails, a Left String
will be returned with a human-readable error-message.
data Connection Source #
Sending Messages
publish :: (RequireRoute a, ToJSON msg, MonadIO m) => Connection -> Key a msg -> msg -> m () Source #
send a message to a queue. Enforces that the message type and queue name are correct at the type level
let newUsers = key "users" & word "new":: Key Route User publish conn newUsers (User "username")
Publishing to a Binding Key results in an error
-- Compiler error! This doesn't make sense let users = key "users" & many :: Key Binding User publish conn users (User "username")
Initializing queues
queue :: MonadIO m => Connection -> QueuePrefix -> Key a msg -> m (Queue msg) Source #
queueNamed :: MonadIO m => Connection -> QueueName -> Key a msg -> m (Queue msg) Source #
Create a queue to receive messages matching the binding key. Each queue with a unique name will be delivered a separate copy of the messsage. Workers operating on the same queue, or on queues with the same name will load balance
A queue is an inbox for messages to be delivered
queueName :: QueuePrefix -> Key a msg -> QueueName Source #
Name a queue with a prefix and the binding key name. Useful for seeing at a glance which queues are receiving which messages
-- "main messages.new" queueName "main" (key "messages" & word "new")
newtype QueuePrefix Source #
Constructors
QueuePrefix Text |
Instances
Default QueuePrefix Source # | |
Defined in Network.AMQP.Worker.Queue Methods def :: QueuePrefix # | |
IsString QueuePrefix Source # | |
Defined in Network.AMQP.Worker.Queue Methods fromString :: String -> QueuePrefix # | |
Show QueuePrefix Source # | |
Defined in Network.AMQP.Worker.Queue Methods showsPrec :: Int -> QueuePrefix -> ShowS # show :: QueuePrefix -> String # showList :: [QueuePrefix] -> ShowS # | |
Eq QueuePrefix Source # | |
Defined in Network.AMQP.Worker.Queue |
Messages
data ParseError Source #
Constructors
ParseError String ByteString |
Instances
Exception ParseError Source # | |
Defined in Network.AMQP.Worker.Message Methods toException :: ParseError -> SomeException # fromException :: SomeException -> Maybe ParseError # displayException :: ParseError -> String # backtraceDesired :: ParseError -> Bool # | |
Show ParseError Source # | |
Defined in Network.AMQP.Worker.Message Methods showsPrec :: Int -> ParseError -> ShowS # show :: ParseError -> String # showList :: [ParseError] -> ShowS # |
a parsed message from the queue
Constructors
Message | |
Fields
|
takeMessage :: (MonadIO m, FromJSON a) => Connection -> Queue a -> m (Message a) Source #