11//! Iceberg Context
22
3- use iceberg:: { Catalog , NamespaceIdent } ;
4- use iceberg_catalog_rest:: { RestCatalog , RestCatalogConfig } ;
53use std:: sync:: Arc ;
4+ use std:: sync:: RwLock ;
5+
6+ use futures:: stream:: StreamExt ;
7+ use iceberg:: { Catalog , NamespaceIdent , TableIdent } ;
8+ use iceberg_catalog_rest:: { RestCatalog , RestCatalogConfig } ;
69use tokio:: sync:: mpsc:: UnboundedSender ;
710use tokio:: sync:: watch:: Receiver ;
8- use std:: sync:: RwLock ;
911use tokio:: task:: JoinHandle ;
10- use tokio_stream:: { wrappers:: WatchStream , StreamExt } ;
12+ use tokio_stream:: wrappers:: WatchStream ;
1113
1214use tanic_core:: config:: ConnectionDetails ;
1315use tanic_core:: message:: { NamespaceDeets , TableDeets } ;
1416use tanic_core:: { Result , TanicError } ;
17+ use tokio:: sync:: mpsc:: { channel, Receiver as MpscReceiver , Sender as MpscSender } ;
18+ use tokio_stream:: wrappers:: ReceiverStream ;
1519
1620use crate :: state:: { TanicAction , TanicAppState , TanicIcebergState } ;
1721
1822type ActionTx = UnboundedSender < TanicAction > ;
1923type IceCtxRef = Arc < RwLock < IcebergContext > > ;
2024
/// Upper bound on how many queued Iceberg jobs the job handler drives at the
/// same time (passed to `for_each_concurrent`).
const JOB_STREAM_CONCURRENCY: usize = 1;
26+
2127#[ derive( Debug , Default ) ]
2228struct IcebergContext {
2329 connection_details : Option < ConnectionDetails > ,
@@ -40,6 +46,13 @@ pub struct IcebergContextManager {
4046 state_ref : Arc < RwLock < TanicAppState > > ,
4147}
4248
49+ #[ derive( Debug ) ]
50+ enum IcebergTask {
51+ Namespaces ,
52+ TablesForNamespace ( NamespaceDeets ) ,
53+ SummaryForTable ( TableDeets ) ,
54+ }
55+
4356impl IcebergContextManager {
4457 pub fn new ( action_tx : ActionTx , state_ref : Arc < RwLock < TanicAppState > > ) -> Self {
4558 Self {
@@ -52,8 +65,16 @@ impl IcebergContextManager {
5265 pub async fn event_loop ( & self , state_rx : Receiver < ( ) > ) -> Result < ( ) > {
5366 let mut state_stream = WatchStream :: new ( state_rx) ;
5467
55- while state_stream. next ( ) . await . is_some ( ) {
68+ let ( job_queue_tx, job_queue_rx) = channel ( 10 ) ;
69+
70+ tokio:: spawn ( {
71+ let action_tx = self . action_tx . clone ( ) ;
72+ let job_queue_tx = job_queue_tx. clone ( ) ;
73+ let iceberg_ctx = self . iceberg_context . clone ( ) ;
74+ async move { Self :: job_handler ( job_queue_rx, job_queue_tx, action_tx, iceberg_ctx) . await }
75+ } ) ;
5676
77+ while state_stream. next ( ) . await . is_some ( ) {
5778 let new_conn_details = {
5879 let state = self . state_ref . read ( ) . unwrap ( ) ;
5980
@@ -64,36 +85,27 @@ impl IcebergContextManager {
6485 TanicIcebergState :: Exiting => {
6586 break ;
6687 }
67- _ => None
88+ _ => None ,
6889 }
6990 } ;
7091
7192 if let Some ( new_conn_details) = new_conn_details {
72- self . connect_to ( & new_conn_details) . await ?;
73-
74- let namespaces = {
75- self . iceberg_context . read ( ) . unwrap ( ) . namespaces . clone ( )
76- } ;
93+ self . connect_to ( & new_conn_details, job_queue_tx. clone ( ) )
94+ . await ?;
7795
7896 // begin crawl
79- for namespace in namespaces {
80-
81- // spawn a task to start populating the namespaces
82- let action_tx = self . action_tx . clone ( ) ;
83- let ctx = self . iceberg_context . clone ( ) ;
84-
85- // TODO: handle handle, lol
86- let _jh = tokio:: spawn ( async move {
87- Self :: populate_tables ( ctx, action_tx, namespace) . await
88- } ) ;
89- }
97+ let _ = job_queue_tx. send ( IcebergTask :: Namespaces ) . await ;
9098 }
9199 }
92100
93101 Ok ( ( ) )
94102 }
95103
96- async fn connect_to ( & self , new_conn_details : & ConnectionDetails ) -> Result < ( ) > {
104+ async fn connect_to (
105+ & self ,
106+ new_conn_details : & ConnectionDetails ,
107+ _job_queue_tx : MpscSender < IcebergTask > ,
108+ ) -> Result < ( ) > {
97109 {
98110 let ctx = self . iceberg_context . read ( ) . unwrap ( ) ;
99111 if let Some ( ref existing_conn_details) = ctx. connection_details {
@@ -104,31 +116,19 @@ impl IcebergContextManager {
104116 }
105117 }
106118
107- // cancel any in-progress action and connect to the new connection
108119 {
109120 let mut ctx = self . iceberg_context . write ( ) . unwrap ( ) ;
110- // TODO: cancel in-prog action
111- // if let Some(cancellable) = *ctx.deref_mut().cancellable_action {
112- // cancellable.abort();
113- // }
114121 ctx. connect_to ( new_conn_details) ;
115122 }
116123
117- // spawn a task to start populating the namespaces
118- let action_tx = self . action_tx . clone ( ) ;
119- let ctx = self . iceberg_context . clone ( ) ;
120- // TODO: store the join handle for cancellation
121- let _jh = tokio:: spawn ( async move {
122- let res = Self :: populate_namespaces ( ctx, action_tx) . await ;
123- if let Err ( error) = res {
124- tracing:: error!( %error, "Error populating namespaces" ) ;
125- }
126- } ) ;
127-
128124 Ok ( ( ) )
129125 }
130126
131- async fn populate_namespaces ( ctx : IceCtxRef , action_tx : ActionTx ) -> Result < ( ) > {
127+ async fn populate_namespaces (
128+ ctx : IceCtxRef ,
129+ action_tx : ActionTx ,
130+ job_queue_tx : MpscSender < IcebergTask > ,
131+ ) -> Result < ( ) > {
132132 let root_namespaces = {
133133 let catalog = {
134134 let r_ctx = ctx. read ( ) . unwrap ( ) ;
@@ -157,23 +157,30 @@ impl IcebergContextManager {
157157
158158 action_tx
159159 . send ( TanicAction :: UpdateNamespacesList (
160- namespaces. iter ( ) . map ( |ns| {
161- ns. name . clone ( )
162- } ) . collect :: < Vec < _ > > ( )
160+ namespaces
161+ . iter ( )
162+ . map ( |ns| ns. name . clone ( ) )
163+ . collect :: < Vec < _ > > ( ) ,
163164 ) )
164165 . map_err ( |err| TanicError :: UnexpectedError ( err. to_string ( ) ) ) ?;
165166
167+ for namespace in namespaces {
168+ let _ = job_queue_tx
169+ . send ( IcebergTask :: TablesForNamespace ( namespace. clone ( ) ) )
170+ . await ;
171+ }
172+
166173 Ok ( ( ) )
167174 }
168175
169176 async fn populate_tables (
170177 ctx : IceCtxRef ,
171178 action_tx : ActionTx ,
172179 namespace : NamespaceDeets ,
180+ job_queue_tx : MpscSender < IcebergTask > ,
173181 ) -> Result < ( ) > {
174182 let namespace_ident = NamespaceIdent :: from_strs ( namespace. parts . clone ( ) ) ?;
175183 let tables = {
176-
177184 let catalog = {
178185 let r_ctx = ctx. read ( ) . unwrap ( ) ;
179186
@@ -206,10 +213,59 @@ impl IcebergContextManager {
206213 action_tx
207214 . send ( TanicAction :: UpdateNamespaceTableList (
208215 namespace. name . clone ( ) ,
209- tables. iter ( ) . map ( |t|& t. name ) . cloned ( ) . collect ( ) ,
216+ tables. iter ( ) . map ( |t| & t. name ) . cloned ( ) . collect ( ) ,
210217 ) )
211218 . map_err ( TanicError :: unexpected) ?;
212219
220+ for table in tables {
221+ let _ = job_queue_tx
222+ . send ( IcebergTask :: SummaryForTable ( table. clone ( ) ) )
223+ . await ;
224+ }
225+
226+ Ok ( ( ) )
227+ }
228+
229+ async fn populate_table_summary (
230+ ctx : IceCtxRef ,
231+ action_tx : ActionTx ,
232+ table : TableDeets ,
233+ _job_queue_tx : MpscSender < IcebergTask > ,
234+ ) -> Result < ( ) > {
235+ let namespace_ident = NamespaceIdent :: from_strs ( table. namespace . clone ( ) ) ?;
236+ let table_ident = TableIdent :: new ( namespace_ident. clone ( ) , table. name . clone ( ) ) ;
237+
238+ let loaded_table = {
239+ let catalog = {
240+ let r_ctx = ctx. read ( ) . unwrap ( ) ;
241+
242+ let Some ( ref catalog) = r_ctx. catalog else {
243+ return Err ( TanicError :: unexpected (
244+ "Attempted to populate table summary when catalog not initialised" ,
245+ ) ) ;
246+ } ;
247+
248+ catalog. clone ( )
249+ } ;
250+
251+ catalog. load_table ( & table_ident) . await ?
252+ } ;
253+
254+ let summary = loaded_table
255+ . metadata ( )
256+ . current_snapshot ( )
257+ . unwrap ( )
258+ . summary ( ) ;
259+ tracing:: info!( ?summary) ;
260+
261+ action_tx
262+ . send ( TanicAction :: UpdateTableSummary {
263+ namespace : namespace_ident. to_url_string ( ) ,
264+ table_name : table_ident. name . clone ( ) ,
265+ table_summary : summary. additional_properties . clone ( ) ,
266+ } )
267+ . map_err ( TanicError :: unexpected) ?;
268+
213269 Ok ( ( ) )
214270 }
215271}
@@ -229,3 +285,62 @@ impl IcebergContext {
229285 self . tables = vec ! [ ] ;
230286 }
231287}
288+
289+ impl IcebergContextManager {
290+ async fn job_handler (
291+ job_queue_rx : MpscReceiver < IcebergTask > ,
292+ job_queue_tx : MpscSender < IcebergTask > ,
293+ action_tx : ActionTx ,
294+ iceberg_ctx : IceCtxRef ,
295+ ) {
296+ let job_stream = ReceiverStream :: new ( job_queue_rx) ;
297+
298+ // let _ = tokio::spawn(async move {
299+ job_stream
300+ . map ( |task| {
301+ (
302+ task,
303+ iceberg_ctx. clone ( ) ,
304+ action_tx. clone ( ) ,
305+ job_queue_tx. clone ( ) ,
306+ )
307+ } )
308+ . for_each_concurrent (
309+ JOB_STREAM_CONCURRENCY ,
310+ async move |( task, iceberg_ctx, action_tx, job_queue_tx) | {
311+ match task {
312+ IcebergTask :: Namespaces => {
313+ let _ = IcebergContextManager :: populate_namespaces (
314+ iceberg_ctx,
315+ action_tx,
316+ job_queue_tx,
317+ )
318+ . await ;
319+ }
320+
321+ IcebergTask :: TablesForNamespace ( namespace) => {
322+ let _ = IcebergContextManager :: populate_tables (
323+ iceberg_ctx,
324+ action_tx,
325+ namespace,
326+ job_queue_tx,
327+ )
328+ . await ;
329+ }
330+
331+ IcebergTask :: SummaryForTable ( table) => {
332+ let _ = IcebergContextManager :: populate_table_summary (
333+ iceberg_ctx,
334+ action_tx,
335+ table,
336+ job_queue_tx,
337+ )
338+ . await ;
339+ } // _ => {}
340+ }
341+ } ,
342+ )
343+ . await ;
344+ // }).await;
345+ }
346+ }
0 commit comments