module Database.Bolty.Connection
( queryIO
, queryPIO
, queryIO'
, queryPIO'
, queryWithFieldsIO
, queryPWithFieldsIO
, queryPMetaIO
, requestResponseRunIO
, requestResponsePullIO
) where
import Prelude
import qualified Database.Bolty.Connection.Pipe as P
import Database.Bolty.Connection.Type
import Database.Bolty.Logging
import Database.Bolty.Record
import Control.Exception (SomeException, throwIO, try)
import Data.Text (Text)
import Data.Word (Word64)
import Debug.Trace (traceEventIO)
import GHC.Clock (getMonotonicTimeNSec)
import GHC.Stack (HasCallStack)
import Database.Bolty.Message.Response
import Database.Bolty.Message.Request
import qualified Data.HashMap.Lazy as H
import qualified Data.Text as T
import qualified Data.Vector as V
import Data.PackStream.Ps (Ps, fromPs)
import Data.PackStream.Result (Result(..))
-- | Run a Cypher statement with an empty parameter map and return only the
-- records of the resulting 'SuccessPull'.
queryIO :: HasCallStack => Connection -> Text -> IO (V.Vector Record)
queryIO conn cypher = fmap records (queryPIO' conn cypher H.empty)
-- | Run a Cypher statement with the given parameter map and return only the
-- records of the resulting 'SuccessPull'.
queryPIO :: HasCallStack => Connection -> Text -> H.HashMap Text Ps -> IO (V.Vector Record)
queryPIO conn cypher params = do
  pullResult <- queryPIO' conn cypher params
  pure (records pullResult)
-- | Run a Cypher statement with an empty parameter map and return the whole
-- 'SuccessPull' (records together with its metadata).
queryIO' :: HasCallStack => Connection -> Text -> IO SuccessPull
queryIO' conn cypher = queryPIO' conn cypher noParams
  where
    -- No query parameters are supplied by this variant.
    noParams :: H.HashMap Text Ps
    noParams = H.empty
-- | Run a Cypher statement with the given parameter map and return the whole
-- 'SuccessPull'.
--
-- After the query has completed, the result is handed to 'fireLogger' and
-- then to 'fireNotifications' (in that order) before being returned.
queryPIO' :: HasCallStack => Connection -> Text -> H.HashMap Text Ps -> IO SuccessPull
queryPIO' conn cypher params = do
  (runOutcome, pullOutcome, elapsedNs) <- timedQuery conn cypher params
  fireLogger conn cypher params runOutcome pullOutcome elapsedNs
  pullOutcome <$ fireNotifications conn pullOutcome
-- | Send a single @RUN@ request and wait for its response.
--
-- The connection state is read first and must be 'Ready' or 'TXready'
-- (checked by 'P.requireStateIO'). In 'TXready' an explicit-transaction run
-- request is sent; in any other accepted state an auto-commit run request is
-- sent.
--
-- Response handling:
--
-- * success: the state becomes 'TXstreaming' (if it was 'TXready') or
--   'Streaming', and the metadata is decoded; a decoding failure is thrown
--   as 'WrongMessageFormat'.
-- * ignored: the connection is reset and 'ResponseErrorIgnored' is thrown.
-- * failure: the state is set to 'Failed', the connection is reset and
--   'ResponseErrorFailure' is thrown with the server's code and message.
-- * record: the connection is reset and 'ResponseErrorRecords' is thrown.
requestResponseRunIO :: HasCallStack => Connection -> Text -> H.HashMap Text Ps -> IO SuccessRun
requestResponseRunIO conn cypher params = do
  stateBefore <- P.getState conn
  P.requireStateIO conn [Ready, TXready] "RUN"
  let inTransaction = case stateBefore of
        TXready -> True
        _       -> False
  P.flushIO conn (runMessage inTransaction)
  reply <- P.fetchIO conn
  case reply of
    RSuccess meta -> do
      P.setState conn (if inTransaction then TXstreaming else Streaming)
      either (throwIO . WrongMessageFormat) pure (makeResponseRunAutoCommitTransaction meta)
    RIgnored ->
      P.reset conn >> throwIO ResponseErrorIgnored
    RFailure Failure{code = failureCode, message = failureMessage} -> do
      P.setState conn Failed
      P.reset conn
      throwIO (ResponseErrorFailure failureCode failureMessage)
    RRecord _ ->
      P.reset conn >> throwIO ResponseErrorRecords
  where
    -- Build the RUN request matching the transaction mode of the connection.
    runMessage :: Bool -> Request
    runMessage True  = RRunExplicitTransaction (RunExplicitTransaction cypher params defaultRunExtra)
    runMessage False = RRunAutoCommitTransaction (mkRunAutoCommit cypher params)
-- | Send a @PULL@ request and collect every streamed record until the
-- final success message.
--
-- The connection must be in 'Streaming' or 'TXstreaming' (checked by
-- 'P.requireStateIO'). When a success message carries @has_more = true@
-- another @PULL@ is sent and collection continues. On the final success the
-- state becomes 'TXready' (if it was 'TXstreaming') or 'Ready', and the
-- result is built with 'makeSuccessPull'; a failure there is thrown as
-- 'WrongMessageFormat'.
--
-- An ignored response resets the connection and throws
-- 'ResponseErrorIgnored'. A failure response sets the state to 'Failed',
-- resets the connection and throws 'ResponseErrorFailure'.
requestResponsePullIO :: HasCallStack => Connection -> IO SuccessPull
requestResponsePullIO conn = do
  P.requireStateIO conn [Streaming, TXstreaming] "PULL"
  P.flushIO conn $ RPull defaultPull
  go []
  where
    -- Records are consed onto a list in reverse arrival order and turned
    -- into a vector once at the end. Appending each record with 'V.snoc'
    -- copies the whole vector every time, which is quadratic in the number
    -- of records.
    go :: [Record] -> IO SuccessPull
    go acc = do
      response <- P.fetchIO conn
      case response of
        RSuccess meta
          | hasMore meta -> do
              P.flushIO conn $ RPull defaultPull
              go acc
          | otherwise -> do
              st <- P.getState conn
              P.setState conn $ case st of
                TXstreaming -> TXready
                _           -> Ready
              case makeSuccessPull (V.fromList (reverse acc)) meta of
                Left err -> throwIO $ WrongMessageFormat err
                Right v  -> pure v
        RIgnored -> do
          P.reset conn
          throwIO ResponseErrorIgnored
        RFailure Failure{code, message} -> do
          P.setState conn Failed
          P.reset conn
          throwIO $ ResponseErrorFailure code message
        RRecord record -> go (record : acc)

    -- True only when the metadata has a @has_more@ entry that decodes to
    -- 'True'; a missing or undecodable entry counts as "no more".
    hasMore :: H.HashMap Text Ps -> Bool
    hasMore meta = case H.lookup "has_more" meta of
      Just hm -> case fromPs hm of
        Success True -> True
        _            -> False
      Nothing -> False
-- | Run a Cypher statement with the given parameter map and return the
-- field names reported by the run response together with the records.
--
-- The result is passed to 'fireLogger' and then 'fireNotifications' before
-- being returned.
queryPWithFieldsIO :: HasCallStack => Connection -> T.Text -> H.HashMap T.Text Ps -> IO (V.Vector T.Text, V.Vector Record)
queryPWithFieldsIO conn cypher params = do
  (runOutcome, pullOutcome, elapsedNs) <- timedQuery conn cypher params
  fireLogger conn cypher params runOutcome pullOutcome elapsedNs
  fireNotifications conn pullOutcome
  let fieldNames = successFields runOutcome
      rows       = records pullOutcome
  pure (fieldNames, rows)
-- | Like 'queryPWithFieldsIO', but with an empty parameter map.
queryWithFieldsIO :: HasCallStack => Connection -> T.Text -> IO (V.Vector T.Text, V.Vector Record)
queryWithFieldsIO conn cypher = queryPWithFieldsIO conn cypher noParams
  where
    -- No query parameters are supplied by this variant.
    noParams :: H.HashMap T.Text Ps
    noParams = H.empty
-- | Run a Cypher statement with the given parameter map and return the
-- field names from the run response, the records, and the 'QueryMeta'
-- carried by the pull response.
--
-- The result is passed to 'fireLogger' and then 'fireNotifications' before
-- being returned.
queryPMetaIO :: HasCallStack => Connection -> T.Text -> H.HashMap T.Text Ps -> IO (V.Vector T.Text, V.Vector Record, QueryMeta)
queryPMetaIO conn cypher params = do
  (runOutcome, pullOutcome, elapsedNs) <- timedQuery conn cypher params
  fireLogger conn cypher params runOutcome pullOutcome elapsedNs
  fireNotifications conn pullOutcome
  let fieldNames = successFields runOutcome
      rows       = records pullOutcome
      meta       = infos pullOutcome
  pure (fieldNames, rows, meta)
-- | Execute a query by sending @RUN@ and @PULL@ back to back, then reading
-- both responses, and measure the elapsed monotonic-clock time.
--
-- Returns the run response, the pull response, and the elapsed time in
-- nanoseconds (difference of two 'getMonotonicTimeNSec' readings: one taken
-- right after the @START boltWireIO@ trace event, one taken after all pull
-- responses were read).
--
-- The connection must be in 'Ready' or 'TXready'. In 'TXready' an
-- explicit-transaction run request is sent, otherwise an auto-commit one.
--
-- If handling the run response throws, one further response is fetched and
-- discarded, the connection is reset, and the original exception is
-- rethrown.
timedQuery :: HasCallStack => Connection -> T.Text -> H.HashMap T.Text Ps -> IO (SuccessRun, SuccessPull, Word64)
timedQuery conn cypher params = do
  traceEventIO "START boltWireIO"
  startNs <- getMonotonicTimeNSec
  stateBefore <- P.getState conn
  P.requireStateIO conn [Ready, TXready] "RUN"
  let inTransaction = case stateBefore of
        TXready -> True
        _       -> False
  -- Both requests are written before any response is read.
  P.flushIO conn (runMessage inTransaction)
  P.flushIO conn (RPull defaultPull)
  runOutcome <- try @SomeException (awaitRun inTransaction)
  case runOutcome of
    Left failure -> do
      -- NOTE(review): presumably this drains the server's reply to the PULL
      -- that was already sent above -- confirm against the protocol.
      _ <- P.fetchIO conn
      P.reset conn
      throwIO failure
    Right runResp -> do
      pullResp <- pullRecords conn V.empty
      stopNs <- getMonotonicTimeNSec
      traceEventIO "STOP boltWireIO"
      pure (runResp, pullResp, stopNs - startNs)
  where
    -- Build the RUN request matching the transaction mode of the connection.
    runMessage :: Bool -> Request
    runMessage True  = RRunExplicitTransaction (RunExplicitTransaction cypher params defaultRunExtra)
    runMessage False = RRunAutoCommitTransaction (mkRunAutoCommit cypher params)

    -- Read the response to RUN. Unlike 'requestResponseRunIO' this does not
    -- reset the connection itself; the caller above does that after
    -- draining one more response.
    awaitRun :: HasCallStack => Bool -> IO SuccessRun
    awaitRun inTransaction = do
      reply <- P.fetchIO conn
      case reply of
        RSuccess meta -> do
          P.setState conn (if inTransaction then TXstreaming else Streaming)
          either (throwIO . WrongMessageFormat) pure (makeResponseRunAutoCommitTransaction meta)
        RFailure Failure{code = failureCode, message = failureMessage} -> do
          P.setState conn Failed
          throwIO (ResponseErrorFailure failureCode failureMessage)
        RIgnored  -> throwIO ResponseErrorIgnored
        RRecord _ -> throwIO ResponseErrorRecords
-- | Read pull responses from the connection until the final success
-- message, returning the collected records and metadata.
--
-- @recs@ is a prefix of already-collected records; newly received records
-- are appended after it. No @PULL@ is sent up front (the caller is expected
-- to have sent one), but whenever a success message carries
-- @has_more = true@ another @PULL@ is sent and collection continues.
--
-- On the final success the state becomes 'TXready' (if it was
-- 'TXstreaming') or 'Ready', and the result is built with
-- 'makeSuccessPull'; a failure there is thrown as 'WrongMessageFormat'.
-- An ignored response resets the connection and throws
-- 'ResponseErrorIgnored'. A failure response sets the state to 'Failed',
-- resets the connection and throws 'ResponseErrorFailure'.
pullRecords :: HasCallStack => Connection -> V.Vector Record -> IO SuccessPull
pullRecords conn recs = go []
  where
    -- Records are consed onto a list in reverse arrival order and turned
    -- into a vector once at the end. Appending each record with 'V.snoc'
    -- copies the whole vector every time, which is quadratic in the number
    -- of records.
    go :: HasCallStack => [Record] -> IO SuccessPull
    go acc = do
      response <- P.fetchIO conn
      case response of
        RSuccess meta
          | hasMore meta -> do
              P.flushIO conn $ RPull defaultPull
              go acc
          | otherwise -> do
              st <- P.getState conn
              P.setState conn $ case st of
                TXstreaming -> TXready
                _           -> Ready
              case makeSuccessPull (recs V.++ V.fromList (reverse acc)) meta of
                Left err -> throwIO $ WrongMessageFormat err
                Right v  -> pure v
        RIgnored -> do
          P.reset conn
          throwIO ResponseErrorIgnored
        RFailure Failure{code, message} -> do
          P.setState conn Failed
          P.reset conn
          throwIO $ ResponseErrorFailure code message
        RRecord record -> go (record : acc)

    -- True only when the metadata has a @has_more@ entry that decodes to
    -- 'True'; a missing or undecodable entry counts as "no more".
    hasMore :: H.HashMap Text Ps -> Bool
    hasMore meta = case H.lookup "has_more" meta of
      Just hm -> case fromPs hm of
        Success True -> True
        _            -> False
      Nothing -> False
-- | Invoke the connection's query logger, if one is configured.
--
-- The logger receives a 'QueryLog' built from the query text, its
-- parameters, the number of returned records, the run response's
-- 'successTFirst', the pull metadata's 't_last', and the client-side
-- duration (@clientNs@ divided by 1,000,000), together with the pull
-- response's 'QueryMeta'. Does nothing when no logger is set.
fireLogger :: Connection -> T.Text -> H.HashMap T.Text Ps -> SuccessRun -> SuccessPull -> Word64 -> IO ()
fireLogger Connection{queryLogger = mLogger} cypher params runResp SuccessPull{records = rows, infos = meta} clientNs =
  maybe (pure ()) (\emit -> emit entry meta) mLogger
  where
    -- Client-side duration scaled down by a factor of one million.
    elapsedScaled :: Double
    elapsedScaled = fromIntegral clientNs / 1_000_000

    entry :: QueryLog
    entry = QueryLog
      { qlCypher      = cypher
      , qlParameters  = params
      , qlRowCount    = V.length rows
      , qlServerFirst = successTFirst runResp
      , qlServerLast  = t_last meta
      , qlClientTime  = elapsedScaled
      }
-- | Pass every parsed notification from the pull response's metadata to the
-- connection's notification handler, in vector order. Does nothing when no
-- handler is set.
fireNotifications :: Connection -> SuccessPull -> IO ()
fireNotifications Connection{notificationHandler = mHandler} SuccessPull{infos = QueryMeta{parsedNotifications = notes}} =
  maybe (pure ()) (\notify -> V.mapM_ notify notes) mHandler