{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE TupleSections #-}
module Database.DuckDB.Simple (
Connection,
open,
openWithConfig,
close,
withConnection,
withConnectionWithConfig,
Query (..),
Statement,
openStatement,
closeStatement,
withStatement,
clearStatementBindings,
namedParameterIndex,
columnCount,
columnName,
executeStatement,
execute,
executeMany,
execute_,
bind,
bindNamed,
executeNamed,
queryNamed,
fold,
fold_,
foldNamed,
withTransaction,
query,
queryWith,
query_,
queryWith_,
nextRow,
nextRowWith,
SQLError (..),
FormatError (..),
ResultError (..),
FieldParser,
FromField (..),
FromRow (..),
RowParser,
field,
fieldWith,
numFieldsRemaining,
ToField (..),
ToRow (..),
FieldBinding,
NamedParam (..),
DuckDBColumnType (..),
duckdbColumnType,
Null (..),
Only (..),
(:.) (..),
Function (..),
createFunction,
createFunctionWithState,
deleteFunction,
) where
import Control.Exception (SomeException, bracket, finally, mask, onException, throwIO, try)
import Control.Monad (forM, forM_, join, void, when, zipWithM, zipWithM_)
import Data.IORef (IORef, atomicModifyIORef', mkWeakIORef, newIORef, readIORef, writeIORef)
import Data.Maybe (isJust, isNothing)
import Data.Text (Text)
import qualified Data.Text as Text
import qualified Data.Text.Foreign as TextForeign
import Database.DuckDB.FFI
import Database.DuckDB.Simple.FromField (
Field (..),
FieldParser,
FromField (..),
ResultError (..),
)
import Database.DuckDB.Simple.FromRow (
FromRow (..),
RowParser,
field,
fieldWith,
numFieldsRemaining,
parseRow,
rowErrorsToSqlError,
)
import Database.DuckDB.Simple.Function (Function, createFunction, createFunctionWithState, deleteFunction)
import Database.DuckDB.Simple.Internal (
Connection (..),
ConnectionState (..),
Query (..),
SQLError (..),
Statement (..),
StatementState (..),
StatementStream (..),
StatementStreamChunk (..),
StatementStreamChunkVector (..),
StatementStreamColumn (..),
StatementStreamState (..),
withConnectionHandle,
withQueryCString,
withStatementHandle,
)
import Database.DuckDB.Simple.Materialize (
materializeValue,
)
import Database.DuckDB.Simple.Ok (Ok (..))
import Database.DuckDB.Simple.ToField (DuckDBColumnType (..), FieldBinding, NamedParam (..), ToField (..), bindFieldBinding, duckdbColumnType, renderFieldBinding)
import Database.DuckDB.Simple.ToRow (ToRow (..))
import Database.DuckDB.Simple.Types (FormatError (..), Null (..), Only (..), (:.) (..))
import Foreign.C.String (CString, peekCString, withCString)
import Foreign.Marshal.Alloc (alloca, free, malloc)
import Foreign.Ptr (Ptr, castPtr, nullPtr)
import Foreign.Storable (peek, poke)
-- | Open a connection to the database at the given path without any
-- configuration settings.
--
-- This simply delegates to 'openWithConfig' with an empty settings list.
open :: FilePath -> IO Connection
open databasePath = openWithConfig databasePath noSettings
  where
    noSettings :: [(Text, Text)]
    noSettings = []
-- | Open a database with the supplied @(name, value)@ configuration settings
-- and establish a connection to it.
--
-- The whole sequence runs under 'mask'; the two acquisition steps are run
-- with the caller's masking state restored. If connecting fails the database
-- handle is closed, and if wrapping the handles into a 'Connection' fails
-- both the connection handle and the database handle are closed.
openWithConfig :: FilePath -> [(Text, Text)] -> IO Connection
openWithConfig path settings =
    mask $ \unmasked -> do
        database <- unmasked (openDatabaseWithConfig path settings)
        connection <-
            unmasked (connectDatabase database)
                `onException` closeDatabaseHandle database
        createConnection database connection
            `onException` releaseBoth database connection
  where
    -- Release in reverse order of acquisition: connection first, then database.
    releaseBoth database connection = do
        closeConnectionHandle connection
        closeDatabaseHandle database
-- | Close a connection, releasing its underlying handles.
--
-- The connection state is atomically swapped to 'ConnectionClosed'; the
-- cleanup action chosen for the previous state is then executed. Closing an
-- already closed connection is a no-op.
close :: Connection -> IO ()
close Connection{connectionState = stateRef} =
    -- 'atomicModifyIORef'' hands back the cleanup action as a value; it must be
    -- run with 'join'. Discarding it (e.g. with 'void') would mark the
    -- connection closed without ever calling 'closeHandles'.
    join (atomicModifyIORef' stateRef transition)
  where
    -- Next state paired with the cleanup to perform for the state being left.
    transition :: ConnectionState -> (ConnectionState, IO ())
    transition ConnectionClosed = (ConnectionClosed, pure ())
    transition openState@ConnectionOpen{} = (ConnectionClosed, closeHandles openState)
-- | Run an action with a connection opened via 'open'.
--
-- The connection is passed to 'close' when the action finishes, whether it
-- returns normally or throws ('bracket' semantics).
withConnection :: FilePath -> (Connection -> IO a) -> IO a
withConnection path action = bracket acquire close action
  where
    acquire = open path
-- | Run an action with a connection opened via 'openWithConfig' using the
-- given settings.
--
-- The connection is passed to 'close' when the action finishes, whether it
-- returns normally or throws ('bracket' semantics).
withConnectionWithConfig :: FilePath -> [(Text, Text)] -> (Connection -> IO a) -> IO a
withConnectionWithConfig path settings action = bracket acquire close action
  where
    acquire = openWithConfig path settings
-- | Prepare a SQL query on the given connection and wrap the prepared handle
-- in a 'Statement'.
--
-- Preparation runs with the caller's masking state restored. When
-- @c_duckdb_prepare@ does not report success, the error text is fetched from
-- the statement, the statement is destroyed and an error built by
-- 'mkPrepareError' is thrown. If wrapping the handle afterwards fails, the
-- prepared handle is destroyed before the exception propagates.
openStatement :: Connection -> Query -> IO Statement
openStatement conn queryText =
    mask $ \restore -> do
        handle <- restore prepare
        createStatement conn handle queryText
            `onException` destroyPrepared handle
  where
    prepare :: IO DuckDBPreparedStatement
    prepare =
        withConnectionHandle conn $ \connPtr ->
            withQueryCString queryText $ \sql ->
                alloca $ \stmtPtr -> do
                    rc <- c_duckdb_prepare connPtr sql stmtPtr
                    -- Read the handle regardless of the outcome: it is needed to
                    -- fetch the error message on failure.
                    stmt <- peek stmtPtr
                    when (rc /= DuckDBSuccess) $ do
                        errMsg <- fetchPrepareError stmt
                        c_duckdb_destroy_prepare stmtPtr
                        throwIO $ mkPrepareError queryText errMsg
                    pure stmt
-- | Close a prepared statement, destroying its underlying prepared handle.
--
-- The statement stream is reset first. The statement state is then atomically
-- swapped to 'StatementClosed' and the cleanup action chosen for the previous
-- state is executed. Closing an already closed statement only performs the
-- stream reset.
closeStatement :: Statement -> IO ()
closeStatement stmt@Statement{statementState = stateRef} = do
    resetStatementStream stmt
    -- 'atomicModifyIORef'' returns the cleanup action as a value; 'join' runs
    -- it. Discarding it (e.g. with 'void') would leave the prepared handle
    -- undestroyed.
    join (atomicModifyIORef' stateRef transition)
  where
    -- Next state paired with the cleanup to perform for the state being left.
    transition :: StatementState -> (StatementState, IO ())
    transition StatementClosed = (StatementClosed, pure ())
    transition StatementOpen{statementHandle = handle} =
        (StatementClosed, destroyPrepared handle)
-- | Run an action with a statement prepared via 'openStatement'.
--
-- The statement is passed to 'closeStatement' when the action finishes,
-- whether it returns normally or throws ('bracket' semantics).
withStatement :: Connection -> Query -> (Statement -> IO a) -> IO a
withStatement conn sql action = bracket acquire closeStatement action
  where
    acquire = openStatement conn sql
-- | Bind positional parameters to a prepared statement.
--
-- The statement stream is reset, then the bindings are validated: the number
-- of supplied fields must equal the statement's parameter count, and the
-- statement must not define named parameters. A validation failure is
-- reported through 'throwFormatErrorBindings'. Existing bindings are then
-- cleared and each field is bound at its 1-based position.
bind :: Statement -> [FieldBinding] -> IO ()
bind stmt fields = do
    resetStatementStream stmt
    withStatementHandle stmt $ \handle -> do
        expected <- fromIntegral <$> c_duckdb_nparams handle
        when (supplied /= expected) $
            throwFormatErrorBindings stmt (countMismatch expected) fields
        parameterNames <- fetchParameterNames handle expected
        when (any isJust parameterNames) $
            throwFormatErrorBindings stmt (Text.pack "duckdb-simple: statement defines named parameters; use executeNamed or bindNamed") fields
    clearStatementBindings stmt
    zipWithM_ bindAt [1 ..] fields
  where
    -- Number of bindings provided by the caller.
    supplied :: Int
    supplied = length fields

    -- Message for a parameter/argument count mismatch.
    countMismatch :: Int -> Text
    countMismatch expected =
        Text.pack $
            concat
                [ "duckdb-simple: SQL query contains "
                , show expected
                , " parameter(s), but "
                , show supplied
                , " argument(s) were supplied"
                ]

    -- Bind one field at the given 1-based parameter position.
    bindAt :: Int -> FieldBinding -> IO ()
    bindAt position = bindFieldBinding stmt (fromIntegral position)
-- | Bind named parameters to a prepared statement.
--
-- The statement stream is reset, then the bindings are validated: the number
-- of supplied parameters must equal the statement's parameter count, and a
-- statement with at least one parameter must define named parameters. A
-- validation failure is reported through 'throwFormatErrorNamed'. Existing
-- bindings are then cleared and each value is bound at the index resolved by
-- 'namedParameterIndex'; a name that cannot be resolved is also reported
-- through 'throwFormatErrorNamed'.
bindNamed :: Statement -> [NamedParam] -> IO ()
bindNamed stmt params = do
    resetStatementStream stmt
    withStatementHandle stmt $ \handle -> do
        expected <- fromIntegral <$> c_duckdb_nparams handle
        when (supplied /= expected) $
            throwFormatErrorNamed stmt (countMismatch expected) bindings
        parameterNames <- fetchParameterNames handle expected
        when (all isNothing parameterNames && expected > 0) $
            throwFormatErrorNamed stmt (Text.pack "duckdb-simple: statement does not define named parameters; use positional bindings or adjust the SQL") bindings
    clearStatementBindings stmt
    mapM_ bindOne bindings
  where
    -- Name/value pairs with each value already rendered to a 'FieldBinding'.
    bindings :: [(Text, FieldBinding)]
    bindings = map toBinding params

    toBinding :: NamedParam -> (Text, FieldBinding)
    toBinding (name := value) = (name, toField value)

    -- Number of bindings provided by the caller.
    supplied :: Int
    supplied = length bindings

    -- Message for a parameter/argument count mismatch.
    countMismatch :: Int -> Text
    countMismatch expected =
        Text.pack $
            concat
                [ "duckdb-simple: SQL query contains "
                , show expected
                , " named parameter(s), but "
                , show supplied
                , " argument(s) were supplied"
                ]

    unknownName :: Text -> Text
    unknownName name = Text.pack "duckdb-simple: unknown named parameter " <> name

    -- Resolve the parameter's index by name and bind its value there.
    bindOne :: (Text, FieldBinding) -> IO ()
    bindOne (name, binding) =
        namedParameterIndex stmt name
            >>= maybe
                (throwFormatErrorNamed stmt (unknownName name) bindings)
                (\idx -> bindFieldBinding stmt (fromIntegral idx) binding)
-- | Look up the name of each parameter at positions @1 .. count@.
--
-- An entry is 'Nothing' when the name pointer is null, or when the normalized
-- name is just the decimal rendering of the parameter's own position (i.e.
-- the parameter is positional). Otherwise the entry carries the name as
-- returned by the library (not the normalized form).
fetchParameterNames :: DuckDBPreparedStatement -> Int -> IO [Maybe Text]
fetchParameterNames handle count = mapM lookupName [1 .. count]
  where
    lookupName :: Int -> IO (Maybe Text)
    lookupName idx = do
        namePtr <- c_duckdb_parameter_name handle (fromIntegral idx)
        if namePtr == nullPtr
            then pure Nothing
            else do
                name <- Text.pack <$> peekCString namePtr
                -- The string has been copied into 'Text'; release the C buffer.
                c_duckdb_free (castPtr namePtr)
                pure (explicitName idx name)

    -- Keep only names that differ from the parameter's positional index.
    explicitName :: Int -> Text -> Maybe Text
    explicitName idx name
        | normalizeName name == Text.pack (show idx) = Nothing
        | otherwise = Just name
-- | Clear all parameter bindings on a prepared statement.
--
-- If @c_duckdb_clear_bindings@ does not report success, the error text is
-- fetched from the statement and an error built by 'mkPrepareError' (tagged
-- with the statement's query) is thrown.
clearStatementBindings :: Statement -> IO ()
clearStatementBindings stmt =
    withStatementHandle stmt $ \handle -> do
        rc <- c_duckdb_clear_bindings handle
        when (rc /= DuckDBSuccess) $
            fetchPrepareError handle
                >>= throwIO . mkPrepareError (statementQuery stmt)
-- | Resolve the index of a named parameter on a prepared statement.
--
-- The name is normalized with 'normalizeName' before the lookup. Returns
-- 'Nothing' when the lookup does not report success or yields index @0@;
-- otherwise returns the index.
namedParameterIndex :: Statement -> Text -> IO (Maybe Int)
namedParameterIndex stmt name =
    withStatementHandle stmt $ \handle ->
        TextForeign.withCString (normalizeName name) $ \cName ->
            alloca $ \idxPtr -> do
                rc <- c_duckdb_bind_parameter_index handle idxPtr cName
                if rc /= DuckDBSuccess
                    then pure Nothing
                    else nonZero <$> peek idxPtr
  where
    -- Index 0 is treated as "not found".
    nonZero :: DuckDBIdx -> Maybe Int
    nonZero idx
        | idx == 0 = Nothing
        | otherwise = Just (fromIntegral idx)
-- | Number of result columns reported for a prepared statement, converted
-- from the library's index type to 'Int'.
columnCount :: Statement -> IO Int
columnCount stmt = withStatementHandle stmt countColumns
  where
    countColumns :: DuckDBPreparedStatement -> IO Int
    countColumns = fmap fromIntegral . c_duckdb_prepared_statement_column_count
-- | Name of the result column at the given zero-based index.
--
-- Throws the error built by 'columnIndexError' when the index is negative
-- (without a column total) or not below the statement's column count (with
-- the total), and the error built by 'columnNameUnavailableError' when the
-- library returns a null name pointer.
columnName :: Statement -> Int -> IO Text
columnName stmt columnIndex =
    if columnIndex < 0
        then throwIO (columnIndexError stmt columnIndex Nothing)
        else withStatementHandle stmt $ \handle -> do
            total <- fromIntegral <$> c_duckdb_prepared_statement_column_count handle
            when (columnIndex >= total) $
                throwIO (columnIndexError stmt columnIndex (Just total))
            namePtr <- c_duckdb_prepared_statement_column_name handle (fromIntegral columnIndex)
            when (namePtr == nullPtr) $
                throwIO (columnNameUnavailableError stmt columnIndex)
            name <- Text.pack <$> peekCString namePtr
            -- The string has been copied into 'Text'; release the C buffer.
            c_duckdb_free (castPtr namePtr)
            pure name
-- | Execute a prepared statement with its current bindings and return the
-- value reported by 'resultRowsChanged'.
--
-- The statement stream is reset before execution. When execution does not
-- report success, the error message is fetched from the result and an error
-- built by 'mkPrepareError' (tagged with the statement's query) is thrown.
--
-- The result is destroyed via 'finally', so it is released even if reading
-- it is interrupted by an exception.
executeStatement :: Statement -> IO Int
executeStatement stmt =
    withStatementHandle stmt $ \handle -> do
        resetStatementStream stmt
        alloca $ \resPtr -> do
            rc <- c_duckdb_execute_prepared handle resPtr
            let readResult =
                    if rc == DuckDBSuccess
                        then Right <$> resultRowsChanged resPtr
                        else Left . fst <$> fetchResultError resPtr
            -- Everything needed is extracted before the result is destroyed.
            outcome <- readResult `finally` c_duckdb_destroy_result resPtr
            case outcome of
                Right changed -> pure changed
                Left errMsg -> throwIO $ mkPrepareError (statementQuery stmt) errMsg
-- | Prepare a query, bind the positional parameters rendered by 'toRow',
-- execute it and return the result of 'executeStatement'.
--
-- The statement lives only for the duration of the call ('withStatement').
execute :: (ToRow q) => Connection -> Query -> q -> IO Int
execute conn queryText params =
    withStatement conn queryText $ \stmt ->
        bind stmt (toRow params) >> executeStatement stmt
-- | Execute the same query once per parameter row, reusing a single prepared
-- statement, and return the sum of the per-row 'executeStatement' results.
executeMany :: (ToRow q) => Connection -> Query -> [q] -> IO Int
executeMany conn queryText rows =
    withStatement conn queryText $ \stmt -> do
        counts <- forM rows $ \row -> do
            bind stmt (toRow row)
            executeStatement stmt
        pure (sum counts)
-- | Execute a query without parameters directly on the connection and return
-- the value reported by 'resultRowsChanged'.
--
-- When the query does not report success, the error message and error type
-- are fetched from the result and an error built by 'mkExecuteError' is
-- thrown.
--
-- The result is destroyed via 'finally', so it is released even if reading
-- it is interrupted by an exception.
execute_ :: Connection -> Query -> IO Int
execute_ conn queryText =
    withConnectionHandle conn $ \connPtr ->
        withQueryCString queryText $ \sql ->
            alloca $ \resPtr -> do
                rc <- c_duckdb_query connPtr sql resPtr
                let readResult =
                        if rc == DuckDBSuccess
                            then Right <$> resultRowsChanged resPtr
                            else Left <$> fetchResultError resPtr
                -- Everything needed is extracted before the result is destroyed.
                outcome <- readResult `finally` c_duckdb_destroy_result resPtr
                case outcome of
                    Right changed -> pure changed
                    Left (errMsg, errType) ->
                        throwIO $ mkExecuteError queryText errMsg errType
-- | Prepare a query, bind the given named parameters with 'bindNamed',
-- execute it and return the result of 'executeStatement'.
--
-- The statement lives only for the duration of the call ('withStatement').
executeNamed :: Connection -> Query -> [NamedParam] -> IO Int
executeNamed conn queryText params =
    withStatement conn queryText $ \stmt ->
        bindNamed stmt params >> executeStatement stmt
-- | Run a parameterised query and decode every row with the 'FromRow'
-- instance of the result type.
--
-- This is 'queryWith' specialised to the 'fromRow' parser.
query :: (ToRow q, FromRow r) => Connection -> Query -> q -> IO [r]
query conn queryText params = queryWith fromRow conn queryText params
-- | Run a parameterised query and decode every row with the supplied
-- 'RowParser'.
--
-- The query is prepared, the positional parameters rendered by 'toRow' are
-- bound, and the statement is executed. On success the rows are collected
-- from the result and then converted with 'convertRowsWith'. On failure the
-- error message and error type are fetched from the result and an error built
-- by 'mkExecuteError' is thrown.
--
-- The result is destroyed via 'finally', so it is released even if collecting
-- rows or reading the error throws.
queryWith :: (ToRow q) => RowParser r -> Connection -> Query -> q -> IO [r]
queryWith parser conn queryText params =
    withStatement conn queryText $ \stmt -> do
        bind stmt (toRow params)
        withStatementHandle stmt $ \handle ->
            alloca $ \resPtr -> do
                rc <- c_duckdb_execute_prepared handle resPtr
                let readResult =
                        if rc == DuckDBSuccess
                            then Right <$> collectRows resPtr
                            else Left <$> fetchResultError resPtr
                -- Rows (or the error details) are fully extracted before the
                -- result is destroyed; parsing happens afterwards.
                outcome <- readResult `finally` c_duckdb_destroy_result resPtr
                case outcome of
                    Right rows -> convertRowsWith parser queryText rows
                    Left (errMsg, errType) ->
                        throwIO $ mkExecuteError queryText errMsg errType
-- | Run a query with named parameters and decode every row with the
-- 'FromRow' instance of the result type.
--
-- The query is prepared, the parameters are bound with 'bindNamed', and the
-- statement is executed. On success the rows are collected from the result
-- and then converted with 'convertRows'. On failure the error message and
-- error type are fetched from the result and an error built by
-- 'mkExecuteError' is thrown.
--
-- The result is destroyed via 'finally', so it is released even if collecting
-- rows or reading the error throws.
queryNamed :: (FromRow r) => Connection -> Query -> [NamedParam] -> IO [r]
queryNamed conn queryText params =
    withStatement conn queryText $ \stmt -> do
        bindNamed stmt params
        withStatementHandle stmt $ \handle ->
            alloca $ \resPtr -> do
                rc <- c_duckdb_execute_prepared handle resPtr
                let readResult =
                        if rc == DuckDBSuccess
                            then Right <$> collectRows resPtr
                            else Left <$> fetchResultError resPtr
                -- Rows (or the error details) are fully extracted before the
                -- result is destroyed; parsing happens afterwards.
                outcome <- readResult `finally` c_duckdb_destroy_result resPtr
                case outcome of
                    Right rows -> convertRows queryText rows
                    Left (errMsg, errType) ->
                        throwIO $ mkExecuteError queryText errMsg errType
-- | Run a query without parameters and decode every row with the 'FromRow'
-- instance of the result type.
--
-- This is 'queryWith_' specialised to the 'fromRow' parser.
query_ :: (FromRow r) => Connection -> Query -> IO [r]
query_ conn queryText = queryWith_ fromRow conn queryText
-- | Run a query without parameters directly on the connection and decode
-- every row with the supplied 'RowParser'.
--
-- On success the rows are collected from the result and then converted with
-- 'convertRowsWith'. On failure the error message and error type are fetched
-- from the result and an error built by 'mkExecuteError' is thrown.
--
-- The result is destroyed via 'finally', so it is released even if collecting
-- rows or reading the error throws.
queryWith_ :: RowParser r -> Connection -> Query -> IO [r]
queryWith_ parser conn queryText =
    withConnectionHandle conn $ \connPtr ->
        withQueryCString queryText $ \sql ->
            alloca $ \resPtr -> do
                rc <- c_duckdb_query connPtr sql resPtr
                let readResult =
                        if rc == DuckDBSuccess
                            then Right <$> collectRows resPtr
                            else Left <$> fetchResultError resPtr
                -- Rows (or the error details) are fully extracted before the
                -- result is destroyed; parsing happens afterwards.
                outcome <- readResult `finally` c_duckdb_destroy_result resPtr
                case outcome of
                    Right rows -> convertRowsWith parser queryText rows
                    Left (errMsg, errType) ->
                        throwIO $ mkExecuteError queryText errMsg errType
-- | Fold over the rows of a parameterised query, one row at a time.
--
-- The query is prepared, the statement stream is reset, the positional
-- parameters rendered by 'toRow' are bound, and the rows (decoded with
-- 'fromRow') are fed to the step function through 'foldStatementWith',
-- starting from the initial accumulator.
fold :: (FromRow row, ToRow params) => Connection -> Query -> params -> a -> (a -> row -> IO a) -> IO a
fold conn queryText params initial step =
    withStatement conn queryText $ \stmt ->
        resetStatementStream stmt
            >> bind stmt (toRow params)
            >> foldStatementWith fromRow stmt initial step
-- | Fold over the rows of a query that takes no parameters, one row at a
-- time.
--
-- The query is prepared, the statement stream is reset, and the rows (decoded
-- with 'fromRow') are fed to the step function through 'foldStatementWith',
-- starting from the initial accumulator.
fold_ :: (FromRow row) => Connection -> Query -> a -> (a -> row -> IO a) -> IO a
fold_ conn queryText initial step =
    withStatement conn queryText $ \stmt ->
        resetStatementStream stmt
            >> foldStatementWith fromRow stmt initial step
-- | Fold over the rows of a query with named parameters, one row at a time.
--
-- The query is prepared, the statement stream is reset, the parameters are
-- bound with 'bindNamed', and the rows (decoded with 'fromRow') are fed to
-- the step function through 'foldStatementWith', starting from the initial
-- accumulator.
foldNamed :: (FromRow row) => Connection -> Query -> [NamedParam] -> a -> (a -> row -> IO a) -> IO a
foldNamed conn queryText params initial step =
    withStatement conn queryText $ \stmt ->
        resetStatementStream stmt
            >> bindNamed stmt params
            >> foldStatementWith fromRow stmt initial step
-- | Drive a statement to completion, threading an accumulator through the
-- step function for every row produced by 'nextRowWith'.
--
-- The accumulator is forced to weak head normal form after each step. The
-- statement stream is reset when the fold ends, whether it completes or
-- throws.
foldStatementWith :: RowParser row -> Statement -> a -> (a -> row -> IO a) -> IO a
foldStatementWith parser stmt initial step =
    go initial `finally` resetStatementStream stmt
  where
    -- Pull the next row; stop with the accumulator when the stream is done.
    go acc =
        nextRowWith parser stmt
            >>= maybe (pure acc) (\row -> step acc row >>= (go $!))
-- | Fetch the next row of a statement, decoding it with the row type's
-- 'FromRow' instance. This is 'nextRowWith' specialised to 'fromRow'.
nextRow :: (FromRow r) => Statement -> IO (Maybe r)
nextRow stmt = nextRowWith fromRow stmt
-- | Fetch the next row of a statement, decoding it with an explicit parser.
--
-- The stream state is read under 'mask'. If the statement is idle, a new
-- stream is started with 'startStatementStream'; when that yields no stream
-- the result is 'Nothing'. Otherwise (and when a stream is already active)
-- the row is produced by 'consumeStream'. Both starting and consuming run
-- inside @restore@, i.e. with the caller's masking state.
nextRowWith :: RowParser r -> Statement -> IO (Maybe r)
nextRowWith parser stmt =
    mask \restore -> do
        let streamRef = statementStream stmt
            advance = restore . consumeStream streamRef parser stmt
        readIORef streamRef >>= \case
            StatementStreamActive stream -> advance stream
            StatementStreamIdle ->
                restore (startStatementStream stmt)
                    >>= maybe (pure Nothing) advance
-- | Return a statement's row stream to the idle state by delegating to
-- 'cleanupStatementStreamRef' on the statement's stream reference.
resetStatementStream :: Statement -> IO ()
resetStatementStream stmt = cleanupStatementStreamRef (statementStream stmt)
-- | Pull one row from an active stream and record the resulting state.
--
-- Outcomes:
--
-- * 'streamNextRow' throws: the stream that was passed in is finalized, the
--   reference is set to idle, and the exception is rethrown.
-- * The stream is exhausted: the updated stream is finalized, the reference
--   is set to idle, and 'Nothing' is returned.
-- * A row was read but @parser@ rejects it: the updated stream is finalized,
--   the reference is set to idle, and the parse errors are thrown as the
--   error built by 'rowErrorsToSqlError'.
-- * A row was read and parsed: the updated stream is stored as active and
--   the value is returned.
consumeStream :: IORef StatementStreamState -> RowParser r -> Statement -> StatementStream -> IO (Maybe r)
consumeStream streamRef parser stmt stream = do
    outcome <-
        ( try (streamNextRow query stream) ::
            IO (Either SomeException (Maybe [Field], StatementStream))
        )
    case outcome of
        Left err ->
            retire stream >> throwIO err
        Right (Nothing, exhausted) ->
            retire exhausted >> pure Nothing
        Right (Just fields, advanced) ->
            case parseRow parser fields of
                Errors rowErr ->
                    retire advanced
                        >> throwIO (rowErrorsToSqlError query rowErr)
                Ok value ->
                    writeIORef streamRef (StatementStreamActive advanced)
                        >> pure (Just value)
  where
    query = statementQuery stmt
    -- Release a stream's native resources and mark the statement idle.
    retire finished =
        finalizeStream finished >> writeIORef streamRef StatementStreamIdle
-- | Execute a prepared statement and wrap its result in a fresh stream.
--
-- Column metadata is collected first, then a result buffer is allocated with
-- 'malloc' and the statement is executed into it.
--
-- * On execution failure the error is read with 'fetchResultError', the
--   result is destroyed, the buffer freed, and the error built by
--   'mkExecuteError' is thrown.
-- * If the result's return type is not 'DuckDBResultTypeQueryResult', the
--   result is destroyed, the buffer freed, and 'Nothing' is returned.
-- * Otherwise a 'StatementStream' with no current chunk is returned; it
--   carries the result buffer, which is released by 'finalizeStream'.
startStatementStream :: Statement -> IO (Maybe StatementStream)
startStatementStream stmt =
    withStatementHandle stmt $ \handle -> do
        columns <- collectStreamColumns handle
        resultPtr <- malloc
        let discardResult = c_duckdb_destroy_result resultPtr >> free resultPtr
        rc <- c_duckdb_execute_prepared handle resultPtr
        if rc == DuckDBSuccess
            then do
                resultType <- c_duckdb_result_return_type resultPtr
                if resultType == DuckDBResultTypeQueryResult
                    then pure (Just (StatementStream resultPtr columns Nothing))
                    else discardResult >> pure Nothing
            else do
                (errMsg, errType) <- fetchResultError resultPtr
                discardResult
                throwIO (mkExecuteError (statementQuery stmt) errMsg errType)
-- | Describe every result column of a prepared statement.
--
-- For each index from 0 up to the reported column count, the column's name
-- and DuckDB type are queried. When DuckDB returns a null name pointer the
-- name falls back to @"column" <> show idx@.
--
-- NOTE(review): the DuckDB C API documentation appears to require freeing the
-- string returned by @duckdb_prepared_statement_column_name@ with
-- @duckdb_free@; this function never frees it. Confirm and fix if so.
collectStreamColumns :: DuckDBPreparedStatement -> IO [StatementStreamColumn]
collectStreamColumns handle = do
    rawCount <- c_duckdb_prepared_statement_column_count handle
    let cc = fromIntegral rawCount :: Int
    mapM describeColumn [0 .. cc - 1]
  where
    describeColumn idx = do
        namePtr <- c_duckdb_prepared_statement_column_name handle (fromIntegral idx)
        name <- resolveName idx namePtr
        dtype <- c_duckdb_prepared_statement_column_type handle (fromIntegral idx)
        pure
            StatementStreamColumn
                { statementStreamColumnIndex = idx
                , statementStreamColumnName = name
                , statementStreamColumnType = dtype
                }
    resolveName idx namePtr
        | namePtr == nullPtr = pure (Text.pack ("column" <> show idx))
        | otherwise = Text.pack <$> peekCString namePtr
-- | Produce the next row of a stream together with the advanced stream.
--
-- If the stream already holds a chunk, the row is emitted from it with
-- 'emitRow'. Otherwise a new chunk is fetched with 'fetchChunk'; when no
-- chunk is available the result is @(Nothing, stream)@, signalling that the
-- stream is exhausted.
--
-- Exception safety: a chunk fetched here is not yet referenced by any stream
-- value the caller holds ('consumeStream' finalizes the stream it passed in,
-- whose chunk is still 'Nothing'). If decoding the first row of that chunk
-- throws, nobody else would destroy it, so it is destroyed here before the
-- exception propagates. A chunk that was already part of the incoming stream
-- is left alone, because the caller's finalizer releases it.
streamNextRow :: Query -> StatementStream -> IO (Maybe [Field], StatementStream)
streamNextRow queryText stream =
    case statementStreamChunk stream of
        Just chunk -> emitRow queryText stream chunk
        Nothing -> do
            refreshed <- fetchChunk stream
            case statementStreamChunk refreshed of
                Nothing -> pure (Nothing, refreshed)
                Just chunk ->
                    emitRow queryText refreshed chunk
                        `onException` destroyDataChunk (statementStreamChunkPtr chunk)
-- | Fetch the next non-empty data chunk of a stream's result.
--
-- A null chunk pointer from DuckDB leaves the stream unchanged. Chunks whose
-- reported size is not positive are destroyed and skipped. For a non-empty
-- chunk the per-column vectors are prepared and the stream is returned with
-- that chunk installed, positioned at row index 0.
fetchChunk :: StatementStream -> IO StatementStream
fetchChunk stream = pull
  where
    pull = do
        chunk <- c_duckdb_fetch_chunk (statementStreamResult stream)
        if chunk == nullPtr
            then pure stream
            else inspect chunk
    inspect chunk = do
        rowCount <- fromIntegral <$> c_duckdb_data_chunk_get_size chunk
        if rowCount > (0 :: Int)
            then do
                vectors <- prepareChunkVectors chunk (statementStreamColumns stream)
                pure
                    stream
                        { statementStreamChunk =
                            Just
                                StatementStreamChunk
                                    { statementStreamChunkPtr = chunk
                                    , statementStreamChunkSize = rowCount
                                    , statementStreamChunkIndex = 0
                                    , statementStreamChunkVectors = vectors
                                    }
                        }
            else destroyDataChunk chunk >> pull
-- | Look up the vector handle, data pointer and validity mask pointer of each
-- column in a data chunk, in the order the columns are given.
prepareChunkVectors :: DuckDBDataChunk -> [StatementStreamColumn] -> IO [StatementStreamChunkVector]
prepareChunkVectors chunk columns = mapM vectorFor columns
  where
    vectorFor column = do
        vector <-
            c_duckdb_data_chunk_get_vector
                chunk
                (fromIntegral (statementStreamColumnIndex column))
        dataPtr <- c_duckdb_vector_get_data vector
        validity <- c_duckdb_vector_get_validity vector
        pure
            StatementStreamChunkVector
                { statementStreamChunkVectorHandle = vector
                , statementStreamChunkVectorData = dataPtr
                , statementStreamChunkVectorValidity = validity
                }
-- | Materialize the current row of a chunk and advance the stream past it.
--
-- The row at the chunk's current index is built with 'buildRow'. If more
-- rows remain, the returned stream holds the chunk with its index
-- incremented; otherwise the chunk is destroyed and the returned stream
-- holds no chunk.
emitRow :: Query -> StatementStream -> StatementStreamChunk -> IO (Maybe [Field], StatementStream)
emitRow queryText stream chunk = do
    fields <-
        buildRow
            queryText
            (statementStreamColumns stream)
            (statementStreamChunkVectors chunk)
            current
    remaining <-
        if next < statementStreamChunkSize chunk
            then pure (Just chunk{statementStreamChunkIndex = next})
            else destroyDataChunk (statementStreamChunkPtr chunk) >> pure Nothing
    pure (Just fields, stream{statementStreamChunk = remaining})
  where
    current = statementStreamChunkIndex chunk
    next = current + 1
-- | Build the fields of one row by pairing each column description with its
-- chunk vector and decoding the value at @rowIdx@ via 'buildField'. Pairing
-- stops at the shorter of the two lists.
buildRow :: Query -> [StatementStreamColumn] -> [StatementStreamChunkVector] -> Int -> IO [Field]
buildRow queryText columns vectors rowIdx =
    sequence (zipWith (buildField queryText rowIdx) columns vectors)
-- | Decode a single cell into a 'Field'.
--
-- Columns of DuckDB type struct or union are rejected with the error built
-- by 'streamingUnsupportedTypeError'. Every other type is delegated to
-- 'materializeValue', which receives the column type, the vector handle, its
-- data and validity pointers, and the row index.
buildField :: Query -> Int -> StatementStreamColumn -> StatementStreamChunkVector -> IO Field
buildField queryText rowIdx column vector = do
    value <-
        case statementStreamColumnType column of
            DuckDBTypeStruct -> unsupported
            DuckDBTypeUnion -> unsupported
            dtype ->
                materializeValue
                    dtype
                    (statementStreamChunkVectorHandle vector)
                    (statementStreamChunkVectorData vector)
                    (statementStreamChunkVectorValidity vector)
                    rowIdx
    pure
        Field
            { fieldName = statementStreamColumnName column
            , fieldIndex = statementStreamColumnIndex column
            , fieldValue = value
            }
  where
    unsupported = throwIO (streamingUnsupportedTypeError queryText column)
-- | Atomically swap the stream reference to idle and finalize whatever state
-- was stored in it before the swap.
cleanupStatementStreamRef :: IORef StatementStreamState -> IO ()
cleanupStatementStreamRef ref =
    atomicModifyIORef' ref (\previous -> (StatementStreamIdle, previous))
        >>= finalizeStreamState
-- | Release the resources of a stream state: nothing to do when idle, and
-- 'finalizeStream' on the carried stream when active.
finalizeStreamState :: StatementStreamState -> IO ()
finalizeStreamState StatementStreamIdle = pure ()
finalizeStreamState (StatementStreamActive stream) = finalizeStream stream
-- | Release everything a stream owns: the current chunk (if any), then the
-- DuckDB result, then the buffer holding the result.
finalizeStream :: StatementStream -> IO ()
finalizeStream stream = do
    mapM_ finalizeChunk (statementStreamChunk stream)
    c_duckdb_destroy_result resultPtr
    free resultPtr
  where
    resultPtr = statementStreamResult stream
-- | Destroy the native data chunk referenced by a stream chunk.
finalizeChunk :: StatementStreamChunk -> IO ()
finalizeChunk chunk = destroyDataChunk (statementStreamChunkPtr chunk)
-- | Destroy a DuckDB data chunk. The destroy call takes a pointer to the
-- handle, so the handle is first written into a temporary slot.
destroyDataChunk :: DuckDBDataChunk -> IO ()
destroyDataChunk chunk =
    alloca $ \slot ->
        poke slot chunk >> c_duckdb_destroy_data_chunk slot
-- | Build the error reported when a result column has a type that streaming
-- cannot decode. The message names the column and shows its DuckDB type; the
-- error carries the query but no DuckDB error type.
streamingUnsupportedTypeError :: Query -> StatementStreamColumn -> SQLError
streamingUnsupportedTypeError queryText column =
    SQLError
        { sqlErrorMessage = message
        , sqlErrorType = Nothing
        , sqlErrorQuery = Just queryText
        }
  where
    message =
        Text.pack "duckdb-simple: streaming does not yet support column "
            <> statementStreamColumnName column
            <> Text.pack " with DuckDB type "
            <> Text.pack (show (statementStreamColumnType column))
-- | Run an action inside a transaction.
--
-- Issues @BEGIN TRANSACTION@, runs @action@ with the caller's masking state
-- restored, and then issues @COMMIT@ and returns the action's result. If the
-- action throws, @ROLLBACK@ is issued and the exception is rethrown.
withTransaction :: Connection -> IO a -> IO a
withTransaction conn action =
    mask \restore -> do
        run "BEGIN TRANSACTION"
        result <- restore action `onException` run "ROLLBACK"
        run "COMMIT"
        pure result
  where
    -- Execute a parameterless statement, discarding the count it reports.
    run = void . execute_ conn . Query . Text.pack
-- | Wrap raw database and connection handles in a 'Connection'.
--
-- The handles are stored in an 'IORef' as 'ConnectionOpen'. A finalizer is
-- attached to that reference with 'mkWeakIORef': it atomically marks the
-- state as 'ConnectionClosed' and, if the connection was still open, closes
-- both handles with 'closeHandles'.
createConnection :: DuckDBDatabase -> DuckDBConnection -> IO Connection
createConnection db conn = do
    ref <- newIORef (ConnectionOpen db conn)
    _ <-
        mkWeakIORef ref $
            -- 'atomicModifyIORef'' only /returns/ the cleanup action selected
            -- by the state transition; 'join' is what runs it. Discarding the
            -- result (e.g. with 'void') would flip the state to closed without
            -- ever releasing the native handles.
            join $
                atomicModifyIORef' ref \case
                    ConnectionClosed -> (ConnectionClosed, pure ())
                    openState@(ConnectionOpen{}) ->
                        (ConnectionClosed, closeHandles openState)
    pure Connection{connectionState = ref}
-- | Wrap a prepared-statement handle in a 'Statement' owned by @parent@.
--
-- The statement starts open with an idle row stream. A finalizer attached to
-- the state reference atomically marks the statement closed and, if it was
-- still open, cleans up the row stream and destroys the prepared statement.
createStatement :: Connection -> DuckDBPreparedStatement -> Query -> IO Statement
createStatement parent handle queryText = do
    ref <- newIORef (StatementOpen handle)
    streamRef <- newIORef StatementStreamIdle
    let claim = \case
            StatementClosed -> (StatementClosed, pure ())
            StatementOpen{statementHandle = prepared} ->
                ( StatementClosed
                , cleanupStatementStreamRef streamRef >> destroyPrepared prepared
                )
    -- The transition returns the cleanup action; 'join' executes it.
    void (mkWeakIORef ref (join (atomicModifyIORef' ref claim)))
    pure
        Statement
            { statementState = ref
            , statementConnection = parent
            , statementQuery = queryText
            , statementStream = streamRef
            }
-- | Open a DuckDB database at @path@ with the given configuration options.
--
-- Steps:
--
-- 1. Create a DuckDB config object; failure throws an open error.
-- 2. Apply each @(name, value)@ setting; a rejected option throws an open
--    error naming the option.
-- 3. Open the database with the config. On failure the error message is read
--    from the error slot and thrown as an open error. The error slot is
--    passed to 'maybeFreeErr' on both paths.
--
-- Once created, the config is destroyed (if non-null) when steps 2-3 finish,
-- whether normally or by exception.
openDatabaseWithConfig :: FilePath -> [(Text, Text)] -> IO DuckDBDatabase
openDatabaseWithConfig path settings =
    alloca $ \dbPtr ->
        alloca $ \configPtr ->
            alloca $ \errPtr -> do
                rcConfig <- c_duckdb_create_config configPtr
                when (rcConfig /= DuckDBSuccess) $
                    throwIO (mkOpenError (Text.pack "duckdb-simple: failed to allocate DuckDB config"))
                config <- peek configPtr
                poke errPtr nullPtr
                (mapM_ (applySetting config) settings >> openAt dbPtr config errPtr)
                    `finally` releaseConfig config
  where
    -- Set one configuration option, throwing if DuckDB rejects it.
    applySetting config (name, value) =
        TextForeign.withCString name $ \cName ->
            TextForeign.withCString value $ \cValue -> do
                rcSet <- c_duckdb_set_config config cName cValue
                when (rcSet /= DuckDBSuccess) $
                    throwIO
                        ( mkOpenError
                            (Text.pack "duckdb-simple: failed to set config option " <> name)
                        )

    -- Open the database and return its handle, or throw DuckDB's message.
    openAt dbPtr config errPtr =
        withCString path $ \cPath -> do
            rc <- c_duckdb_open_ext cPath dbPtr config errPtr
            if rc == DuckDBSuccess
                then peek dbPtr <* maybeFreeErr errPtr
                else do
                    errMsg <- peekError errPtr
                    maybeFreeErr errPtr
                    throwIO (mkOpenError errMsg)

    -- The destroy call takes a pointer to the handle, hence the temporary slot.
    releaseConfig config =
        when (config /= nullPtr) $
            alloca $ \cfgPtr ->
                poke cfgPtr config >> c_duckdb_destroy_config cfgPtr
-- | Create a connection handle for an open database, throwing
-- 'mkConnectError' when DuckDB does not report success.
connectDatabase :: DuckDBDatabase -> IO DuckDBConnection
connectDatabase db =
    alloca $ \connPtr ->
        c_duckdb_connect db connPtr >>= \rc ->
            if rc /= DuckDBSuccess
                then throwIO mkConnectError
                else peek connPtr
-- | Release the native handles held by a connection state. A closed state is
-- a no-op; an open state has its connection handle closed first and its
-- database handle second.
closeHandles :: ConnectionState -> IO ()
closeHandles = \case
    ConnectionClosed -> pure ()
    ConnectionOpen{connectionDatabase = db, connectionHandle = conn} ->
        closeConnectionHandle conn >> closeDatabaseHandle db
-- | Disconnect a DuckDB connection. The disconnect call takes a pointer to
-- the handle, so the handle is first written into a temporary slot.
closeConnectionHandle :: DuckDBConnection -> IO ()
closeConnectionHandle conn =
    alloca $ \slot -> do
        poke slot conn
        c_duckdb_disconnect slot
-- | Close a DuckDB database. The close call takes a pointer to the handle,
-- so the handle is first written into a temporary slot.
closeDatabaseHandle :: DuckDBDatabase -> IO ()
closeDatabaseHandle db =
    alloca $ \slot -> do
        poke slot db
        c_duckdb_close slot
-- | Destroy a prepared statement. The destroy call takes a pointer to the
-- handle, so the handle is first written into a temporary slot.
destroyPrepared :: DuckDBPreparedStatement -> IO ()
destroyPrepared stmt =
    alloca $ \slot -> do
        poke slot stmt
        c_duckdb_destroy_prepare slot
-- | Read the error message attached to a prepared statement, falling back to
-- a generic message when DuckDB provides none (null pointer).
fetchPrepareError :: DuckDBPreparedStatement -> IO Text
fetchPrepareError stmt = c_duckdb_prepare_error stmt >>= describe
  where
    describe msgPtr
        | msgPtr == nullPtr = pure (Text.pack "duckdb-simple: prepare failed")
        | otherwise = Text.pack <$> peekCString msgPtr
-- | Read the error message and error type attached to a result.
--
-- A null message pointer yields a generic message. An error type equal to
-- 'DuckDBErrorInvalid' is reported as 'Nothing'; any other type is wrapped
-- in 'Just'.
fetchResultError :: Ptr DuckDBResult -> IO (Text, Maybe DuckDBErrorType)
fetchResultError resultPtr = do
    msg <- c_duckdb_result_error resultPtr >>= describe
    errType <- c_duckdb_result_error_type resultPtr
    pure (msg, classify errType)
  where
    describe msgPtr
        | msgPtr == nullPtr = pure (Text.pack "duckdb-simple: query failed")
        | otherwise = Text.pack <$> peekCString msgPtr
    classify errType
        | errType == DuckDBErrorInvalid = Nothing
        | otherwise = Just errType
-- | Build the error raised when opening a database fails. It carries only
-- the message: no DuckDB error type and no query.
mkOpenError :: Text -> SQLError
mkOpenError reason =
    SQLError
        { sqlErrorQuery = Nothing
        , sqlErrorType = Nothing
        , sqlErrorMessage = reason
        }
-- | The error raised when a connection handle cannot be created. It has a
-- fixed message, no DuckDB error type, and no query.
mkConnectError :: SQLError
mkConnectError =
    SQLError
        { sqlErrorQuery = Nothing
        , sqlErrorType = Nothing
        , sqlErrorMessage = message
        }
  where
    message = Text.pack "duckdb-simple: failed to create connection handle"
-- | Build the error raised when preparing a statement fails. It records the
-- message and the offending query, with no DuckDB error type.
mkPrepareError :: Query -> Text -> SQLError
mkPrepareError queryText reason =
    SQLError
        { sqlErrorQuery = Just queryText
        , sqlErrorType = Nothing
        , sqlErrorMessage = reason
        }
-- | Build the error raised when executing a statement fails. It records the
-- message, the DuckDB error type (if one was classified), and the query.
mkExecuteError :: Query -> Text -> Maybe DuckDBErrorType -> SQLError
mkExecuteError queryText reason classification =
    SQLError
        { sqlErrorQuery = Just queryText
        , sqlErrorType = classification
        , sqlErrorMessage = reason
        }
-- | Throw a 'FormatError' for a statement, carrying the message, the
-- statement's query, and the already-rendered parameters.
throwFormatError :: Statement -> Text -> [String] -> IO a
throwFormatError stmt message params =
    throwIO
        FormatError
            { formatErrorQuery = statementQuery stmt
            , formatErrorParams = params
            , formatErrorMessage = message
            }
-- | Throw a 'FormatError' for positional bindings, rendering each binding
-- with 'renderFieldBinding'.
throwFormatErrorBindings :: Statement -> Text -> [FieldBinding] -> IO a
throwFormatErrorBindings stmt message bindings =
    throwFormatError stmt message (renderFieldBinding <$> bindings)
-- | Throw a 'FormatError' for named bindings. Each binding is rendered as
-- @name := value@, with the value rendered by 'renderFieldBinding'.
throwFormatErrorNamed :: Statement -> Text -> [(Text, FieldBinding)] -> IO a
throwFormatErrorNamed stmt message bindings =
    throwFormatError
        stmt
        message
        [ Text.unpack name <> " := " <> renderFieldBinding binding
        | (name, binding) <- bindings
        ]
-- | Build the error for an out-of-bounds column index. When the total column
-- count is known it is appended to the message. The error carries the
-- statement's query and no DuckDB error type.
columnIndexError :: Statement -> Int -> Maybe Int -> SQLError
columnIndexError stmt idx total =
    SQLError
        { sqlErrorMessage = maybe base withCount total
        , sqlErrorType = Nothing
        , sqlErrorQuery = Just (statementQuery stmt)
        }
  where
    base =
        Text.pack "duckdb-simple: column index "
            <> Text.pack (show idx)
            <> Text.pack " out of bounds"
    withCount count =
        base
            <> Text.pack " (column count: "
            <> Text.pack (show count)
            <> Text.pack ")"
-- | Build the 'SQLError' reported when no name is available for the column at
-- the given index.
--
-- The error carries no DuckDB error type and records the statement's query.
columnNameUnavailableError :: Statement -> Int -> SQLError
columnNameUnavailableError stmt idx =
    SQLError
        { sqlErrorMessage = Text.pack description
        , sqlErrorType = Nothing
        , sqlErrorQuery = Just (statementQuery stmt)
        }
  where
    description = "duckdb-simple: column name unavailable for index " <> show idx
-- | Drop a single leading parameter sigil (@:@, @$@ or @\@@) from a name.
--
-- Names without one of these prefixes, including the empty name, are returned
-- unchanged. Only the first character is ever removed.
normalizeName :: Text -> Text
normalizeName name
    | Just (sigil, bare) <- Text.uncons name
    , sigil `elem` ":$@" =
        bare
    | otherwise = name
-- | Read the changed-row count of a result via 'c_duckdb_rows_changed' and
-- convert it to an 'Int'.
resultRowsChanged :: Ptr DuckDBResult -> IO Int
resultRowsChanged resPtr = do
    changed <- c_duckdb_rows_changed resPtr
    pure (fromIntegral changed)
-- | Convert materialized rows using the 'FromRow' instance of the result
-- type.
--
-- This is 'convertRowsWith' specialised to the 'fromRow' parser.
convertRows :: (FromRow r) => Query -> [[Field]] -> IO [r]
convertRows queryText rows = convertRowsWith fromRow queryText rows
-- | Run a row parser over every materialized row.
--
-- Returns the parsed rows when 'parseRow' succeeds for all of them.
-- Otherwise the collected errors are turned into an 'SQLError' for the given
-- query by 'rowErrorsToSqlError' and thrown.
convertRowsWith :: RowParser r -> Query -> [[Field]] -> IO [r]
convertRowsWith parser queryText = finish . traverse (parseRow parser)
  where
    finish = \case
        Ok parsed -> pure parsed
        Errors failures -> throwIO (rowErrorsToSqlError queryText failures)
-- | Materialize every row of a result.
--
-- Column metadata is gathered once with 'collectResultColumns'. Chunks are
-- then fetched with 'c_duckdb_fetch_chunk' until it yields a null chunk, and
-- the decoded rows are returned in fetch order.
collectRows :: Ptr DuckDBResult -> IO [[Field]]
collectRows resPtr = collectResultColumns resPtr >>= drainChunks []
  where
    -- Decoded chunks are accumulated newest-first and flattened at the end.
    drainChunks pending columns = do
        chunk <- c_duckdb_fetch_chunk resPtr
        if chunk /= nullPtr
            then do
                -- The chunk is destroyed whether or not decoding succeeds.
                decoded <- decodeChunk columns chunk `finally` destroyDataChunk chunk
                drainChunks (maybe pending (: pending) decoded) columns
            else pure (concat (reverse pending))

    -- Decode one chunk; 'Nothing' means the chunk held no rows.
    decodeChunk columns chunk = do
        rawSize <- c_duckdb_data_chunk_get_size chunk
        decodeRows (fromIntegral rawSize :: Int)
      where
        decodeRows rowCount
            | rowCount <= 0 = pure Nothing
            -- A result without columns still yields one empty row per row.
            | null columns = pure (Just (replicate rowCount []))
            | otherwise = do
                vectors <- prepareChunkVectors chunk columns
                Just <$> traverse (buildMaterializedRow columns vectors) [0 .. rowCount - 1]
-- | Describe every column of a result: its zero-based index, name and type.
--
-- A column whose name pointer is null is given the placeholder name
-- @column<index>@.
collectResultColumns :: Ptr DuckDBResult -> IO [StatementStreamColumn]
collectResultColumns resPtr = do
    rawCount <- c_duckdb_column_count resPtr
    let total = fromIntegral rawCount :: Int
    traverse describeColumn [0 .. total - 1]
  where
    describeColumn position = do
        namePtr <- c_duckdb_column_name resPtr (fromIntegral position)
        label <- columnLabel position namePtr
        dtype <- c_duckdb_column_type resPtr (fromIntegral position)
        pure
            StatementStreamColumn
                { statementStreamColumnIndex = position
                , statementStreamColumnName = label
                , statementStreamColumnType = dtype
                }

    -- NOTE(review): if this is Foreign.C.String.peekCString, the name is
    -- decoded with the current locale encoding - confirm that is intended.
    columnLabel position namePtr
        | namePtr == nullPtr = pure (Text.pack ("column" <> show position))
        | otherwise = Text.pack <$> peekCString namePtr
-- | Materialize one row of a chunk.
--
-- Columns and vectors are paired positionally and each pair is turned into a
-- 'Field' by 'buildMaterializedField' for the given row index.
buildMaterializedRow :: [StatementStreamColumn] -> [StatementStreamChunkVector] -> Int -> IO [Field]
buildMaterializedRow columns vectors rowIdx =
    traverse (uncurry (buildMaterializedField rowIdx)) (zip columns vectors)
-- | Materialize a single cell as a 'Field'.
--
-- The value is read by 'materializeValue' from the vector's handle, data
-- pointer and validity pointer at the given row index, using the column's
-- type. The field's name and index are taken from the column description.
buildMaterializedField :: Int -> StatementStreamColumn -> StatementStreamChunkVector -> IO Field
buildMaterializedField rowIdx column vector =
    case vector of
        StatementStreamChunkVector
            { statementStreamChunkVectorHandle = vectorHandle
            , statementStreamChunkVectorData = dataPtr
            , statementStreamChunkVectorValidity = validityPtr
            } ->
                toField
                    <$> materializeValue
                        (statementStreamColumnType column)
                        vectorHandle
                        dataPtr
                        validityPtr
                        rowIdx
  where
    toField value =
        Field
            { fieldName = statementStreamColumnName column
            , fieldIndex = statementStreamColumnIndex column
            , fieldValue = value
            }
-- | Read the error message stored behind an error out-pointer.
--
-- When the stored pointer is null a fixed fallback message is returned;
-- otherwise the C string is read with 'peekCString' and packed into 'Text'.
peekError :: Ptr CString -> IO Text
peekError ptr = peek ptr >>= describe
  where
    describe errPtr
        | errPtr == nullPtr = pure (Text.pack "duckdb-simple: failed to open database")
        | otherwise = Text.pack <$> peekCString errPtr
-- | Free the error string stored behind an error out-pointer, if there is
-- one.
--
-- A null stored pointer is left alone; anything else is passed to
-- 'c_duckdb_free'. The slot itself is not modified.
maybeFreeErr :: Ptr CString -> IO ()
maybeFreeErr ptr = peek ptr >>= release
  where
    release errPtr
        | errPtr == nullPtr = pure ()
        | otherwise = c_duckdb_free (castPtr errPtr)