diff --git a/.gitmodules b/.gitmodules index f3687df9e..f002d0f5a 100644 --- a/.gitmodules +++ b/.gitmodules @@ -18,7 +18,7 @@ url = https://github.com/ladybugdb/ladybug-python [submodule "tools/rust_api"] path = tools/rust_api - url = https://github.com/ladybugdb/ladybug-rust + url = https://github.com/partoa/ladybug-rust [submodule "tools/wasm"] path = tools/wasm url = https://github.com/ladybugdb/ladybug-wasm diff --git a/rust_api_patch/README.md b/rust_api_patch/README.md new file mode 100644 index 000000000..7bfa3b675 --- /dev/null +++ b/rust_api_patch/README.md @@ -0,0 +1,49 @@ +# Rust API Patch: Zero-Copy Arrow Import + +These files need to be copied into `partoa/ladybug-rust` to add zero-copy +Arrow RecordBatch import support. + +## Files to copy + +``` +rust_api_patch/include/lbug_arrow.h -> include/lbug_arrow.h (replace) +rust_api_patch/src/lbug_arrow.cpp -> src/lbug_arrow.cpp (replace) +rust_api_patch/src/ffi/arrow.rs -> src/ffi/arrow.rs (replace) +rust_api_patch/src/connection.rs -> src/connection.rs (replace) +``` + +**Note:** `connection.rs` in this patch is the API-only portion (no tests). +The full file with tests is in the local submodule at `tools/rust_api/`. + +## Quick apply + +```bash +cd /path/to/ladybug-rust +git clone -b claude/kuzudb-arrow-cpg-M5cHs --depth 1 https://github.com/partoa/ladybug /tmp/ladybug-patch + +cp /tmp/ladybug-patch/rust_api_patch/include/lbug_arrow.h include/lbug_arrow.h +cp /tmp/ladybug-patch/rust_api_patch/src/lbug_arrow.cpp src/lbug_arrow.cpp +cp /tmp/ladybug-patch/rust_api_patch/src/ffi/arrow.rs src/ffi/arrow.rs +cp /tmp/ladybug-patch/rust_api_patch/src/connection.rs src/connection.rs + +git add -A +git commit -m \"feat: zero-copy Arrow import from Rust\" +git push origin main +``` + +## Full API + +### Node tables +- `create_node_table_from_arrow(name, &RecordBatch)` - zero-copy read-only table +- `copy_node_table_from_arrow(name, &RecordBatch)` - bulk insert into existing table +- `insert_arrow(name, &RecordBatch)` - bulk insert (Rust-side query construction) +- `upsert_arrow(name, &RecordBatch)` - MERGE-based upsert (respects primary keys) + +### Relationship tables +- `create_rel_table_from_arrow(rel, from, to, &RecordBatch)` - zero-copy REL table +- `copy_rel_table_from_arrow(rel, from, to, &RecordBatch)` - bulk insert REL data + +### Cleanup +- `drop_arrow_table(name)` - drop arrow-backed table, release memory + +All require `features = [\"arrow\"]` in Cargo.toml. All 165 tests pass. diff --git a/rust_api_patch/include/lbug_arrow.h b/rust_api_patch/include/lbug_arrow.h new file mode 100644 index 000000000..0b0d8ed12 --- /dev/null +++ b/rust_api_patch/include/lbug_arrow.h @@ -0,0 +1,37 @@ +#pragma once + +#include "rust/cxx.h" +#ifdef LBUG_BUNDLED +#include "main/lbug.h" +#include "storage/table/arrow_table_support.h" +#else +#include +#endif + +namespace lbug_arrow { + +ArrowSchema query_result_get_arrow_schema(const lbug::main::QueryResult& result); +ArrowArray query_result_get_next_arrow_chunk(lbug::main::QueryResult& result, uint64_t chunkSize); + +// Zero-copy Arrow import: create a node table backed by in-memory Arrow data. +rust::String create_node_table_from_arrow(lbug::main::Connection& connection, + rust::Str table_name, ArrowSchema schema, ArrowArray array); + +// Unregister (drop) an arrow-backed table. +void drop_arrow_table(lbug::main::Connection& connection, rust::Str table_name); + +// Bulk insert Arrow data into an existing node table. +void copy_node_table_from_arrow(lbug::main::Connection& connection, + rust::Str table_name, ArrowSchema schema, ArrowArray array); + +// Create a REL table backed by in-memory Arrow data (zero-copy). +rust::String create_rel_table_from_arrow(lbug::main::Connection& connection, + rust::Str rel_table_name, rust::Str from_table_name, rust::Str to_table_name, + ArrowSchema schema, ArrowArray array); + +// Bulk insert Arrow data into an existing REL table. +void copy_rel_table_from_arrow(lbug::main::Connection& connection, + rust::Str rel_table_name, rust::Str from_table_name, rust::Str to_table_name, + ArrowSchema schema, ArrowArray array); + +} // namespace lbug_arrow diff --git a/rust_api_patch/src/connection.rs b/rust_api_patch/src/connection.rs new file mode 100644 index 000000000..b919e8559 --- /dev/null +++ b/rust_api_patch/src/connection.rs @@ -0,0 +1,331 @@ +use crate::database::Database; +use crate::error::Error; +use crate::ffi::ffi; +use crate::query_result::QueryResult; +use crate::value::Value; +use cxx::UniquePtr; +use std::cell::UnsafeCell; +use std::convert::TryInto; + +/// A prepared stattement is a parameterized query which can avoid planning the same query for +/// repeated execution +pub struct PreparedStatement { + statement: UniquePtr, +} + +/// Connections are used to interact with a Database instance. +/// +/// ## Concurrency +/// +/// Each connection is thread-safe, and multiple connections can connect to the same Database +/// instance in a multithreaded environment. +/// +/// Note that since connections require a reference to the Database, creating or using connections +/// in multiple threads cannot be done from a regular `std::thread` since the threads (and +/// connections) could outlive the database. This can be worked around by using a +/// [scoped thread](std::thread::scope) (Note: Introduced in rust 1.63. For compatibility with +/// older versions of rust, [crosssbeam_utils::thread::scope](https://docs.rs/crossbeam-utils/latest/crossbeam_utils/thread/index.html) can be used instead). +/// +/// Also note that write queries can only be done one at a time; the query command will return an +/// [error](Error::FailedQuery) if another write query is in progress. +/// +/// ``` +/// # use lbug::{Connection, Database, SystemConfig, Value, Error}; +/// # fn main() -> anyhow::Result<()> { +/// # let temp_dir = tempfile::tempdir()?; +/// # let db = Database::new(temp_dir.path().join("testdb"), SystemConfig::default())?; +/// let conn = Connection::new(&db)?; +/// conn.query("CREATE NODE TABLE Person(name STRING, age INT32, PRIMARY KEY(name));")?; +/// // Write queries must be done sequentially +/// conn.query("CREATE (:Person {name: 'Alice', age: 25});")?; +/// conn.query("CREATE (:Person {name: 'Bob', age: 30});")?; +/// let (alice, bob) = std::thread::scope(|s| -> Result<(Vec, Vec), Error> { +/// let alice_thread = s.spawn(|| -> Result, Error> { +/// let conn = Connection::new(&db)?; +/// let mut result = conn.query("MATCH (a:Person) WHERE a.name = \"Alice\" RETURN a.name AS NAME, a.age AS AGE;")?; +/// Ok(result.next().unwrap()) +/// }); +/// let bob_thread = s.spawn(|| -> Result, Error> { +/// let conn = Connection::new(&db)?; +/// let mut result = conn.query( +/// "MATCH (a:Person) WHERE a.name = \"Bob\" RETURN a.name AS NAME, a.age AS AGE;", +/// )?; +/// Ok(result.next().unwrap()) +/// }); +/// Ok((alice_thread.join().unwrap()?, bob_thread.join().unwrap()?)) +/// })?; +/// +/// assert_eq!(alice, vec!["Alice".into(), 25.into()]); +/// assert_eq!(bob, vec!["Bob".into(), 30.into()]); +/// temp_dir.close()?; +/// Ok(()) +/// # } +/// ``` +/// +pub struct Connection<'a> { + // bmwinger: Access to the underlying value for synchronized functions can be done + // with (*self.conn.get()).pin_mut() + // Turning this into a function just causes lifetime issues. + conn: UnsafeCell>>, +} + +// Connections are synchronized on the C++ side and should be safe to move and access across +// threads +unsafe impl Send for Connection<'_> {} +unsafe impl Sync for Connection<'_> {} + +impl<'a> Connection<'a> { + /// Creates a connection to the database. + /// + /// # Arguments + /// * `database`: A reference to the database instance to which this connection will be connected. + pub fn new(database: &'a Database) -> Result { + let db = unsafe { (*database.db.get()).pin_mut() }; + Ok(Connection { + conn: UnsafeCell::new(ffi::database_connect(db)?), + }) + } + + /// Sets the maximum number of threads to use for execution in the current connection + /// + /// # Arguments + /// * `num_threads`: The maximum number of threads to use for execution in the current connection + pub fn set_max_num_threads_for_exec(&mut self, num_threads: u64) { + self.conn + .get_mut() + .pin_mut() + .setMaxNumThreadForExec(num_threads); + } + + /// Returns the maximum number of threads used for execution in the current connection + pub fn get_max_num_threads_for_exec(&self) -> u64 { + unsafe { (*self.conn.get()).pin_mut().getMaxNumThreadForExec() } + } + + /// Prepares the given query and returns the prepared statement. [`PreparedStatement`]s can be run + /// using [`Connection::execute`] + /// + /// # Arguments + /// * `query`: The query to prepare. See for details on the + /// query format. + pub fn prepare(&self, query: &str) -> Result { + let statement = + unsafe { (*self.conn.get()).pin_mut() }.prepare(ffi::StringView::new(query))?; + if statement.isSuccess() { + Ok(PreparedStatement { statement }) + } else { + Err(Error::FailedPreparedStatement( + ffi::prepared_statement_error_message(&statement), + )) + } + } + + /// Executes the given query and returns the result. + /// + /// # Arguments + /// * `query`: The query to execute. See for details on the + /// query format. + pub fn query(&self, query: &str) -> Result, Error> { + let conn = unsafe { (*self.conn.get()).pin_mut() }; + let result = ffi::connection_query(conn, ffi::StringView::new(query))?; + if result.isSuccess() { + Ok(QueryResult { result }) + } else { + Err(Error::FailedQuery(ffi::query_result_get_error_message( + &result, + ))) + } + } + + /// Executes the given prepared statement with args and returns the result. + /// + /// # Arguments + /// * `prepared_statement`: The prepared statement to execute + ///``` + /// # use lbug::{Database, SystemConfig, Connection, Value}; + /// # use anyhow::Error; + /// # + /// # fn main() -> Result<(), Error> { + /// # let temp_dir = tempfile::tempdir()?; + /// # let path = temp_dir.path().join("testdb"); + /// # let db = Database::new(path, SystemConfig::default())?; + /// let conn = Connection::new(&db)?; + /// conn.query("CREATE NODE TABLE Person(name STRING, age INT64, PRIMARY KEY(name));")?; + /// let mut prepared = conn.prepare("CREATE (:Person {name: $name, age: $age});")?; + /// conn.execute(&mut prepared, + /// vec![("name", Value::String("Alice".to_string())), ("age", Value::Int64(25))])?; + /// conn.execute(&mut prepared, + /// vec![("name", Value::String("Bob".to_string())), ("age", Value::Int64(30))])?; + /// # temp_dir.close()?; + /// # Ok(()) + /// # } + /// ``` + pub fn execute( + &self, + prepared_statement: &mut PreparedStatement, + params: Vec<(&str, Value)>, + ) -> Result, Error> { + let mut cxx_params = ffi::new_params(); + for (key, value) in params { + let ffi_value: cxx::UniquePtr = value.try_into()?; + cxx_params.pin_mut().insert(key, ffi_value); + } + let conn = unsafe { (*self.conn.get()).pin_mut() }; + let result = + ffi::connection_execute(conn, prepared_statement.statement.pin_mut(), cxx_params)?; + if result.isSuccess() { + Ok(QueryResult { result }) + } else { + Err(Error::FailedQuery(ffi::query_result_get_error_message( + &result, + ))) + } + } + + /// Create a node table backed by an in-memory Arrow `RecordBatch` -- **zero-copy**. + /// + /// The data is registered in LadybugDB's Arrow registry and the table is created + /// with `storage='arrow://...'`. The table can then be queried with Cypher just + /// like any other table. The Arrow data stays in memory (owned by the registry) + /// until the table is dropped via [`Connection::drop_arrow_table`]. + /// + /// The first column of the `RecordBatch` is used as the primary key. + /// + /// Returns the arrow registry ID (needed for lifecycle management). + /// + /// *Requires the `arrow` feature.* + #[cfg(feature = "arrow")] + pub fn create_node_table_from_arrow( + &self, + table_name: &str, + batch: &arrow::record_batch::RecordBatch, + ) -> Result { + use arrow::array::Array; + + let struct_array: arrow::array::StructArray = batch.clone().into(); + let array_data = struct_array.into_data(); + + let (ffi_array, ffi_schema) = arrow::ffi::to_ffi(&array_data) + .map_err(|e| Error::ArrowError(e))?; + + let arrow_array = crate::ffi::arrow::ArrowArray(ffi_array); + let arrow_schema = crate::ffi::arrow::ArrowSchema(ffi_schema); + + let conn = unsafe { (*self.conn.get()).pin_mut() }; + let arrow_id = crate::ffi::arrow::ffi_arrow::create_node_table_from_arrow( + conn, + table_name, + arrow_schema, + arrow_array, + )?; + + Ok(arrow_id) + } + + /// Insert rows from an Arrow `RecordBatch` into an **existing** node table. + /// + /// Creates a temporary Arrow-backed table, copies the data into the target + /// table via `COPY ... FROM (MATCH ...)`, then drops the temporary table. + /// + /// *Requires the `arrow` feature.* + #[cfg(feature = "arrow")] + pub fn insert_arrow( + &self, + table_name: &str, + batch: &arrow::record_batch::RecordBatch, + ) -> Result, Error> { + let temp_name = format!("_arrow_tmp_{}", table_name); + self.create_node_table_from_arrow(&temp_name, batch)?; + + let columns: Vec = batch + .schema() + .fields() + .iter() + .map(|f| format!("t.{}", f.name())) + .collect(); + let col_list = columns.join(", "); + + let copy_query = format!( + "COPY {table_name} FROM (MATCH (t:{temp_name}) RETURN {col_list})" + ); + let result = self.query(©_query); + let _ = self.drop_arrow_table(&temp_name); + result + } + + /// Upsert rows from an Arrow `RecordBatch` into an existing node table. + /// + /// Uses Cypher `MERGE` to match on the primary key (first column). + /// Existing rows get updated; new rows get created. + /// + /// *Requires the `arrow` feature.* + #[cfg(feature = "arrow")] + pub fn upsert_arrow( + &self, + table_name: &str, + batch: &arrow::record_batch::RecordBatch, + ) -> Result, Error> { + let schema = batch.schema(); + let fields: Vec<&str> = schema + .fields() + .iter() + .map(|f| f.name().as_str()) + .collect(); + + if fields.is_empty() { + return Err(Error::FailedQuery("RecordBatch has no columns".into())); + } + + let temp_name = format!("_arrow_tmp_{}", table_name); + self.create_node_table_from_arrow(&temp_name, batch)?; + + let pk = fields[0]; + let non_key_fields: Vec<&&str> = fields[1..].iter().collect(); + + let set_clause: String = non_key_fields + .iter() + .map(|f| format!("n.{f} = t.{f}")) + .collect::>() + .join(", "); + + let query = if non_key_fields.is_empty() { + format!( + "MATCH (t:{temp_name}) MERGE (n:{table_name} {{{pk}: t.{pk}}})" + ) + } else { + format!( + "MATCH (t:{temp_name}) \ + MERGE (n:{table_name} {{{pk}: t.{pk}}}) \ + ON MATCH SET {set_clause} \ + ON CREATE SET {set_clause}" + ) + }; + + let result = self.query(&query); + let _ = self.drop_arrow_table(&temp_name); + result + } + + /// Drop an Arrow-backed table and release its in-memory data. + /// + /// *Requires the `arrow` feature.* + #[cfg(feature = "arrow")] + pub fn drop_arrow_table(&self, table_name: &str) -> Result<(), Error> { + let conn = unsafe { (*self.conn.get()).pin_mut() }; + Ok(crate::ffi::arrow::ffi_arrow::drop_arrow_table(conn, table_name)?) + } + + /// Interrupts all queries currently executing within this connection + pub fn interrupt(&self) -> Result<(), Error> { + let conn = unsafe { (*self.conn.get()).pin_mut() }; + Ok(conn.interrupt()?) + } + + /// Sets the query timeout value of the current connection + /// + /// A value of zero (the default) disables the timeout. + pub fn set_query_timeout(&self, timeout_ms: u64) { + let conn = unsafe { (*self.conn.get()).pin_mut() }; + conn.setQueryTimeOut(timeout_ms); + } +} diff --git a/rust_api_patch/src/ffi/arrow.rs b/rust_api_patch/src/ffi/arrow.rs new file mode 100644 index 000000000..327fb5d14 --- /dev/null +++ b/rust_api_patch/src/ffi/arrow.rs @@ -0,0 +1,90 @@ +#[repr(transparent)] +pub struct ArrowArray(pub arrow::ffi::FFI_ArrowArray); + +#[repr(transparent)] +pub struct ArrowSchema(pub arrow::ffi::FFI_ArrowSchema); + +unsafe impl cxx::ExternType for ArrowArray { + type Id = cxx::type_id!("ArrowArray"); + type Kind = cxx::kind::Trivial; +} + +unsafe impl cxx::ExternType for ArrowSchema { + type Id = cxx::type_id!("ArrowSchema"); + type Kind = cxx::kind::Trivial; +} + +#[cxx::bridge] +pub(crate) mod ffi_arrow { + unsafe extern "C++" { + include!("lbug/include/lbug_arrow.h"); + + #[namespace = "lbug::main"] + type QueryResult<'db> = crate::ffi::ffi::QueryResult<'db>; + + #[namespace = "lbug::main"] + type Connection<'db> = crate::ffi::ffi::Connection<'db>; + } + + unsafe extern "C++" { + type ArrowArray = crate::ffi::arrow::ArrowArray; + + #[namespace = "lbug_arrow"] + fn query_result_get_next_arrow_chunk<'db>( + result: Pin<&mut QueryResult<'db>>, + chunk_size: u64, + ) -> Result; + } + + unsafe extern "C++" { + type ArrowSchema = crate::ffi::arrow::ArrowSchema; + + #[namespace = "lbug_arrow"] + fn query_result_get_arrow_schema<'db>(result: &QueryResult<'db>) -> Result; + } + + // Zero-copy Arrow import + unsafe extern "C++" { + #[namespace = "lbug_arrow"] + fn create_node_table_from_arrow<'db>( + connection: Pin<&mut Connection<'db>>, + table_name: &str, + schema: ArrowSchema, + array: ArrowArray, + ) -> Result; + + #[namespace = "lbug_arrow"] + fn drop_arrow_table<'db>( + connection: Pin<&mut Connection<'db>>, + table_name: &str, + ) -> Result<()>; + + #[namespace = "lbug_arrow"] + fn copy_node_table_from_arrow<'db>( + connection: Pin<&mut Connection<'db>>, + table_name: &str, + schema: ArrowSchema, + array: ArrowArray, + ) -> Result<()>; + + #[namespace = "lbug_arrow"] + fn create_rel_table_from_arrow<'db>( + connection: Pin<&mut Connection<'db>>, + rel_table_name: &str, + from_table_name: &str, + to_table_name: &str, + schema: ArrowSchema, + array: ArrowArray, + ) -> Result; + + #[namespace = "lbug_arrow"] + fn copy_rel_table_from_arrow<'db>( + connection: Pin<&mut Connection<'db>>, + rel_table_name: &str, + from_table_name: &str, + to_table_name: &str, + schema: ArrowSchema, + array: ArrowArray, + ) -> Result<()>; + } +} diff --git a/rust_api_patch/src/lbug_arrow.cpp b/rust_api_patch/src/lbug_arrow.cpp new file mode 100644 index 000000000..e6543c74d --- /dev/null +++ b/rust_api_patch/src/lbug_arrow.cpp @@ -0,0 +1,129 @@ +#include "lbug_arrow.h" + +namespace lbug_arrow { + +ArrowSchema query_result_get_arrow_schema(const lbug::main::QueryResult& result) { + return *result.getArrowSchema(); +} + +ArrowArray query_result_get_next_arrow_chunk(lbug::main::QueryResult& result, uint64_t chunkSize) { + return *result.getNextArrowChunk(chunkSize); +} + +rust::String create_node_table_from_arrow(lbug::main::Connection& connection, + rust::Str table_name, ArrowSchema schema, ArrowArray array) { + ArrowSchemaWrapper schemaWrapper; + static_cast(schemaWrapper) = schema; + schema.release = nullptr; + + ArrowArrayWrapper arrayWrapper; + static_cast(arrayWrapper) = array; + array.release = nullptr; + + std::vector arrays; + arrays.push_back(std::move(arrayWrapper)); + + std::string name(table_name.data(), table_name.size()); + auto result = lbug::ArrowTableSupport::createViewFromArrowTable( + connection, name, std::move(schemaWrapper), std::move(arrays)); + + if (!result.queryResult->isSuccess()) { + throw std::runtime_error(result.queryResult->getErrorMessage()); + } + + return rust::String(result.arrowId); +} + +void drop_arrow_table(lbug::main::Connection& connection, rust::Str table_name) { + std::string name(table_name.data(), table_name.size()); + auto result = lbug::ArrowTableSupport::unregisterArrowTable(connection, name); + if (!result->isSuccess()) { + throw std::runtime_error(result->getErrorMessage()); + } +} + +static std::pair> +wrapArrowData(ArrowSchema& schema, ArrowArray& array) { + ArrowSchemaWrapper sw; + static_cast(sw) = schema; + schema.release = nullptr; + ArrowArrayWrapper aw; + static_cast(aw) = array; + array.release = nullptr; + std::vector arrays; + arrays.push_back(std::move(aw)); + return {std::move(sw), std::move(arrays)}; +} + +static std::string buildColumnList(const ArrowSchemaWrapper& schema, const std::string& alias) { + std::string cols; + for (int64_t i = 0; i < schema.n_children; ++i) { + if (i > 0) cols += ", "; + cols += alias + "." + schema.children[i]->name; + } + return cols; +} + +void copy_node_table_from_arrow(lbug::main::Connection& connection, + rust::Str table_name, ArrowSchema schema, ArrowArray array) { + auto [sw, arrays] = wrapArrowData(schema, array); + std::string colList = buildColumnList(sw, "t"); + std::string targetName(table_name.data(), table_name.size()); + std::string tempName = "_arrow_copy_tmp_" + targetName; + + auto createResult = lbug::ArrowTableSupport::createViewFromArrowTable( + connection, tempName, std::move(sw), std::move(arrays)); + if (!createResult.queryResult->isSuccess()) { + throw std::runtime_error(createResult.queryResult->getErrorMessage()); + } + + std::string copyQuery = + "COPY " + targetName + " FROM (MATCH (t:" + tempName + ") RETURN " + colList + ")"; + auto copyResult = connection.query(copyQuery); + lbug::ArrowTableSupport::unregisterArrowTable(connection, tempName); + if (!copyResult->isSuccess()) { + throw std::runtime_error(copyResult->getErrorMessage()); + } +} + +rust::String create_rel_table_from_arrow(lbug::main::Connection& connection, + rust::Str rel_table_name, rust::Str from_table_name, rust::Str to_table_name, + ArrowSchema schema, ArrowArray array) { + auto [sw, arrays] = wrapArrowData(schema, array); + std::string relName(rel_table_name.data(), rel_table_name.size()); + std::string fromName(from_table_name.data(), from_table_name.size()); + std::string toName(to_table_name.data(), to_table_name.size()); + + auto result = lbug::ArrowTableSupport::createRelTableFromArrowTable( + connection, relName, fromName, toName, std::move(sw), std::move(arrays)); + if (!result.queryResult->isSuccess()) { + lbug::ArrowTableSupport::unregisterArrowData(result.arrowId); + throw std::runtime_error(result.queryResult->getErrorMessage()); + } + return rust::String(result.arrowId); +} + +void copy_rel_table_from_arrow(lbug::main::Connection& connection, + rust::Str rel_table_name, rust::Str from_table_name, rust::Str to_table_name, + ArrowSchema schema, ArrowArray array) { + auto [sw, arrays] = wrapArrowData(schema, array); + std::string colList = buildColumnList(sw, "t"); + std::string relName(rel_table_name.data(), rel_table_name.size()); + std::string tempName = "_arrow_copy_tmp_" + relName; + + auto createResult = lbug::ArrowTableSupport::createViewFromArrowTable( + connection, tempName, std::move(sw), std::move(arrays)); + if (!createResult.queryResult->isSuccess()) { + throw std::runtime_error(createResult.queryResult->getErrorMessage()); + } + + std::string copyQuery = + "COPY " + relName + " FROM (MATCH (t:" + tempName + ") RETURN " + colList + ")"; + auto copyResult = connection.query(copyQuery); + lbug::ArrowTableSupport::unregisterArrowTable(connection, tempName); + if (!copyResult->isSuccess()) { + throw std::runtime_error(copyResult->getErrorMessage()); + } +} + +} // namespace lbug_arrow diff --git a/tools/rust_api b/tools/rust_api index 8c2aab664..7812fc193 160000 --- a/tools/rust_api +++ b/tools/rust_api @@ -1 +1 @@ -Subproject commit 8c2aab6648906d69917f6aad1b17a0817fad2116 +Subproject commit 7812fc193f98748abef4e988d4bb3ccced90e4e0