{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE OverloadedStrings #-}

module Network.AMQP.Worker.Connection
    ( Connection (..)
    , AMQP.ConnectionOpts (..)
    , AMQP.defaultConnectionOpts
    , connect
    , disconnect
    , withChannel
    ) where

import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, readMVar, takeMVar)
import Control.Monad.Catch (catch, throwM)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.Function ((&))
import Data.Pool (Pool)
import qualified Data.Pool as Pool
import Data.Text (Text)
import Network.AMQP (AMQPException (..), Channel)
import qualified Network.AMQP as AMQP

type ExchangeName = Text

data Connection = Connection
    { Connection -> MVar Connection
amqpConn :: MVar AMQP.Connection
    , Connection -> Pool Channel
pool :: Pool Channel
    , Connection -> ExchangeName
exchange :: ExchangeName
    }

-- | Connect to the AMQP server.
--
-- > conn <- connect (fromURI "amqp://guest:guest@localhost:5672")
connect :: MonadIO m => AMQP.ConnectionOpts -> m Connection
connect :: forall (m :: * -> *). MonadIO m => ConnectionOpts -> m Connection
connect ConnectionOpts
opts = IO Connection -> m Connection
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Connection -> m Connection) -> IO Connection -> m Connection
forall a b. (a -> b) -> a -> b
$ do
    -- use a default exchange name
    let exchangeName :: ExchangeName
exchangeName = ExchangeName
"amq.topic"

    -- create a single connection in an mvar
    cvar <- IO (MVar Connection)
forall a. IO (MVar a)
newEmptyMVar
    openConnection cvar

    -- open a shared pool for channels
    chans <- Pool.newPool (config cvar)

    pure $ Connection cvar chans exchangeName
  where
    config :: MVar Connection -> PoolConfig Channel
config MVar Connection
cvar =
        IO Channel
-> (Channel -> IO ()) -> Double -> Int -> PoolConfig Channel
forall a. IO a -> (a -> IO ()) -> Double -> Int -> PoolConfig a
Pool.defaultPoolConfig (MVar Connection -> IO Channel
create MVar Connection
cvar) Channel -> IO ()
destroy Double
openTime Int
maxChans
            PoolConfig Channel
-> (PoolConfig Channel -> PoolConfig Channel) -> PoolConfig Channel
forall a b. a -> (a -> b) -> b
& Maybe Int -> PoolConfig Channel -> PoolConfig Channel
forall a. Maybe Int -> PoolConfig a -> PoolConfig a
Pool.setNumStripes (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
1)

    openTime :: Double
    openTime :: Double
openTime = Double
10

    maxChans :: Int
    maxChans :: Int
maxChans = Int
8

    openConnection :: MVar Connection -> IO ()
openConnection MVar Connection
cvar = do
        -- open a connection and store in the mvar
        conn <- ConnectionOpts -> IO Connection
AMQP.openConnection'' ConnectionOpts
opts
        putMVar cvar conn

    reopenConnection :: MVar Connection -> IO ()
reopenConnection MVar Connection
cvar = do
        -- clear the mvar and reopen
        _ <- MVar Connection -> IO Connection
forall a. MVar a -> IO a
takeMVar MVar Connection
cvar
        openConnection cvar

    create :: MVar Connection -> IO Channel
create MVar Connection
cvar = do
        conn <- MVar Connection -> IO Connection
forall a. MVar a -> IO a
readMVar MVar Connection
cvar
        chan <- catch (AMQP.openChannel conn) (createEx cvar)
        return chan

    createEx :: MVar Connection -> AMQPException -> IO Channel
createEx MVar Connection
cvar (ConnectionClosedException CloseType
_ String
_) = do
        MVar Connection -> IO ()
reopenConnection MVar Connection
cvar
        MVar Connection -> IO Channel
create MVar Connection
cvar
    createEx MVar Connection
_ AMQPException
ex = AMQPException -> IO Channel
forall e a. (HasCallStack, Exception e) => e -> IO a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM AMQPException
ex

    destroy :: Channel -> IO ()
destroy Channel
chan = do
        Channel -> IO ()
AMQP.closeChannel Channel
chan

disconnect :: MonadIO m => Connection -> m ()
disconnect :: forall (m :: * -> *). MonadIO m => Connection -> m ()
disconnect Connection
c = IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
    conn <- MVar Connection -> IO Connection
forall a. MVar a -> IO a
readMVar (MVar Connection -> IO Connection)
-> MVar Connection -> IO Connection
forall a b. (a -> b) -> a -> b
$ Connection -> MVar Connection
amqpConn Connection
c
    Pool.destroyAllResources $ pool c
    AMQP.closeConnection conn

-- | Perform an action with a channel resource, and give it back at the end
withChannel :: Connection -> (Channel -> IO b) -> IO b
withChannel :: forall b. Connection -> (Channel -> IO b) -> IO b
withChannel (Connection MVar Connection
_ Pool Channel
p ExchangeName
_) Channel -> IO b
action = do
    Pool Channel -> (Channel -> IO b) -> IO b
forall a r. Pool a -> (a -> IO r) -> IO r
Pool.withResource Pool Channel
p Channel -> IO b
action