-- | Sessions: managed transactions with automatic bookmark tracking for causal consistency.
module Database.Bolty.Session
  ( -- * Bookmark management
    BookmarkManager
  , newBookmarkManager
  , getBookmarks
  , updateBookmark
    -- * Session
  , Session(..)
  , SessionPool(..)
  , SessionConfig(..)
  , defaultSessionConfig
  , createSession
  , createRoutingSession
    -- * Managed transactions
  , readTransaction
  , writeTransaction
    -- * Querying
  , getLastBookmarks
  ) where

import           Control.Concurrent             (threadDelay)
import           Control.Exception              (SomeException, fromException, onException,
                                                 throwIO, try)
import           Control.Monad                  (when)
import qualified Data.HashMap.Lazy              as H
import           Data.IORef                     (IORef, newIORef, readIORef, writeIORef)
import           Data.Kind                      (Type)
import           Data.Text                      (Text)
import qualified Data.Vector                    as V
import           GHC.Stack                      (HasCallStack)

import           Database.Bolty.Connection.Type
import qualified Database.Bolty.Connection.Pipe as P
import           Database.Bolty.Message.Request (Begin(Begin), TelemetryApi(..))
import           Database.Bolty.Pool
import           Database.Bolty.Routing (AccessMode(..), RoutingPool(..), withRoutingConnection,
                                        invalidateRoutingTable)


-- ---------------------------------------------------------------------------
-- Bookmark management
-- ---------------------------------------------------------------------------

-- | Mutable bookmark holder for causal consistency across transactions.
-- Within a session, each committed transaction produces a bookmark that
-- supersedes all previous bookmarks. The manager tracks the latest
-- bookmark(s) so they can be passed to the next transaction's BEGIN.
--
-- Representation: a single 'IORef' holding the current list of bookmarks.
-- The module export list exposes the type without its constructor, so
-- callers go through 'newBookmarkManager', 'getBookmarks' and
-- 'updateBookmark'.
type BookmarkManager :: Type
newtype BookmarkManager = BookmarkManager (IORef [Text])

-- | Create a bookmark manager seeded with the given initial bookmarks.
-- Pass @[]@ to start without any bookmarks.
newBookmarkManager :: [Text] -> IO BookmarkManager
newBookmarkManager initial = BookmarkManager <$> newIORef initial

-- | Read the bookmarks currently held by the manager.
getBookmarks :: BookmarkManager -> IO [Text]
getBookmarks (BookmarkManager ref) = readIORef ref

-- | Replace the current bookmarks with the new one from a COMMIT response.
-- Everything previously stored is discarded; afterwards the manager holds
-- exactly one bookmark. The update is a plain (non-atomic) 'writeIORef'.
updateBookmark :: BookmarkManager -> Text -> IO ()
updateBookmark (BookmarkManager ref) bm = writeIORef ref [bm]


-- ---------------------------------------------------------------------------
-- Session
-- ---------------------------------------------------------------------------

-- | Session configuration.
type SessionConfig :: Type
data SessionConfig = SessionConfig
  { database         :: !(Maybe Text)
  -- ^ Database to use (Nothing = default database).
  , accessMode       :: !AccessMode
  -- ^ Default access mode for auto-commit queries. The managed transactions
  -- 'readTransaction' and 'writeTransaction' pick their own access mode and
  -- do not consult this field.
  , sessionBookmarks :: ![Text]
  -- ^ Initial bookmarks for causal consistency (e.g. from a previous session).
  -- Used to seed the session's 'BookmarkManager' when the session is created.
  }

-- | Default session configuration: default database, write access, no initial bookmarks.
defaultSessionConfig :: SessionConfig
defaultSessionConfig = SessionConfig
  { database         = Nothing
  , accessMode       = WriteAccess
  , sessionBookmarks = []
  }


