openzeppelin_relayer/repositories/transaction/
mod.rs

1//! Transaction Repository Module
2//!
3//! This module provides the transaction repository layer for the OpenZeppelin Relayer service.
4//! It implements the Repository pattern to abstract transaction data persistence operations,
5//! supporting both in-memory and Redis-backed storage implementations.
6//!
7//! ## Features
8//!
9//! - **CRUD Operations**: Create, read, update, and delete transactions
10//! - **Specialized Queries**: Find transactions by relayer ID, status, and nonce
11//! - **Pagination Support**: Efficient paginated listing of transactions
12//! - **Status Management**: Update transaction status and timestamps
13//! - **Partial Updates**: Support for partial transaction updates
14//! - **Network Data**: Manage transaction network-specific data
15//!
16//! ## Repository Implementations
17//!
18//! - [`InMemoryTransactionRepository`]: Fast in-memory storage for testing/development
19//! - [`RedisTransactionRepository`]: Redis-backed storage for production environments
20//!
21mod transaction_in_memory;
22mod transaction_redis;
23
24use deadpool_redis::Pool;
25pub use transaction_in_memory::*;
26pub use transaction_redis::*;
27
28use crate::{
29    models::{
30        NetworkTransactionData, TransactionRepoModel, TransactionStatus, TransactionUpdateRequest,
31    },
32    repositories::{BatchDeleteResult, TransactionDeleteRequest, *},
33    utils::RedisConnections,
34};
35use async_trait::async_trait;
36use eyre::Result;
37use std::sync::Arc;
38
39/// A trait defining transaction repository operations
40#[async_trait]
41pub trait TransactionRepository: Repository<TransactionRepoModel, String> {
42    /// Find transactions by relayer ID with pagination
43    async fn find_by_relayer_id(
44        &self,
45        relayer_id: &str,
46        query: PaginationQuery,
47    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError>;
48
49    /// Find transactions by relayer ID and status(es).
50    ///
51    /// Results are sorted by created_at descending (newest first).
52    async fn find_by_status(
53        &self,
54        relayer_id: &str,
55        statuses: &[TransactionStatus],
56    ) -> Result<Vec<TransactionRepoModel>, RepositoryError>;
57
58    /// Find transactions by relayer ID and status(es) with pagination.
59    ///
60    /// Results are sorted by timestamp:
61    /// - For Confirmed transactions: sorted by confirmed_at (on-chain confirmation order)
62    /// - For all other statuses: sorted by created_at (queue/processing order)
63    ///
64    /// The `oldest_first` parameter controls sort direction:
65    /// - `false` (default): newest first (descending) - for displaying recent transactions
66    /// - `true`: oldest first (ascending) - for FIFO queue processing
67    ///
68    /// For multi-status queries, transactions are merged and sorted using the same rules,
69    /// ensuring consistent ordering across different statuses.
70    async fn find_by_status_paginated(
71        &self,
72        relayer_id: &str,
73        statuses: &[TransactionStatus],
74        query: PaginationQuery,
75        oldest_first: bool,
76    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError>;
77
78    /// Find a transaction by relayer ID and nonce
79    async fn find_by_nonce(
80        &self,
81        relayer_id: &str,
82        nonce: u64,
83    ) -> Result<Option<TransactionRepoModel>, RepositoryError>;
84
85    /// Update the status of a transaction
86    async fn update_status(
87        &self,
88        tx_id: String,
89        status: TransactionStatus,
90    ) -> Result<TransactionRepoModel, RepositoryError>;
91
92    /// Partially update a transaction
93    async fn partial_update(
94        &self,
95        tx_id: String,
96        update: TransactionUpdateRequest,
97    ) -> Result<TransactionRepoModel, RepositoryError>;
98
99    /// Update the network data of a transaction
100    async fn update_network_data(
101        &self,
102        tx_id: String,
103        network_data: NetworkTransactionData,
104    ) -> Result<TransactionRepoModel, RepositoryError>;
105
106    /// Set the sent_at timestamp of a transaction
107    async fn set_sent_at(
108        &self,
109        tx_id: String,
110        sent_at: String,
111    ) -> Result<TransactionRepoModel, RepositoryError>;
112
113    /// Set the confirmed_at timestamp of a transaction
114    async fn set_confirmed_at(
115        &self,
116        tx_id: String,
117        confirmed_at: String,
118    ) -> Result<TransactionRepoModel, RepositoryError>;
119
120    /// Count transactions by status(es) without fetching full transaction data.
121    /// This is an optimized O(1) operation in Redis using ZCARD.
122    async fn count_by_status(
123        &self,
124        relayer_id: &str,
125        statuses: &[TransactionStatus],
126    ) -> Result<u64, RepositoryError>;
127
128    /// Delete multiple transactions by their IDs in a single batch operation.
129    ///
130    /// This is more efficient than calling `delete_by_id` multiple times as it
131    /// reduces the number of round-trips to the storage backend.
132    ///
133    /// Note: This method requires fetching transaction data first to clean up indexes.
134    /// If you already have transaction data, use `delete_by_requests` instead for
135    /// better performance.
136    ///
137    /// # Arguments
138    /// * `ids` - List of transaction IDs to delete
139    ///
140    /// # Returns
141    /// * `BatchDeleteResult` containing the count of successful deletions and any failures
142    async fn delete_by_ids(&self, ids: Vec<String>) -> Result<BatchDeleteResult, RepositoryError>;
143
144    /// Delete multiple transactions using pre-extracted data.
145    ///
146    /// This is the most efficient batch delete method as it doesn't require
147    /// re-fetching transaction data. Use this when you already have the transaction
148    /// data (e.g., from a previous query).
149    ///
150    /// # Arguments
151    /// * `requests` - List of delete requests containing transaction data needed for cleanup
152    ///
153    /// # Returns
154    /// * `BatchDeleteResult` containing the count of successful deletions and any failures
155    async fn delete_by_requests(
156        &self,
157        requests: Vec<TransactionDeleteRequest>,
158    ) -> Result<BatchDeleteResult, RepositoryError>;
159}
160
161#[cfg(test)]
162mockall::mock! {
163  pub TransactionRepository {}
164
165  #[async_trait]
166  impl Repository<TransactionRepoModel, String> for TransactionRepository {
167      async fn create(&self, entity: TransactionRepoModel) -> Result<TransactionRepoModel, RepositoryError>;
168      async fn get_by_id(&self, id: String) -> Result<TransactionRepoModel, RepositoryError>;
169      async fn list_all(&self) -> Result<Vec<TransactionRepoModel>, RepositoryError>;
170      async fn list_paginated(&self, query: PaginationQuery) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError>;
171      async fn update(&self, id: String, entity: TransactionRepoModel) -> Result<TransactionRepoModel, RepositoryError>;
172      async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError>;
173      async fn count(&self) -> Result<usize, RepositoryError>;
174      async fn has_entries(&self) -> Result<bool, RepositoryError>;
175      async fn drop_all_entries(&self) -> Result<(), RepositoryError>;
176  }
177
178  #[async_trait]
179  impl TransactionRepository for TransactionRepository {
180      async fn find_by_relayer_id(&self, relayer_id: &str, query: PaginationQuery) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError>;
181      async fn find_by_status(&self, relayer_id: &str, statuses: &[TransactionStatus]) -> Result<Vec<TransactionRepoModel>, RepositoryError>;
182      async fn find_by_status_paginated(&self, relayer_id: &str, statuses: &[TransactionStatus], query: PaginationQuery, oldest_first: bool) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError>;
183      async fn find_by_nonce(&self, relayer_id: &str, nonce: u64) -> Result<Option<TransactionRepoModel>, RepositoryError>;
184      async fn update_status(&self, tx_id: String, status: TransactionStatus) -> Result<TransactionRepoModel, RepositoryError>;
185      async fn partial_update(&self, tx_id: String, update: TransactionUpdateRequest) -> Result<TransactionRepoModel, RepositoryError>;
186      async fn update_network_data(&self, tx_id: String, network_data: NetworkTransactionData) -> Result<TransactionRepoModel, RepositoryError>;
187      async fn set_sent_at(&self, tx_id: String, sent_at: String) -> Result<TransactionRepoModel, RepositoryError>;
188      async fn set_confirmed_at(&self, tx_id: String, confirmed_at: String) -> Result<TransactionRepoModel, RepositoryError>;
189      async fn count_by_status(&self, relayer_id: &str, statuses: &[TransactionStatus]) -> Result<u64, RepositoryError>;
190      async fn delete_by_ids(&self, ids: Vec<String>) -> Result<BatchDeleteResult, RepositoryError>;
191      async fn delete_by_requests(&self, requests: Vec<TransactionDeleteRequest>) -> Result<BatchDeleteResult, RepositoryError>;
192  }
193}
194
195/// Enum wrapper for different transaction repository implementations
196#[derive(Debug, Clone)]
197pub enum TransactionRepositoryStorage {
198    InMemory(InMemoryTransactionRepository),
199    Redis(RedisTransactionRepository),
200}
201
202impl TransactionRepositoryStorage {
203    pub fn new_in_memory() -> Self {
204        Self::InMemory(InMemoryTransactionRepository::new())
205    }
206    pub fn new_redis(
207        connections: Arc<RedisConnections>,
208        key_prefix: String,
209    ) -> Result<Self, RepositoryError> {
210        Ok(Self::Redis(RedisTransactionRepository::new(
211            connections,
212            key_prefix,
213        )?))
214    }
215
216    /// Returns the underlying primary connection pool and key prefix if this is a persistent storage backend.
217    ///
218    /// This is useful for operations that need direct storage access, such as
219    /// distributed locking. The key prefix is used to namespace keys for multi-tenant
220    /// deployments. Currently supports Redis, but the design allows for future backends.
221    ///
222    /// # Returns
223    /// * `Some((pool, prefix))` - If using persistent storage (e.g., Redis) - returns primary pool
224    /// * `None` - If using in-memory storage
225    pub fn connection_info(&self) -> Option<(Arc<Pool>, &str)> {
226        match self {
227            TransactionRepositoryStorage::InMemory(_) => None,
228            TransactionRepositoryStorage::Redis(repo) => {
229                Some((repo.connections.primary().clone(), &repo.key_prefix))
230            }
231        }
232    }
233}
234
235#[async_trait]
236impl TransactionRepository for TransactionRepositoryStorage {
237    async fn find_by_relayer_id(
238        &self,
239        relayer_id: &str,
240        query: PaginationQuery,
241    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
242        match self {
243            TransactionRepositoryStorage::InMemory(repo) => {
244                repo.find_by_relayer_id(relayer_id, query).await
245            }
246            TransactionRepositoryStorage::Redis(repo) => {
247                repo.find_by_relayer_id(relayer_id, query).await
248            }
249        }
250    }
251
252    async fn find_by_status(
253        &self,
254        relayer_id: &str,
255        statuses: &[TransactionStatus],
256    ) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
257        match self {
258            TransactionRepositoryStorage::InMemory(repo) => {
259                repo.find_by_status(relayer_id, statuses).await
260            }
261            TransactionRepositoryStorage::Redis(repo) => {
262                repo.find_by_status(relayer_id, statuses).await
263            }
264        }
265    }
266
267    async fn find_by_status_paginated(
268        &self,
269        relayer_id: &str,
270        statuses: &[TransactionStatus],
271        query: PaginationQuery,
272        oldest_first: bool,
273    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
274        match self {
275            TransactionRepositoryStorage::InMemory(repo) => {
276                repo.find_by_status_paginated(relayer_id, statuses, query, oldest_first)
277                    .await
278            }
279            TransactionRepositoryStorage::Redis(repo) => {
280                repo.find_by_status_paginated(relayer_id, statuses, query, oldest_first)
281                    .await
282            }
283        }
284    }
285
286    async fn find_by_nonce(
287        &self,
288        relayer_id: &str,
289        nonce: u64,
290    ) -> Result<Option<TransactionRepoModel>, RepositoryError> {
291        match self {
292            TransactionRepositoryStorage::InMemory(repo) => {
293                repo.find_by_nonce(relayer_id, nonce).await
294            }
295            TransactionRepositoryStorage::Redis(repo) => {
296                repo.find_by_nonce(relayer_id, nonce).await
297            }
298        }
299    }
300
301    async fn update_status(
302        &self,
303        tx_id: String,
304        status: TransactionStatus,
305    ) -> Result<TransactionRepoModel, RepositoryError> {
306        match self {
307            TransactionRepositoryStorage::InMemory(repo) => repo.update_status(tx_id, status).await,
308            TransactionRepositoryStorage::Redis(repo) => repo.update_status(tx_id, status).await,
309        }
310    }
311
312    async fn partial_update(
313        &self,
314        tx_id: String,
315        update: TransactionUpdateRequest,
316    ) -> Result<TransactionRepoModel, RepositoryError> {
317        match self {
318            TransactionRepositoryStorage::InMemory(repo) => {
319                repo.partial_update(tx_id, update).await
320            }
321            TransactionRepositoryStorage::Redis(repo) => repo.partial_update(tx_id, update).await,
322        }
323    }
324
325    async fn update_network_data(
326        &self,
327        tx_id: String,
328        network_data: NetworkTransactionData,
329    ) -> Result<TransactionRepoModel, RepositoryError> {
330        match self {
331            TransactionRepositoryStorage::InMemory(repo) => {
332                repo.update_network_data(tx_id, network_data).await
333            }
334            TransactionRepositoryStorage::Redis(repo) => {
335                repo.update_network_data(tx_id, network_data).await
336            }
337        }
338    }
339
340    async fn set_sent_at(
341        &self,
342        tx_id: String,
343        sent_at: String,
344    ) -> Result<TransactionRepoModel, RepositoryError> {
345        match self {
346            TransactionRepositoryStorage::InMemory(repo) => repo.set_sent_at(tx_id, sent_at).await,
347            TransactionRepositoryStorage::Redis(repo) => repo.set_sent_at(tx_id, sent_at).await,
348        }
349    }
350
351    async fn set_confirmed_at(
352        &self,
353        tx_id: String,
354        confirmed_at: String,
355    ) -> Result<TransactionRepoModel, RepositoryError> {
356        match self {
357            TransactionRepositoryStorage::InMemory(repo) => {
358                repo.set_confirmed_at(tx_id, confirmed_at).await
359            }
360            TransactionRepositoryStorage::Redis(repo) => {
361                repo.set_confirmed_at(tx_id, confirmed_at).await
362            }
363        }
364    }
365
366    async fn count_by_status(
367        &self,
368        relayer_id: &str,
369        statuses: &[TransactionStatus],
370    ) -> Result<u64, RepositoryError> {
371        match self {
372            TransactionRepositoryStorage::InMemory(repo) => {
373                repo.count_by_status(relayer_id, statuses).await
374            }
375            TransactionRepositoryStorage::Redis(repo) => {
376                repo.count_by_status(relayer_id, statuses).await
377            }
378        }
379    }
380
381    async fn delete_by_ids(&self, ids: Vec<String>) -> Result<BatchDeleteResult, RepositoryError> {
382        match self {
383            TransactionRepositoryStorage::InMemory(repo) => repo.delete_by_ids(ids).await,
384            TransactionRepositoryStorage::Redis(repo) => repo.delete_by_ids(ids).await,
385        }
386    }
387
388    async fn delete_by_requests(
389        &self,
390        requests: Vec<TransactionDeleteRequest>,
391    ) -> Result<BatchDeleteResult, RepositoryError> {
392        match self {
393            TransactionRepositoryStorage::InMemory(repo) => repo.delete_by_requests(requests).await,
394            TransactionRepositoryStorage::Redis(repo) => repo.delete_by_requests(requests).await,
395        }
396    }
397}
398
399#[async_trait]
400impl Repository<TransactionRepoModel, String> for TransactionRepositoryStorage {
401    async fn create(
402        &self,
403        entity: TransactionRepoModel,
404    ) -> Result<TransactionRepoModel, RepositoryError> {
405        match self {
406            TransactionRepositoryStorage::InMemory(repo) => repo.create(entity).await,
407            TransactionRepositoryStorage::Redis(repo) => repo.create(entity).await,
408        }
409    }
410
411    async fn get_by_id(&self, id: String) -> Result<TransactionRepoModel, RepositoryError> {
412        match self {
413            TransactionRepositoryStorage::InMemory(repo) => repo.get_by_id(id).await,
414            TransactionRepositoryStorage::Redis(repo) => repo.get_by_id(id).await,
415        }
416    }
417
418    async fn list_all(&self) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
419        match self {
420            TransactionRepositoryStorage::InMemory(repo) => repo.list_all().await,
421            TransactionRepositoryStorage::Redis(repo) => repo.list_all().await,
422        }
423    }
424
425    async fn list_paginated(
426        &self,
427        query: PaginationQuery,
428    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
429        match self {
430            TransactionRepositoryStorage::InMemory(repo) => repo.list_paginated(query).await,
431            TransactionRepositoryStorage::Redis(repo) => repo.list_paginated(query).await,
432        }
433    }
434
435    async fn update(
436        &self,
437        id: String,
438        entity: TransactionRepoModel,
439    ) -> Result<TransactionRepoModel, RepositoryError> {
440        match self {
441            TransactionRepositoryStorage::InMemory(repo) => repo.update(id, entity).await,
442            TransactionRepositoryStorage::Redis(repo) => repo.update(id, entity).await,
443        }
444    }
445
446    async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError> {
447        match self {
448            TransactionRepositoryStorage::InMemory(repo) => repo.delete_by_id(id).await,
449            TransactionRepositoryStorage::Redis(repo) => repo.delete_by_id(id).await,
450        }
451    }
452
453    async fn count(&self) -> Result<usize, RepositoryError> {
454        match self {
455            TransactionRepositoryStorage::InMemory(repo) => repo.count().await,
456            TransactionRepositoryStorage::Redis(repo) => repo.count().await,
457        }
458    }
459
460    async fn has_entries(&self) -> Result<bool, RepositoryError> {
461        match self {
462            TransactionRepositoryStorage::InMemory(repo) => repo.has_entries().await,
463            TransactionRepositoryStorage::Redis(repo) => repo.has_entries().await,
464        }
465    }
466
467    async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
468        match self {
469            TransactionRepositoryStorage::InMemory(repo) => repo.drop_all_entries().await,
470            TransactionRepositoryStorage::Redis(repo) => repo.drop_all_entries().await,
471        }
472    }
473}
474
475#[cfg(test)]
476mod tests {
477    use chrono::Utc;
478    use color_eyre::Result;
479    use deadpool_redis::{Config, Runtime};
480
481    use super::*;
482    use crate::models::{
483        EvmTransactionData, NetworkTransactionData, TransactionStatus, TransactionUpdateRequest,
484    };
485    use crate::repositories::PaginationQuery;
486    use crate::utils::mocks::mockutils::create_mock_transaction;
487
488    fn create_test_transaction(id: &str, relayer_id: &str) -> TransactionRepoModel {
489        let mut transaction = create_mock_transaction();
490        transaction.id = id.to_string();
491        transaction.relayer_id = relayer_id.to_string();
492        transaction
493    }
494
495    fn create_test_transaction_with_status(
496        id: &str,
497        relayer_id: &str,
498        status: TransactionStatus,
499    ) -> TransactionRepoModel {
500        let mut transaction = create_test_transaction(id, relayer_id);
501        transaction.status = status;
502        transaction
503    }
504
505    fn create_test_transaction_with_nonce(
506        id: &str,
507        relayer_id: &str,
508        nonce: u64,
509    ) -> TransactionRepoModel {
510        let mut transaction = create_test_transaction(id, relayer_id);
511        if let NetworkTransactionData::Evm(ref mut evm_data) = transaction.network_data {
512            evm_data.nonce = Some(nonce);
513        }
514        transaction
515    }
516
517    fn create_test_update_request() -> TransactionUpdateRequest {
518        TransactionUpdateRequest {
519            status: Some(TransactionStatus::Sent),
520            status_reason: Some("Test reason".to_string()),
521            sent_at: Some(Utc::now().to_string()),
522            confirmed_at: None,
523            network_data: None,
524            priced_at: None,
525            hashes: Some(vec!["test_hash".to_string()]),
526            noop_count: None,
527            is_canceled: None,
528            delete_at: None,
529        }
530    }
531
532    #[tokio::test]
533    async fn test_new_in_memory() {
534        let storage = TransactionRepositoryStorage::new_in_memory();
535
536        match storage {
537            TransactionRepositoryStorage::InMemory(_) => {
538                // Success - verify it's the InMemory variant
539            }
540            TransactionRepositoryStorage::Redis(_) => {
541                panic!("Expected InMemory variant, got Redis");
542            }
543        }
544    }
545
546    #[tokio::test]
547    async fn test_connection_info_returns_none_for_in_memory() {
548        let storage = TransactionRepositoryStorage::new_in_memory();
549
550        // In-memory storage should return None for connection_info
551        assert!(storage.connection_info().is_none());
552    }
553
554    #[tokio::test]
555    #[ignore = "Requires active Redis instance"]
556    async fn test_connection_info_returns_some_for_redis() -> Result<()> {
557        let redis_url = std::env::var("REDIS_TEST_URL")
558            .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
559        let cfg = Config::from_url(&redis_url);
560        let pool = Arc::new(
561            cfg.builder()
562                .map_err(|e| eyre::eyre!("Failed to create Redis pool builder: {}", e))?
563                .max_size(16)
564                .runtime(Runtime::Tokio1)
565                .build()
566                .map_err(|e| eyre::eyre!("Failed to build Redis pool: {}", e))?,
567        );
568        let connections = Arc::new(RedisConnections::new_single_pool(pool.clone()));
569        let key_prefix = "test_prefix".to_string();
570
571        let storage = TransactionRepositoryStorage::new_redis(connections, key_prefix.clone())?;
572
573        let (returned_connection, returned_prefix) = storage
574            .connection_info()
575            .expect("Expected Redis connection info");
576
577        assert!(Arc::ptr_eq(&pool, &returned_connection));
578        assert_eq!(returned_prefix, key_prefix);
579
580        Ok(())
581    }
582
583    #[tokio::test]
584    async fn test_create_in_memory() -> Result<()> {
585        let storage = TransactionRepositoryStorage::new_in_memory();
586        let transaction = create_test_transaction("test-tx", "test-relayer");
587
588        let created = storage.create(transaction.clone()).await?;
589        assert_eq!(created.id, transaction.id);
590        assert_eq!(created.relayer_id, transaction.relayer_id);
591        assert_eq!(created.status, transaction.status);
592
593        Ok(())
594    }
595
596    #[tokio::test]
597    async fn test_get_by_id_in_memory() -> Result<()> {
598        let storage = TransactionRepositoryStorage::new_in_memory();
599        let transaction = create_test_transaction("test-tx", "test-relayer");
600
601        // Create transaction first
602        storage.create(transaction.clone()).await?;
603
604        // Get by ID
605        let retrieved = storage.get_by_id("test-tx".to_string()).await?;
606        assert_eq!(retrieved.id, transaction.id);
607        assert_eq!(retrieved.relayer_id, transaction.relayer_id);
608        assert_eq!(retrieved.status, transaction.status);
609
610        Ok(())
611    }
612
613    #[tokio::test]
614    async fn test_get_by_id_not_found_in_memory() -> Result<()> {
615        let storage = TransactionRepositoryStorage::new_in_memory();
616
617        let result = storage.get_by_id("non-existent".to_string()).await;
618        assert!(result.is_err());
619
620        Ok(())
621    }
622
623    #[tokio::test]
624    async fn test_list_all_in_memory() -> Result<()> {
625        let storage = TransactionRepositoryStorage::new_in_memory();
626
627        // Initially empty
628        let transactions = storage.list_all().await?;
629        assert!(transactions.is_empty());
630
631        // Add transactions
632        let tx1 = create_test_transaction("tx-1", "relayer-1");
633        let tx2 = create_test_transaction("tx-2", "relayer-2");
634
635        storage.create(tx1.clone()).await?;
636        storage.create(tx2.clone()).await?;
637
638        let all_transactions = storage.list_all().await?;
639        assert_eq!(all_transactions.len(), 2);
640
641        let ids: Vec<&str> = all_transactions.iter().map(|t| t.id.as_str()).collect();
642        assert!(ids.contains(&"tx-1"));
643        assert!(ids.contains(&"tx-2"));
644
645        Ok(())
646    }
647
648    #[tokio::test]
649    async fn test_list_paginated_in_memory() -> Result<()> {
650        let storage = TransactionRepositoryStorage::new_in_memory();
651
652        // Add test transactions
653        for i in 1..=5 {
654            let tx = create_test_transaction(&format!("tx-{}", i), "test-relayer");
655            storage.create(tx).await?;
656        }
657
658        // Test pagination
659        let query = PaginationQuery {
660            page: 1,
661            per_page: 2,
662        };
663        let page = storage.list_paginated(query).await?;
664
665        assert_eq!(page.items.len(), 2);
666        assert_eq!(page.total, 5);
667        assert_eq!(page.page, 1);
668        assert_eq!(page.per_page, 2);
669
670        // Test second page
671        let query2 = PaginationQuery {
672            page: 2,
673            per_page: 2,
674        };
675        let page2 = storage.list_paginated(query2).await?;
676
677        assert_eq!(page2.items.len(), 2);
678        assert_eq!(page2.total, 5);
679        assert_eq!(page2.page, 2);
680        assert_eq!(page2.per_page, 2);
681
682        Ok(())
683    }
684
685    #[tokio::test]
686    async fn test_update_in_memory() -> Result<()> {
687        let storage = TransactionRepositoryStorage::new_in_memory();
688        let transaction = create_test_transaction("test-tx", "test-relayer");
689
690        // Create transaction first
691        storage.create(transaction.clone()).await?;
692
693        // Update it
694        let mut updated_transaction = transaction.clone();
695        updated_transaction.status = TransactionStatus::Sent;
696        updated_transaction.status_reason = Some("Updated reason".to_string());
697
698        let result = storage
699            .update("test-tx".to_string(), updated_transaction.clone())
700            .await?;
701        assert_eq!(result.id, "test-tx");
702        assert_eq!(result.status, TransactionStatus::Sent);
703        assert_eq!(result.status_reason, Some("Updated reason".to_string()));
704
705        // Verify the update persisted
706        let retrieved = storage.get_by_id("test-tx".to_string()).await?;
707        assert_eq!(retrieved.status, TransactionStatus::Sent);
708        assert_eq!(retrieved.status_reason, Some("Updated reason".to_string()));
709
710        Ok(())
711    }
712
713    #[tokio::test]
714    async fn test_update_not_found_in_memory() -> Result<()> {
715        let storage = TransactionRepositoryStorage::new_in_memory();
716        let transaction = create_test_transaction("non-existent", "test-relayer");
717
718        let result = storage
719            .update("non-existent".to_string(), transaction)
720            .await;
721        assert!(result.is_err());
722
723        Ok(())
724    }
725
726    #[tokio::test]
727    async fn test_delete_by_id_in_memory() -> Result<()> {
728        let storage = TransactionRepositoryStorage::new_in_memory();
729        let transaction = create_test_transaction("test-tx", "test-relayer");
730
731        // Create transaction first
732        storage.create(transaction.clone()).await?;
733
734        // Verify it exists
735        let retrieved = storage.get_by_id("test-tx".to_string()).await?;
736        assert_eq!(retrieved.id, "test-tx");
737
738        // Delete it
739        storage.delete_by_id("test-tx".to_string()).await?;
740
741        // Verify it's gone
742        let result = storage.get_by_id("test-tx".to_string()).await;
743        assert!(result.is_err());
744
745        Ok(())
746    }
747
748    #[tokio::test]
749    async fn test_delete_by_id_not_found_in_memory() -> Result<()> {
750        let storage = TransactionRepositoryStorage::new_in_memory();
751
752        let result = storage.delete_by_id("non-existent".to_string()).await;
753        assert!(result.is_err());
754
755        Ok(())
756    }
757
758    #[tokio::test]
759    async fn test_count_in_memory() -> Result<()> {
760        let storage = TransactionRepositoryStorage::new_in_memory();
761
762        // Initially empty
763        let count = storage.count().await?;
764        assert_eq!(count, 0);
765
766        // Add transactions
767        let tx1 = create_test_transaction("tx-1", "relayer-1");
768        let tx2 = create_test_transaction("tx-2", "relayer-2");
769
770        storage.create(tx1).await?;
771        let count_after_one = storage.count().await?;
772        assert_eq!(count_after_one, 1);
773
774        storage.create(tx2).await?;
775        let count_after_two = storage.count().await?;
776        assert_eq!(count_after_two, 2);
777
778        // Delete one
779        storage.delete_by_id("tx-1".to_string()).await?;
780        let count_after_delete = storage.count().await?;
781        assert_eq!(count_after_delete, 1);
782
783        Ok(())
784    }
785
786    #[tokio::test]
787    async fn test_has_entries_in_memory() -> Result<()> {
788        let storage = TransactionRepositoryStorage::new_in_memory();
789
790        // Initially empty
791        let has_entries = storage.has_entries().await?;
792        assert!(!has_entries);
793
794        // Add transaction
795        let transaction = create_test_transaction("test-tx", "test-relayer");
796        storage.create(transaction).await?;
797
798        let has_entries_after_create = storage.has_entries().await?;
799        assert!(has_entries_after_create);
800
801        // Delete transaction
802        storage.delete_by_id("test-tx".to_string()).await?;
803
804        let has_entries_after_delete = storage.has_entries().await?;
805        assert!(!has_entries_after_delete);
806
807        Ok(())
808    }
809
810    #[tokio::test]
811    async fn test_drop_all_entries_in_memory() -> Result<()> {
812        let storage = TransactionRepositoryStorage::new_in_memory();
813
814        // Add multiple transactions
815        for i in 1..=5 {
816            let tx = create_test_transaction(&format!("tx-{}", i), "test-relayer");
817            storage.create(tx).await?;
818        }
819
820        // Verify they exist
821        let count_before = storage.count().await?;
822        assert_eq!(count_before, 5);
823
824        let has_entries_before = storage.has_entries().await?;
825        assert!(has_entries_before);
826
827        // Drop all entries
828        storage.drop_all_entries().await?;
829
830        // Verify they're gone
831        let count_after = storage.count().await?;
832        assert_eq!(count_after, 0);
833
834        let has_entries_after = storage.has_entries().await?;
835        assert!(!has_entries_after);
836
837        let all_transactions = storage.list_all().await?;
838        assert!(all_transactions.is_empty());
839
840        Ok(())
841    }
842
843    #[tokio::test]
844    async fn test_find_by_relayer_id_in_memory() -> Result<()> {
845        let storage = TransactionRepositoryStorage::new_in_memory();
846
847        // Add transactions for different relayers
848        let tx1 = create_test_transaction("tx-1", "relayer-1");
849        let tx2 = create_test_transaction("tx-2", "relayer-1");
850        let tx3 = create_test_transaction("tx-3", "relayer-2");
851
852        storage.create(tx1).await?;
853        storage.create(tx2).await?;
854        storage.create(tx3).await?;
855
856        // Find by relayer ID
857        let query = PaginationQuery {
858            page: 1,
859            per_page: 10,
860        };
861        let result = storage.find_by_relayer_id("relayer-1", query).await?;
862
863        assert_eq!(result.items.len(), 2);
864        assert_eq!(result.total, 2);
865
866        // Verify all transactions belong to relayer-1
867        for tx in result.items {
868            assert_eq!(tx.relayer_id, "relayer-1");
869        }
870
871        Ok(())
872    }
873
874    #[tokio::test]
875    async fn test_find_by_status_in_memory() -> Result<()> {
876        let storage = TransactionRepositoryStorage::new_in_memory();
877
878        // Add transactions with different statuses
879        let tx1 =
880            create_test_transaction_with_status("tx-1", "relayer-1", TransactionStatus::Pending);
881        let tx2 = create_test_transaction_with_status("tx-2", "relayer-1", TransactionStatus::Sent);
882        let tx3 =
883            create_test_transaction_with_status("tx-3", "relayer-1", TransactionStatus::Pending);
884        let tx4 =
885            create_test_transaction_with_status("tx-4", "relayer-2", TransactionStatus::Pending);
886
887        storage.create(tx1).await?;
888        storage.create(tx2).await?;
889        storage.create(tx3).await?;
890        storage.create(tx4).await?;
891
892        // Find by status
893        let statuses = vec![TransactionStatus::Pending];
894        let result = storage.find_by_status("relayer-1", &statuses).await?;
895
896        assert_eq!(result.len(), 2);
897
898        // Verify all transactions have Pending status and belong to relayer-1
899        for tx in result {
900            assert_eq!(tx.status, TransactionStatus::Pending);
901            assert_eq!(tx.relayer_id, "relayer-1");
902        }
903
904        Ok(())
905    }
906
907    #[tokio::test]
908    async fn test_find_by_nonce_in_memory() -> Result<()> {
909        let storage = TransactionRepositoryStorage::new_in_memory();
910
911        // Add transactions with different nonces
912        let tx1 = create_test_transaction_with_nonce("tx-1", "relayer-1", 10);
913        let tx2 = create_test_transaction_with_nonce("tx-2", "relayer-1", 20);
914        let tx3 = create_test_transaction_with_nonce("tx-3", "relayer-2", 10);
915
916        storage.create(tx1).await?;
917        storage.create(tx2).await?;
918        storage.create(tx3).await?;
919
920        // Find by nonce
921        let result = storage.find_by_nonce("relayer-1", 10).await?;
922
923        assert!(result.is_some());
924        let found_tx = result.unwrap();
925        assert_eq!(found_tx.id, "tx-1");
926        assert_eq!(found_tx.relayer_id, "relayer-1");
927
928        // Check EVM nonce
929        if let NetworkTransactionData::Evm(evm_data) = found_tx.network_data {
930            assert_eq!(evm_data.nonce, Some(10));
931        }
932
933        // Test not found
934        let not_found = storage.find_by_nonce("relayer-1", 99).await?;
935        assert!(not_found.is_none());
936
937        Ok(())
938    }
939
940    #[tokio::test]
941    async fn test_update_status_in_memory() -> Result<()> {
942        let storage = TransactionRepositoryStorage::new_in_memory();
943        let transaction = create_test_transaction("test-tx", "test-relayer");
944
945        // Create transaction first
946        storage.create(transaction).await?;
947
948        // Update status
949        let updated = storage
950            .update_status("test-tx".to_string(), TransactionStatus::Sent)
951            .await?;
952
953        assert_eq!(updated.id, "test-tx");
954        assert_eq!(updated.status, TransactionStatus::Sent);
955
956        // Verify the update persisted
957        let retrieved = storage.get_by_id("test-tx".to_string()).await?;
958        assert_eq!(retrieved.status, TransactionStatus::Sent);
959
960        Ok(())
961    }
962
963    #[tokio::test]
964    async fn test_partial_update_in_memory() -> Result<()> {
965        let storage = TransactionRepositoryStorage::new_in_memory();
966        let transaction = create_test_transaction("test-tx", "test-relayer");
967
968        // Create transaction first
969        storage.create(transaction).await?;
970
971        // Partial update
972        let update_request = create_test_update_request();
973        let updated = storage
974            .partial_update("test-tx".to_string(), update_request)
975            .await?;
976
977        assert_eq!(updated.id, "test-tx");
978        assert_eq!(updated.status, TransactionStatus::Sent);
979        assert_eq!(updated.status_reason, Some("Test reason".to_string()));
980        assert!(updated.sent_at.is_some());
981        assert_eq!(updated.hashes, vec!["test_hash".to_string()]);
982
983        Ok(())
984    }
985
986    #[tokio::test]
987    async fn test_update_network_data_in_memory() -> Result<()> {
988        let storage = TransactionRepositoryStorage::new_in_memory();
989        let transaction = create_test_transaction("test-tx", "test-relayer");
990
991        // Create transaction first
992        storage.create(transaction).await?;
993
994        // Update network data
995        let new_evm_data = EvmTransactionData {
996            nonce: Some(42),
997            gas_limit: Some(21000),
998            ..Default::default()
999        };
1000        let new_network_data = NetworkTransactionData::Evm(new_evm_data);
1001
1002        let updated = storage
1003            .update_network_data("test-tx".to_string(), new_network_data)
1004            .await?;
1005
1006        assert_eq!(updated.id, "test-tx");
1007        if let NetworkTransactionData::Evm(evm_data) = updated.network_data {
1008            assert_eq!(evm_data.nonce, Some(42));
1009            assert_eq!(evm_data.gas_limit, Some(21000));
1010        } else {
1011            panic!("Expected EVM network data");
1012        }
1013
1014        Ok(())
1015    }
1016
1017    #[tokio::test]
1018    async fn test_set_sent_at_in_memory() -> Result<()> {
1019        let storage = TransactionRepositoryStorage::new_in_memory();
1020        let transaction = create_test_transaction("test-tx", "test-relayer");
1021
1022        // Create transaction first
1023        storage.create(transaction).await?;
1024
1025        // Set sent_at
1026        let sent_at = Utc::now().to_string();
1027        let updated = storage
1028            .set_sent_at("test-tx".to_string(), sent_at.clone())
1029            .await?;
1030
1031        assert_eq!(updated.id, "test-tx");
1032        assert_eq!(updated.sent_at, Some(sent_at));
1033
1034        Ok(())
1035    }
1036
1037    #[tokio::test]
1038    async fn test_set_confirmed_at_in_memory() -> Result<()> {
1039        let storage = TransactionRepositoryStorage::new_in_memory();
1040        let transaction = create_test_transaction("test-tx", "test-relayer");
1041
1042        // Create transaction first
1043        storage.create(transaction).await?;
1044
1045        // Set confirmed_at
1046        let confirmed_at = Utc::now().to_string();
1047        let updated = storage
1048            .set_confirmed_at("test-tx".to_string(), confirmed_at.clone())
1049            .await?;
1050
1051        assert_eq!(updated.id, "test-tx");
1052        assert_eq!(updated.confirmed_at, Some(confirmed_at));
1053
1054        Ok(())
1055    }
1056
1057    #[tokio::test]
1058    async fn test_create_duplicate_id_in_memory() -> Result<()> {
1059        let storage = TransactionRepositoryStorage::new_in_memory();
1060        let transaction = create_test_transaction("duplicate-id", "test-relayer");
1061
1062        // Create first transaction
1063        storage.create(transaction.clone()).await?;
1064
1065        // Try to create another with same ID - should fail
1066        let result = storage.create(transaction.clone()).await;
1067        assert!(result.is_err());
1068
1069        Ok(())
1070    }
1071
1072    #[tokio::test]
1073    async fn test_workflow_in_memory() -> Result<()> {
1074        let storage = TransactionRepositoryStorage::new_in_memory();
1075
1076        // 1. Start with empty storage
1077        assert!(!storage.has_entries().await?);
1078        assert_eq!(storage.count().await?, 0);
1079
1080        // 2. Create transaction
1081        let transaction = create_test_transaction("workflow-test", "test-relayer");
1082        let created = storage.create(transaction.clone()).await?;
1083        assert_eq!(created.id, "workflow-test");
1084
1085        // 3. Verify it exists
1086        assert!(storage.has_entries().await?);
1087        assert_eq!(storage.count().await?, 1);
1088
1089        // 4. Retrieve it
1090        let retrieved = storage.get_by_id("workflow-test".to_string()).await?;
1091        assert_eq!(retrieved.id, "workflow-test");
1092
1093        // 5. Update status
1094        let updated = storage
1095            .update_status("workflow-test".to_string(), TransactionStatus::Sent)
1096            .await?;
1097        assert_eq!(updated.status, TransactionStatus::Sent);
1098
1099        // 6. Verify update
1100        let retrieved_updated = storage.get_by_id("workflow-test".to_string()).await?;
1101        assert_eq!(retrieved_updated.status, TransactionStatus::Sent);
1102
1103        // 7. Delete it
1104        storage.delete_by_id("workflow-test".to_string()).await?;
1105
1106        // 8. Verify it's gone
1107        assert!(!storage.has_entries().await?);
1108        assert_eq!(storage.count().await?, 0);
1109
1110        let result = storage.get_by_id("workflow-test".to_string()).await;
1111        assert!(result.is_err());
1112
1113        Ok(())
1114    }
1115
1116    #[tokio::test]
1117    async fn test_multiple_relayers_workflow() -> Result<()> {
1118        let storage = TransactionRepositoryStorage::new_in_memory();
1119
1120        // Add transactions for multiple relayers
1121        let tx1 =
1122            create_test_transaction_with_status("tx-1", "relayer-1", TransactionStatus::Pending);
1123        let tx2 = create_test_transaction_with_status("tx-2", "relayer-1", TransactionStatus::Sent);
1124        let tx3 =
1125            create_test_transaction_with_status("tx-3", "relayer-2", TransactionStatus::Pending);
1126
1127        storage.create(tx1).await?;
1128        storage.create(tx2).await?;
1129        storage.create(tx3).await?;
1130
1131        // Test find_by_relayer_id
1132        let query = PaginationQuery {
1133            page: 1,
1134            per_page: 10,
1135        };
1136        let relayer1_txs = storage.find_by_relayer_id("relayer-1", query).await?;
1137        assert_eq!(relayer1_txs.items.len(), 2);
1138
1139        // Test find_by_status
1140        let pending_txs = storage
1141            .find_by_status("relayer-1", &[TransactionStatus::Pending])
1142            .await?;
1143        assert_eq!(pending_txs.len(), 1);
1144        assert_eq!(pending_txs[0].id, "tx-1");
1145
1146        // Test count remains accurate
1147        assert_eq!(storage.count().await?, 3);
1148
1149        Ok(())
1150    }
1151
1152    #[tokio::test]
1153    async fn test_pagination_edge_cases_in_memory() -> Result<()> {
1154        let storage = TransactionRepositoryStorage::new_in_memory();
1155
1156        // Test pagination with empty storage
1157        let query = PaginationQuery {
1158            page: 1,
1159            per_page: 10,
1160        };
1161        let page = storage.list_paginated(query).await?;
1162        assert_eq!(page.items.len(), 0);
1163        assert_eq!(page.total, 0);
1164        assert_eq!(page.page, 1);
1165        assert_eq!(page.per_page, 10);
1166
1167        // Add one transaction
1168        let transaction = create_test_transaction("single-tx", "test-relayer");
1169        storage.create(transaction).await?;
1170
1171        // Test pagination with single item
1172        let query = PaginationQuery {
1173            page: 1,
1174            per_page: 10,
1175        };
1176        let page = storage.list_paginated(query).await?;
1177        assert_eq!(page.items.len(), 1);
1178        assert_eq!(page.total, 1);
1179        assert_eq!(page.page, 1);
1180        assert_eq!(page.per_page, 10);
1181
1182        // Test pagination with page beyond total
1183        let query = PaginationQuery {
1184            page: 3,
1185            per_page: 10,
1186        };
1187        let page = storage.list_paginated(query).await?;
1188        assert_eq!(page.items.len(), 0);
1189        assert_eq!(page.total, 1);
1190        assert_eq!(page.page, 3);
1191        assert_eq!(page.per_page, 10);
1192
1193        Ok(())
1194    }
1195
1196    #[tokio::test]
1197    async fn test_find_by_relayer_id_pagination() -> Result<()> {
1198        let storage = TransactionRepositoryStorage::new_in_memory();
1199
1200        // Add many transactions for one relayer
1201        for i in 1..=10 {
1202            let tx = create_test_transaction(&format!("tx-{}", i), "test-relayer");
1203            storage.create(tx).await?;
1204        }
1205
1206        // Test first page
1207        let query = PaginationQuery {
1208            page: 1,
1209            per_page: 3,
1210        };
1211        let page1 = storage.find_by_relayer_id("test-relayer", query).await?;
1212        assert_eq!(page1.items.len(), 3);
1213        assert_eq!(page1.total, 10);
1214        assert_eq!(page1.page, 1);
1215        assert_eq!(page1.per_page, 3);
1216
1217        // Test second page
1218        let query = PaginationQuery {
1219            page: 2,
1220            per_page: 3,
1221        };
1222        let page2 = storage.find_by_relayer_id("test-relayer", query).await?;
1223        assert_eq!(page2.items.len(), 3);
1224        assert_eq!(page2.total, 10);
1225        assert_eq!(page2.page, 2);
1226        assert_eq!(page2.per_page, 3);
1227
1228        Ok(())
1229    }
1230
1231    #[tokio::test]
1232    async fn test_find_by_multiple_statuses() -> Result<()> {
1233        let storage = TransactionRepositoryStorage::new_in_memory();
1234
1235        // Add transactions with different statuses
1236        let tx1 =
1237            create_test_transaction_with_status("tx-1", "test-relayer", TransactionStatus::Pending);
1238        let tx2 =
1239            create_test_transaction_with_status("tx-2", "test-relayer", TransactionStatus::Sent);
1240        let tx3 = create_test_transaction_with_status(
1241            "tx-3",
1242            "test-relayer",
1243            TransactionStatus::Confirmed,
1244        );
1245        let tx4 =
1246            create_test_transaction_with_status("tx-4", "test-relayer", TransactionStatus::Failed);
1247
1248        storage.create(tx1).await?;
1249        storage.create(tx2).await?;
1250        storage.create(tx3).await?;
1251        storage.create(tx4).await?;
1252
1253        // Find by multiple statuses
1254        let statuses = vec![TransactionStatus::Pending, TransactionStatus::Sent];
1255        let result = storage.find_by_status("test-relayer", &statuses).await?;
1256
1257        assert_eq!(result.len(), 2);
1258
1259        // Verify all transactions have the correct statuses
1260        let found_statuses: Vec<TransactionStatus> =
1261            result.iter().map(|tx| tx.status.clone()).collect();
1262        assert!(found_statuses.contains(&TransactionStatus::Pending));
1263        assert!(found_statuses.contains(&TransactionStatus::Sent));
1264
1265        Ok(())
1266    }
1267}