{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeFamilies #-}

module Network.AMQP.Worker.Message where

import Control.Exception (Exception, throwIO)
import Control.Monad (forever)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.Aeson (FromJSON, ToJSON)
import qualified Data.Aeson as Aeson
import Data.ByteString.Lazy (ByteString)
import Network.AMQP (Ack (..), DeliveryMode (..), newMsg)
import qualified Network.AMQP as AMQP
import Network.AMQP.Worker.Connection (Connection, exchange, withChannel)
import Network.AMQP.Worker.Key (Key, RequireRoute, Route, keyText)
import Network.AMQP.Worker.Poll (poll)
import Network.AMQP.Worker.Queue (Queue (..))

-- | A message taken from a queue whose payload was decoded successfully.
data Message a = Message
    { body :: ByteString
    -- ^ the raw message body, kept alongside the decoded value
    , value :: a
    -- ^ the body decoded into the expected type
    }
    deriving (Show, Eq)

-- | Send a message to a routing key. The key carries the message type, so
-- the payload type and the routing key are checked against each other at
-- compile time.
--
-- > let newUsers = key "users" & word "new" :: Key Route User
-- > publish conn newUsers (User "username")
--
-- Publishing to a binding key is rejected by the compiler:
--
-- > -- Compiler error! This doesn't make sense
-- > let users = key "users" & many :: Key Binding User
-- > publish conn users (User "username")
--
-- This is currently a synonym for 'publishToExchange'.
publish :: (RequireRoute a, ToJSON msg, MonadIO m) => Connection -> Key a msg -> msg -> m ()
publish = publishToExchange

-- | Publish a message to a routing key on the connection's exchange, without
-- making sure a queue exists to handle it.
--
-- The payload is JSON-encoded and sent with content type
-- @application/json@, content encoding @UTF-8@ and a persistent delivery
-- mode.
--
-- > publishToExchange conn key (User "username")
publishToExchange :: (RequireRoute a, ToJSON msg, MonadIO m) => Connection -> Key a msg -> msg -> m ()
publishToExchange conn rk msg =
    liftIO $ withChannel conn $ \chan -> do
        -- the value returned by publishMsg is not needed by callers
        _ <- AMQP.publishMsg chan (exchange conn) (keyText rk) (jsonMessage msg)
        pure ()
  where
    -- Wrap a JSON-encodable value in an AMQP message with JSON metadata.
    jsonMessage :: ToJSON a => a -> AMQP.Message
    jsonMessage a =
        newMsg
            { AMQP.msgBody = Aeson.encode a
            , AMQP.msgContentType = Just "application/json"
            , AMQP.msgContentEncoding = Just "UTF-8"
            , AMQP.msgDeliveryMode = Just Persistent
            }

-- | Wait until a message is read from the queue. Throws a 'ParseError' (via
-- 'throwIO') if the message body does not decode to the declared type.
--
-- Internally calls 'consumeNext' with a delay of 10000 microseconds.
--
-- > m <- Worker.takeMessage conn queue
-- > print (value m)
takeMessage :: (MonadIO m, FromJSON a) => Connection -> Queue a -> m (Message a)
takeMessage conn q = do
    let delay = 10000 :: Microseconds
    res <- consumeNext delay conn q
    case res of
        Error e -> liftIO $ throwIO e
        Parsed msg -> pure msg

-- | Create a worker which loops forever, taking one message at a time with
-- 'takeMessage' and passing it to the handler. The loop does not return on
-- its own; an exception thrown by 'takeMessage' (such as a 'ParseError') or
-- by the handler is not caught here.
--
-- > Worker.worker conn queue $ \m -> do
-- >   print (value m)
worker :: (FromJSON a, MonadIO m) => Connection -> Queue a -> (Message a -> m ()) -> m ()
worker conn queue action =
    forever $ do
        m <- takeMessage conn queue
        action m

-- | Block while checking for messages every N microseconds. Return once you
-- find one. The first argument is the delay passed to 'poll'; each check is
-- a single call to 'consume'.
--
-- > res <- consumeNext 10000 conn queue
-- > case res of
-- >   (Parsed m) -> print m
-- >   (Error e) -> putStrLn "could not parse message"
consumeNext :: (FromJSON msg, MonadIO m) => Microseconds -> Connection -> Queue msg -> m (ConsumeResult msg)
consumeNext pd conn queue =
    poll pd $ consume conn queue

-- | Check for a message once and attempt to parse it.
--
-- Returns 'Nothing' when no message was available. Otherwise the message is
-- acknowledged and its body is decoded as JSON: a successful decode yields
-- 'Parsed', a failed one yields 'Error' carrying the decoder's message and
-- the raw body. Note that the acknowledgement happens before decoding, so a
-- message that fails to parse has already been acknowledged.
--
-- > res <- consume conn queue
-- > case res of
-- >   Just (Parsed m) -> print m
-- >   Just (Error e) -> putStrLn "could not parse message"
-- >   Nothing -> putStrLn "No messages on the queue"
consume :: (FromJSON msg, MonadIO m) => Connection -> Queue msg -> m (Maybe (ConsumeResult msg))
consume conn (Queue _ name) = do
    mme <- liftIO $ withChannel conn $ \chan -> AMQP.getMsg chan Ack name

    case mme of
        Nothing ->
            pure Nothing
        Just (msg, env) -> do
            -- acknowledge first; the body is decoded only afterwards
            liftIO $ AMQP.ackEnv env
            let bd = AMQP.msgBody msg
            case Aeson.eitherDecode bd of
                Left err ->
                    pure $ Just $ Error (ParseError err bd)
                Right v ->
                    pure $ Just $ Parsed (Message bd v)

-- | Outcome of trying to decode a message that was taken from a queue.
data ConsumeResult a
    = Parsed (Message a)
    -- ^ the body decoded successfully
    | Error ParseError
    -- ^ the body could not be decoded

-- | Raised when a message body cannot be decoded into the expected type.
-- The 'String' is the decoder's error message and the 'ByteString' is the
-- raw body that failed to decode. It is an 'Exception', so it can be thrown
-- and caught with the "Control.Exception" functions.
data ParseError = ParseError String ByteString
    deriving (Show, Exception)

-- | A duration in microseconds, used for the delay given to 'consumeNext'.
-- This is a plain alias for 'Int', so it adds readability but no type safety.
type Microseconds = Int