-- | Which pool type the session uses (internal).
--
-- NOTE(review): described as internal, yet the module export list exposes
-- this type together with its constructors — confirm which is intended.
type SessionPool :: Type
data SessionPool
  = DirectPool !BoltPool
  -- ^ Connections are taken from a plain 'BoltPool'; the access mode is ignored.
  | RoutedPool !RoutingPool
  -- ^ Connections are taken from a 'RoutingPool', selected by access mode.

-- | A session bundles a connection pool with bookmark management and configuration.
-- Sessions track bookmarks automatically across managed transactions
-- ('readTransaction', 'writeTransaction') to ensure causal consistency.
type Session :: Type
data Session = Session
  { sPool            :: !SessionPool
  -- ^ Where connections for this session are acquired from.
  , sBookmarks       :: !BookmarkManager
  -- ^ Latest bookmarks; read before each BEGIN and replaced after each COMMIT
  -- that returns a bookmark.
  , sConfig          :: !SessionConfig
  -- ^ Configuration the session was created with.
  , sRetry           :: !RetryConfig
  -- ^ Retry policy for managed transactions, copied from the pool at creation.
  , sTelemetrySent   :: !(IORef Bool)
  -- ^ Starts as 'False'; set to 'True' once the managed-transaction telemetry
  -- message has been issued for this session.
  }


-- | Create a session using a direct (non-routing) connection pool.
--
-- The bookmark manager is seeded from 'sessionBookmarks', the retry policy is
-- taken from the pool ('bpRetryConfig'), and the telemetry flag starts unset.
createSession :: BoltPool -> SessionConfig -> IO Session
createSession pool cfg = do
  bm <- newBookmarkManager (sessionBookmarks cfg)
  ts <- newIORef False
  pure Session
    { sPool            = DirectPool pool
    , sBookmarks       = bm
    , sConfig          = cfg
    , sRetry           = bpRetryConfig pool
    , sTelemetrySent   = ts
    }


-- | Create a session using a routing-aware connection pool.
--
-- The bookmark manager is seeded from 'sessionBookmarks', the retry policy is
-- taken from the routing pool's pool configuration ('rpPoolConfig'), and the
-- telemetry flag starts unset.
createRoutingSession :: RoutingPool -> SessionConfig -> IO Session
createRoutingSession rp cfg = do
  bm <- newBookmarkManager (sessionBookmarks cfg)
  ts <- newIORef False
  pure Session
    { sPool            = RoutedPool rp
    , sBookmarks       = bm
    , sConfig          = cfg
    , sRetry           = retryConfig (rpPoolConfig rp)
    , sTelemetrySent   = ts
    }


-- | Get the last bookmarks from the session. Pass these to a new session's
-- 'SessionConfig' ('sessionBookmarks') to ensure the new session sees all
-- committed writes.
getLastBookmarks :: Session -> IO [Text]
getLastBookmarks Session{sBookmarks} = getBookmarks sBookmarks


-- ---------------------------------------------------------------------------
-- Managed transactions
-- ---------------------------------------------------------------------------

-- | Run a read transaction. Uses 'ReadAccess' for routing (directs to
-- read replicas in a cluster). Automatically handles BEGIN, COMMIT,
-- ROLLBACK, bookmark propagation, and retries on transient failures.
--
-- The action may be executed more than once when a retry happens.
readTransaction :: HasCallStack => Session -> (Connection -> IO a) -> IO a
readTransaction session action = managedTransaction session ReadAccess action


-- | Run a write transaction. Uses 'WriteAccess' for routing (directs to
-- the leader in a cluster). Automatically handles BEGIN, COMMIT,
-- ROLLBACK, bookmark propagation, and retries on transient failures.
--
-- The action may be executed more than once when a retry happens.
writeTransaction :: HasCallStack => Session -> (Connection -> IO a) -> IO a
writeTransaction session action = managedTransaction session WriteAccess action


