module Database.Bolty.Session
(
BookmarkManager
, newBookmarkManager
, getBookmarks
, updateBookmark
, Session(..)
, SessionPool(..)
, SessionConfig(..)
, defaultSessionConfig
, createSession
, createRoutingSession
, readTransaction
, writeTransaction
, getLastBookmarks
) where
import Control.Concurrent (threadDelay)
import Control.Exception (SomeException, fromException, onException,
throwIO, try)
import Control.Monad (when)
import qualified Data.HashMap.Lazy as H
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import Data.Kind (Type)
import Data.Text (Text)
import qualified Data.Vector as V
import GHC.Stack (HasCallStack)
import Database.Bolty.Connection.Type
import qualified Database.Bolty.Connection.Pipe as P
import Database.Bolty.Message.Request (Begin(Begin), TelemetryApi(..))
import Database.Bolty.Pool
import Database.Bolty.Routing (AccessMode(..), RoutingPool(..), withRoutingConnection,
invalidateRoutingTable)
type BookmarkManager :: Type
-- | Mutable holder for a list of bookmarks.
--
-- Wraps an 'IORef' so the list can be read with 'getBookmarks' and
-- overwritten with 'updateBookmark'.
newtype BookmarkManager = BookmarkManager (IORef [Text])
-- | Create a 'BookmarkManager' whose initial contents are the given
-- bookmarks.
newBookmarkManager :: [Text] -> IO BookmarkManager
newBookmarkManager initial = BookmarkManager <$> newIORef initial
-- | Read the bookmarks currently stored in the manager.
getBookmarks :: BookmarkManager -> IO [Text]
getBookmarks (BookmarkManager ref) = readIORef ref
-- | Replace the stored bookmarks with the single bookmark @bm@.
--
-- The previous contents are discarded, not appended to: after this call
-- 'getBookmarks' returns exactly @[bm]@.
updateBookmark :: BookmarkManager -> Text -> IO ()
updateBookmark (BookmarkManager ref) bm = writeIORef ref [bm]
type SessionConfig :: Type
-- | Per-session settings supplied when a 'Session' is created.
data SessionConfig = SessionConfig
  { database :: !(Maybe Text)
    -- ^ Database value passed in the @Begin@ request of every managed
    -- transaction; 'Nothing' by default.
  , accessMode :: !AccessMode
    -- ^ Configured access mode.
    -- NOTE(review): not read by any code visible in this module;
    -- 'readTransaction' and 'writeTransaction' choose the mode explicitly.
  , sessionBookmarks :: ![Text]
    -- ^ Bookmarks used to seed the session's 'BookmarkManager'.
  }
-- | Default configuration: no explicit database, write access, and no
-- initial bookmarks.
defaultSessionConfig :: SessionConfig
defaultSessionConfig = SessionConfig
  { database = Nothing
  , accessMode = WriteAccess
  , sessionBookmarks = []
  }
type SessionPool :: Type
-- | The connection source backing a 'Session'.
data SessionPool
  = DirectPool !BoltPool
    -- ^ Connections come from a single 'BoltPool'; the access mode is
    -- ignored when acquiring one.
  | RoutedPool !RoutingPool
    -- ^ Connections come from a 'RoutingPool' and are selected by access
    -- mode.
type Session :: Type
-- | State shared by all managed transactions run through one session.
data Session = Session
  { sPool :: !SessionPool
    -- ^ Where connections for each transaction attempt are acquired.
  , sBookmarks :: !BookmarkManager
    -- ^ Bookmarks sent with each @Begin@ and replaced after each commit
    -- that returns a bookmark.
  , sConfig :: !SessionConfig
    -- ^ Configuration the session was created with.
  , sRetry :: !RetryConfig
    -- ^ Retry count and backoff delays used by managed transactions.
  , sTelemetrySent :: !(IORef Bool)
    -- ^ Whether the managed-transactions telemetry message has already
    -- been sent for this session.
  }
-- | Create a 'Session' backed directly by a 'BoltPool'.
--
-- The bookmark manager is seeded from 'sessionBookmarks', the retry
-- configuration is taken from the pool ('bpRetryConfig'), and the
-- telemetry flag starts out 'False'.
createSession :: BoltPool -> SessionConfig -> IO Session
createSession pool cfg = do
  bm <- newBookmarkManager (sessionBookmarks cfg)
  ts <- newIORef False
  pure Session
    { sPool = DirectPool pool
    , sBookmarks = bm
    , sConfig = cfg
    , sRetry = bpRetryConfig pool
    , sTelemetrySent = ts
    }
-- | Create a 'Session' backed by a 'RoutingPool'.
--
-- The bookmark manager is seeded from 'sessionBookmarks', the retry
-- configuration is taken from the routing pool's pool configuration
-- (@retryConfig . rpPoolConfig@), and the telemetry flag starts out
-- 'False'.
createRoutingSession :: RoutingPool -> SessionConfig -> IO Session
createRoutingSession rp cfg = do
  bm <- newBookmarkManager (sessionBookmarks cfg)
  ts <- newIORef False
  pure Session
    { sPool = RoutedPool rp
    , sBookmarks = bm
    , sConfig = cfg
    , sRetry = retryConfig (rpPoolConfig rp)
    , sTelemetrySent = ts
    }
