{-# OPTIONS_HADDOCK not-home #-}

-- | Internal implementation details for "Data.Pool".
--
-- This module is intended for internal use only, and may change without warning
-- in subsequent releases.
module Data.Pool.Internal where

import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Data.Either
import Data.Hashable (hash)
import Data.IORef
import Data.List qualified as L
import Data.Primitive.SmallArray
import Data.Text qualified as T
import GHC.Clock (getMonotonicTime)
import GHC.Conc (labelThread, unsafeIOToSTM)

-- | Striped resource pool based on "Control.Concurrent.QSem".
data Pool a = Pool
  { poolConfig :: !(PoolConfig a)
  -- ^ The configuration the pool was created with (see 'newPool').
  , localPools :: !(SmallArray (LocalPool a))
  -- ^ The stripes of the pool, one 'LocalPool' per stripe.
  , reaperRef :: !(IORef ())
  -- ^ Reference to which 'newPool' attaches a finalizer that kills the
  -- collector thread once the reference is garbage collected.
  }

-- | A single, local pool.
data LocalPool a = LocalPool
  { stripeId :: !Int
  -- ^ Identifier of the stripe; 'newPool' numbers stripes starting from 1.
  , stripeVar :: !(TVar (Stripe a))
  -- ^ The mutable state of the stripe.
  , cleanerRef :: !(IORef ())
  -- ^ Reference to which 'newPool' attaches a finalizer that frees all
  -- resources cached in the stripe once the reference is garbage collected.
  }

-- | Stripe of a resource pool. If @available@ is 0, the list of threads waiting
-- for a resource (each with an associated 'TMVar') is @queue ++ reverse queueR@
-- to ensure fairness.
data Stripe a = Stripe
  { available :: !Int
  -- ^ Number of resources that may still be taken from the stripe. It starts
  -- at the stripe's share of 'poolMaxResources' and is incremented by 'signal'
  -- and 'restoreSize'.
  , cache :: ![Entry a]
  -- ^ Idle resources held by the stripe; 'signal' adds returned resources at
  -- the front.
  , queue :: !(Queue a)
  -- ^ Front part of the queue of waiting threads, in the order they are
  -- served.
  , queueR :: !(Queue a)
  -- ^ Back part of the queue of waiting threads, stored in reverse order.
  }

-- | An existing resource currently sitting in a pool.
data Entry a = Entry
  { entry :: a
  -- ^ The cached resource itself.
  , lastUsed :: !Double
  -- ^ Value of 'getMonotonicTime' at the moment the resource was put into the
  -- cache; compared against 'poolCacheTTL' to detect stale entries.
  }

-- | A queue of 'TMVar's, one for each thread waiting for a resource.
--
-- It is essentially a list specialised to its element type, which saves two
-- pointer indirections compared to a regular list.
data Queue a
  = Queue !(TMVar (Maybe a)) (Queue a)
  | Empty

-- | Configuration of a 'Pool'.
data PoolConfig a = PoolConfig
  { createResource :: !(IO a)
  -- ^ The action that creates a new resource.
  , freeResource :: !(a -> IO ())
  -- ^ The action that destroys an existing resource.
  , poolCacheTTL :: !Double
  -- ^ The number of seconds for which an unused resource is kept around.
  , poolMaxResources :: !Int
  -- ^ The maximum number of resources to keep open across all stripes.
  , poolNumStripes :: !(Maybe Int)
  -- ^ The number of stripes; 'Nothing' makes 'newPool' use the number of
  -- capabilities.
  , pcLabel :: !T.Text
  -- ^ The label of the pool; it is included in the label of the collector
  -- thread.
  }

-- | Create a 'PoolConfig' with optional parameters having default values.
--
-- For setting optional parameters have a look at:
--
-- - 'setNumStripes'
-- - 'setPoolLabel'
--
-- @since 0.4.0.0
defaultPoolConfig
  :: IO a
  -- ^ The action that creates a new resource.
  -> (a -> IO ())
  -- ^ The action that destroys an existing resource.
  -> Double
  -- ^ The number of seconds for which an unused resource is kept around. The
  -- smallest acceptable value is @0.5@.
  --
  -- /Note:/ the elapsed time before destroying a resource may be a little
  -- longer than requested, as the collector thread wakes at 1-second intervals.
  -> Int
  -- ^ The maximum number of resources to keep open __across all stripes__. The
  -- smallest acceptable value is @1@ per stripe.
  --
  -- /Note:/ if the number of stripes does not divide the number of resources,
  -- some of the stripes will have 1 more resource available than the others.
  -> PoolConfig a
defaultPoolConfig create free cacheTTL maxResources =
  PoolConfig
    { createResource = create
    , freeResource = free
    , poolCacheTTL = cacheTTL
    , poolMaxResources = maxResources
    , -- Optional parameters: a single stripe and an empty label by default.
      poolNumStripes = Just 1
    , pcLabel = T.empty
    }

-- | Set the number of stripes (sub-pools) in the pool.
--
-- If not explicitly set, the default number of stripes is 1, which should be
-- good for typical use (when in doubt, profile your application first).
--
-- If set to 'Nothing', the pool will create the number of stripes equal to the
-- number of capabilities.
--
-- /Note:/ usage of multiple stripes reduces contention, but can also result in
-- suboptimal use of resources since stripes are separated from each other.
--
-- @since 0.4.0.0
setNumStripes :: Maybe Int -> PoolConfig a -> PoolConfig a
setNumStripes numStripes pc = pc {poolNumStripes = numStripes}

-- | Assign a label to the pool.
--
-- The label will appear in a label of the collector thread as well as
-- t'Data.Pool.Introspection.Resource'.
--
-- @since 0.5.0.0
setPoolLabel :: T.Text -> PoolConfig a -> PoolConfig a
setPoolLabel label pc = pc {pcLabel = label}

-- | Create a new striped resource pool.
--
-- /Note:/ although the runtime system will destroy all idle resources when the
-- pool is garbage collected, it's recommended to manually call
-- 'destroyAllResources' when you're done with the pool so that the resources
-- are freed up as soon as possible.
newPool :: PoolConfig a -> IO (Pool a)
newPool pc = do
  -- Validate the configuration before allocating anything.
  when (poolCacheTTL pc < 0.5) $ do
    error "poolCacheTTL must be at least 0.5"
  when (poolMaxResources pc < 1) $ do
    error "poolMaxResources must be at least 1"
  -- 'Nothing' means one stripe per capability.
  numStripes <- maybe getNumCapabilities pure (poolNumStripes pc)
  when (numStripes < 1) $ do
    error "numStripes must be at least 1"
  when (poolMaxResources pc < numStripes) $ do
    error "poolMaxResources must not be smaller than numStripes"
  let mkArray = fmap (smallArrayFromListN numStripes)
  pools <- mkArray . forM (stripeResources numStripes) $ \(n, resources) -> do
    ref <- newIORef ()
    stripe <-
      newTVarIO
        Stripe
          { available = resources
          , cache = []
          , queue = Empty
          , queueR = Empty
          }
    -- When the local pool goes out of scope, free its resources.
    void . mkWeakIORef ref $ cleanStripe (const True) (freeResource pc) stripe
    pure
      LocalPool
        { stripeId = n
        , stripeVar = stripe
        , cleanerRef = ref
        }
  -- Asynchronous exceptions are masked so that the collector thread cannot be
  -- forked without the finalizer that stops it also being installed.
  mask_ $ do
    ref <- newIORef ()
    collectorA <- forkIOWithUnmask $ \unmask -> unmask $ do
      tid <- myThreadId
      labelThread tid $ "resource-pool: collector (" ++ T.unpack (pcLabel pc) ++ ")"
      collector pools
    void . mkWeakIORef ref $ do
      -- When the pool goes out of scope, stop the collector. Resources existing
      -- in stripes will be taken care of by their cleaners.
      killThread collectorA
    pure
      Pool
        { poolConfig = pc
        , localPools = pools
        , reaperRef = ref
        }
  where
    -- Pair each stripe id (starting from 1) with the number of resources
    -- assigned to that stripe. The remainder of the division is spread over
    -- the first stripes, one extra resource each.
    stripeResources :: Int -> [(Int, Int)]
    stripeResources numStripes =
      let (base, rest) = quotRem (poolMaxResources pc) numStripes
      in zip [1 .. numStripes] $ addRest (replicate numStripes base) rest
      where
        -- Add 1 to each of the first @rest@ elements of the list. The
        -- remainder is always smaller than the length of the list, hence the
        -- empty list is never reached.
        addRest [] = error "unreachable"
        addRest acc@(r : rs) = \case
          0 -> acc
          rest -> r + 1 : addRest rs (rest - 1)

    -- Collect stale resources from the pool once per second.
    collector pools = forever $ do
      threadDelay 1000000
      now <- getMonotonicTime
      -- An entry is stale once it sat in the cache for longer than the TTL.
      let isStale e = now - lastUsed e > poolCacheTTL pc
      mapM_ (cleanStripe isStale (freeResource pc) . stripeVar) pools

-- | Destroy a resource.
--
-- Note that exceptions thrown by the destroy function are not ignored: they
-- propagate to the caller.
destroyResource :: Pool a -> LocalPool a -> a -> IO ()
destroyResource pool lp a = mask_ $ do
  -- Asynchronous exceptions are masked so that one cannot arrive between
  -- giving the slot back to the stripe and starting the destroy action.
  atomically $ do
    stripe <- readTVar (stripeVar lp)
    -- 'Nothing' tells 'signal' that a slot became free, but there is no
    -- resource to hand over or put into the cache.
    newStripe <- signal stripe Nothing
    writeTVar (stripeVar lp) $! newStripe
  freeResource (poolConfig pool) a

-- | Return a resource to the given 'LocalPool'.
putResource :: LocalPool a -> a -> IO ()
putResource lp a = atomically $ do
  stripe <- readTVar (stripeVar lp)
  -- 'signal' either hands the resource over to a waiting thread or puts it
  -- into the cache of the stripe.
  newStripe <- signal stripe (Just a)
  writeTVar (stripeVar lp) $! newStripe

-- | Destroy all resources in all stripes in the pool.
--
-- Note that this will ignore any exceptions in the destroy function.
--
-- This function is useful when you detect that all resources in the pool are
-- broken. For example after a database has been restarted all connections
-- opened before the restart will be broken. In that case it's better to close
-- those connections so that 'takeResource' won't take a broken connection from
-- the pool but will open a new connection instead.
--
-- Another use-case for this function is that when you know you are done with
-- the pool you can destroy all idle resources immediately instead of waiting on
-- the garbage collector to destroy them, thus freeing up those resources
-- sooner.
destroyAllResources :: Pool a -> IO ()
destroyAllResources pool = forM_ (localPools pool) $ \lp -> do
  -- Treat every cached entry as stale, so that all of them are freed.
  cleanStripe (const True) (freeResource (poolConfig pool)) (stripeVar lp)

----------------------------------------
-- Helpers

-- | Get a local pool.
getLocalPool :: SmallArray (LocalPool a) -> IO (LocalPool a)
getLocalPool pools = do
  sid <-
    if stripes == 1
      then -- If there is just one stripe, there is no choice.
        pure 0
      else do
        capabilities <- getNumCapabilities
        -- If the number of stripes is smaller than the number of capabilities and
        -- doesn't divide it, selecting a stripe by a capability the current
        -- thread runs on wouldn't give equal load distribution across all stripes
        -- (e.g. if there are 2 stripes and 3 capabilities, stripe 0 would be used
        -- by capability 0 and 2, while stripe 1 would only be used by capability
        -- 1, a 100% load difference). In such case we select based on the id of a
        -- thread.
        if stripes < capabilities && capabilities `rem` stripes /= 0
          then hash <$> myThreadId
          else fmap fst . threadCapability =<< myThreadId
  -- 'mod' (unlike 'rem') never yields a negative result for a positive
  -- divisor, so the index stays within the array even if the hash of the
  -- thread id happens to be negative.
  pure $ pools `indexSmallArray` (sid `mod` stripes)
  where
    -- The number of stripes in the pool.
    stripes = sizeofSmallArray pools

-- | Wait for the resource to be put into a given 'TMVar'.
waitForResource :: TVar (Stripe a) -> TMVar (Maybe a) -> IO (Maybe a)
waitForResource mstripe q = atomically (takeTMVar q) `onException` cleanup
  where
    -- Run when waiting was aborted by an exception: make sure that a resource
    -- delivered in the meantime is not lost and that the abandoned 'TMVar' is
    -- skipped by 'signal'.
    cleanup = atomically $ do
      stripe <- readTVar mstripe
      newStripe <-
        tryTakeTMVar q >>= \case
          Just ma -> do
            -- Between entering the exception handler and taking ownership of
            -- the stripe we got the resource we wanted. We don't need it
            -- anymore though, so pass it to someone else.
            signal stripe ma
          Nothing -> do
            -- If we're still waiting, fill up the TMVar with an undefined value
            -- so that 'signal' can discard our TMVar from the queue.
            putTMVar q $ error "unreachable"
            pure stripe
      writeTVar mstripe $! newStripe

-- | If an exception is received while a resource is being created, restore the
-- original size of the stripe.
restoreSize :: TVar (Stripe a) -> IO ()
restoreSize mstripe = atomically $ do
  -- Give back the slot by incrementing the number of available resources.
  modifyTVar' mstripe $ \stripe -> stripe {available = available stripe + 1}

-- | Free resource entries in the stripes that fulfil a given condition.
cleanStripe
  :: (Entry a -> Bool)
  -> (a -> IO ())
  -> TVar (Stripe a)
  -> IO ()
cleanStripe isStale free mstripe = mask_ $ do
  -- Asynchronous exceptions need to be masked here to prevent leaking of
  -- 'stale' resources before they're freed.
  stale <- atomically $ do
    stripe <- readTVar mstripe
    let (stale, fresh) = L.partition isStale (cache stripe)
    -- There's no need to update 'available' here because it only tracks
    -- the number of resources taken from the pool.
    writeTVar mstripe $! stripe {cache = fresh}
    pure $ map entry stale
  -- We need to ignore exceptions in the 'free' function, otherwise if an
  -- exception is thrown half-way, we leak the rest of the resources. Also,
  -- asynchronous exceptions need to be hard masked here, since we need to run
  -- 'free' for all resources.
  uninterruptibleMask $ \release -> do
    -- Each 'free' runs with the masking state of the caller restored; its
    -- outcome is recorded instead of aborting the loop.
    rs <- forM stale $ try @SomeException . release . free
    -- If any async exception arrived in between, propagate it.
    rethrowFirstAsyncException $ lefts rs
  where
    -- Rethrow the first asynchronous exception from the list (if any);
    -- synchronous exceptions are skipped.
    rethrowFirstAsyncException = \case
      [] -> pure ()
      e : es
        | Just SomeAsyncException {} <- fromException e -> throwIO e
        | otherwise -> rethrowFirstAsyncException es

-- | Give a slot of the stripe back, optionally together with a resource.
--
-- If no resources are available (i.e. there might be threads waiting), the
-- value is handed over to the first waiting thread that is still interested.
-- Otherwise (or if no such thread exists) the number of available resources is
-- incremented and the resource, if any, is put into the cache.
--
-- Returns the updated stripe; storing it is the responsibility of the caller.
signal :: forall a. Stripe a -> Maybe a -> STM (Stripe a)
signal stripe ma =
  if available stripe == 0
    then loop (queue stripe) (queueR stripe)
    else do
      newCache <- case ma of
        Just a -> do
          now <- unsafeIOToSTM getMonotonicTime
          pure $ Entry a now : cache stripe
        Nothing -> pure $ cache stripe
      pure
        stripe
          { available = available stripe + 1
          , cache = newCache
          }
  where
    -- Walk the queue of waiting threads (front part first, then the reversed
    -- back part) until the value is delivered or the queue is exhausted.
    loop :: Queue a -> Queue a -> STM (Stripe a)
    loop Empty Empty = do
      -- Nobody is waiting, so the slot becomes available and the resource
      -- (if any) becomes the only entry in the cache.
      newCache <- case ma of
        Just a -> do
          now <- unsafeIOToSTM getMonotonicTime
          pure [Entry a now]
        Nothing -> pure []
      pure
        Stripe
          { available = 1
          , cache = newCache
          , queue = Empty
          , queueR = Empty
          }
    loop Empty qR = loop (reverseQueue qR) Empty
    loop (Queue q qs) qR =
      tryPutTMVar q ma >>= \case
        -- This fails when 'waitForResource' went into the exception handler and
        -- filled the TMVar (with an undefined value) itself. In such case we
        -- simply ignore it.
        False -> loop qs qR
        True ->
          pure
            stripe
              { available = 0
              , queue = qs
              , queueR = qR
              }

    -- Reverse a queue using an accumulator.
    reverseQueue :: Queue a -> Queue a
    reverseQueue = go Empty
      where
        go acc = \case
          Empty -> acc
          Queue x xs -> go (Queue x acc) xs