-- | Internal: run a managed transaction with the given access mode.
-- Re-acquires a connection on each retry so that routing errors (NotALeader)
-- result in fresh routing table lookups.
--
-- Retry policy (taken from the session's 'RetryConfig'):
--
-- * up to 'maxRetries' retries follow the first attempt; a non-positive
--   'maxRetries' means the transaction is attempted exactly once;
-- * only exceptions of type 'Error' that satisfy 'isTransient' or
--   'isRoutingError' are retried; anything else is rethrown unchanged;
-- * before each retry the thread sleeps for the current delay (handed
--   straight to 'threadDelay', i.e. microseconds), which starts at
--   'initialDelay' and doubles each time, capped at 'maxDelay';
-- * a routing error additionally invalidates the routing table first.
managedTransaction :: HasCallStack => Session -> AccessMode -> (Connection -> IO a) -> IO a
managedTransaction Session{sPool, sBookmarks, sConfig, sRetry, sTelemetrySent} mode action =
  go (maxRetries sRetry) (initialDelay sRetry)
  where
    -- Upper bound for the backoff delay; constant across retries.
    maxD = maxDelay sRetry

    -- @go n delay@: @n@ retries remain, @delay@ is the sleep before the next one.
    go n delay
      -- No retries left (or a negative retry count was configured): run once
      -- and let any exception propagate to the caller.
      | n <= 0    = attempt
      | otherwise = do
          result <- try attempt
          case result of
            Right x -> pure x
            Left (e :: SomeException) -> case fromException e :: Maybe Error of
              Just err
                | isTransient err    -> retryAfterBackoff
                | isRoutingError err -> invalidatePool sPool >> retryAfterBackoff
              -- Not an 'Error', or an 'Error' that is not retryable.
              _ -> throwIO e
      where
        -- Sleep, then try again with one retry fewer and a doubled (capped) delay.
        retryAfterBackoff = do
          threadDelay delay
          go (n - 1) (min maxD (delay * 2))

    -- Access-mode flag passed in the BEGIN message.
    modeChar = case mode of
      ReadAccess  -> 'r'
      WriteAccess -> 'w'

    -- One complete attempt: acquire connection, BEGIN, run action, COMMIT.
    attempt =
      withSessionConnection sPool mode $ \conn -> do
        -- Get current bookmarks and begin the transaction
        bms <- getBookmarks sBookmarks
        P.beginTx conn $ Begin (V.fromList bms) Nothing H.empty modeChar (database sConfig) Nothing
        -- Run the user's action, rollback on error
        result <- action conn `onException` P.tryRollback conn
        -- Commit and extract bookmark
        mbBookmark <- P.commitTx conn
        case mbBookmark of
          Just bm -> updateBookmark sBookmarks bm
          Nothing -> pure ()
        -- Send telemetry after first successful managed transaction.
        -- The flag is set before sending, so a failed send is not repeated.
        -- NOTE(review): an exception escaping 'P.sendTelemetry' propagates out
        -- of this attempt after COMMIT already succeeded; if it were classified
        -- as transient the action would be re-run — confirm it cannot throw so.
        sent <- readIORef sTelemetrySent
        when (not sent) $ do
          writeIORef sTelemetrySent True
          P.sendTelemetry conn ManagedTransactions
        pure result

    -- Only routed pools have a routing table to invalidate.
    invalidatePool (RoutedPool rp) = invalidateRoutingTable rp
    invalidatePool (DirectPool _)  = pure ()


-- | Acquire a connection based on pool type and access mode, and run the
-- action with it.
--
-- A direct pool ignores the access mode; a routed pool forwards it to
-- 'withRoutingConnection'.
withSessionConnection :: SessionPool -> AccessMode -> (Connection -> IO a) -> IO a
withSessionConnection (DirectPool pool) _mode action =
  withConnection pool action
withSessionConnection (RoutedPool rp) mode action =
  withRoutingConnection rp mode action