-- | Return the bookmarks currently held by the session's bookmark manager.
getLastBookmarks :: Session -> IO [Text]
getLastBookmarks Session{sBookmarks} = getBookmarks sBookmarks
-- | Run @action@ in a managed transaction using 'ReadAccess'.
--
-- Retry and commit behaviour are those of the shared managed-transaction
-- runner this delegates to.
readTransaction :: HasCallStack => Session -> (Connection -> IO a) -> IO a
readTransaction session action = managedTransaction session ReadAccess action
-- | Run @action@ in a managed transaction using 'WriteAccess'.
--
-- Retry and commit behaviour are those of the shared managed-transaction
-- runner this delegates to.
writeTransaction :: HasCallStack => Session -> (Connection -> IO a) -> IO a
writeTransaction session action = managedTransaction session WriteAccess action
-- | Run @action@ inside an explicit transaction, retrying retryable errors
-- with exponential backoff.
--
-- Each attempt acquires a connection for @mode@, sends @Begin@ with the
-- session's current bookmarks and configured database, runs @action@
-- (rolling back if it throws), and commits. A bookmark returned by the
-- commit replaces the session's bookmarks.
--
-- An attempt that fails with an 'Error' satisfying 'isTransient' or
-- 'isRoutingError' is retried after sleeping; routing errors additionally
-- invalidate the routing table first. The delay starts at 'initialDelay'
-- and doubles after every retry, capped at 'maxDelay'. Any other
-- exception is rethrown unchanged. After 'maxRetries' retries the final
-- attempt runs without a handler, so its exception reaches the caller.
--
-- A non-positive 'maxRetries' means a single attempt with no retries.
managedTransaction :: HasCallStack => Session -> AccessMode -> (Connection -> IO a) -> IO a
managedTransaction Session{sPool, sBookmarks, sConfig, sRetry, sTelemetrySent} mode action =
  go (maxRetries sRetry) (initialDelay sRetry)
  where
    -- Upper bound for the backoff delay; constant for the whole loop.
    maxD = maxDelay sRetry

    -- @go n delay@: @n@ retries remain, and @delay@ is slept before the
    -- next one. The guard is @<= 0@ rather than a literal @0@ so that a
    -- negative retry count cannot produce an unbounded retry loop.
    go n delay
      | n <= 0 = attempt
      | otherwise = do
          result <- try attempt
          case result of
            Right x -> pure x
            Left (e :: SomeException) -> case fromException e :: Maybe Error of
              Just err
                | isTransient err -> backoff
                | isRoutingError err -> invalidatePool sPool >> backoff
              -- Not an 'Error', or an 'Error' that is not retryable.
              _ -> throwIO e
      where
        -- Sleep, then retry with one fewer retry left and a doubled
        -- (capped) delay.
        backoff = do
          threadDelay delay
          go (n - 1) (min maxD (delay * 2))

    -- Access-mode character passed in the @Begin@ request.
    modeChar = case mode of
      ReadAccess -> 'r'
      WriteAccess -> 'w'

    -- One complete begin / run / commit cycle on a fresh connection.
    --
    -- NOTE(review): if 'P.commitTx' or 'P.sendTelemetry' can raise a
    -- retryable 'Error' after the server has already committed, the retry
    -- loop will run @action@ again -- confirm that cannot happen.
    attempt =
      withSessionConnection sPool mode $ \conn -> do
        bms <- getBookmarks sBookmarks
        P.beginTx conn $ Begin (V.fromList bms) Nothing H.empty modeChar (database sConfig) Nothing
        result <- action conn `onException` P.tryRollback conn
        mbBookmark <- P.commitTx conn
        case mbBookmark of
          Just bm -> updateBookmark sBookmarks bm
          Nothing -> pure ()
        -- Telemetry is sent at most once per session; the flag is set
        -- before sending, so a failed send is not attempted again.
        sent <- readIORef sTelemetrySent
        when (not sent) $ do
          writeIORef sTelemetrySent True
          P.sendTelemetry conn ManagedTransactions
        pure result

    -- Only routed pools have a routing table to invalidate.
    invalidatePool (RoutedPool rp) = invalidateRoutingTable rp
    invalidatePool (DirectPool _) = pure ()
-- | Acquire a connection from the session's pool and run @action@ with it.
--
-- A 'DirectPool' ignores the access mode and delegates to
-- 'withConnection'; a 'RoutedPool' passes the mode on to
-- 'withRoutingConnection'.
withSessionConnection :: SessionPool -> AccessMode -> (Connection -> IO a) -> IO a
withSessionConnection (DirectPool pool) _mode action =
  withConnection pool action
withSessionConnection (RoutedPool rp) mode action =
  withRoutingConnection rp mode action