1mod transaction_in_memory;
22mod transaction_redis;
23
24use deadpool_redis::Pool;
25pub use transaction_in_memory::*;
26pub use transaction_redis::*;
27
28use crate::{
29 models::{
30 NetworkTransactionData, TransactionRepoModel, TransactionStatus, TransactionUpdateRequest,
31 },
32 repositories::{BatchDeleteResult, TransactionDeleteRequest, *},
33 utils::RedisConnections,
34};
35use async_trait::async_trait;
36use eyre::Result;
37use std::sync::Arc;
38
39#[async_trait]
41pub trait TransactionRepository: Repository<TransactionRepoModel, String> {
42 async fn find_by_relayer_id(
44 &self,
45 relayer_id: &str,
46 query: PaginationQuery,
47 ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError>;
48
49 async fn find_by_status(
53 &self,
54 relayer_id: &str,
55 statuses: &[TransactionStatus],
56 ) -> Result<Vec<TransactionRepoModel>, RepositoryError>;
57
58 async fn find_by_status_paginated(
71 &self,
72 relayer_id: &str,
73 statuses: &[TransactionStatus],
74 query: PaginationQuery,
75 oldest_first: bool,
76 ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError>;
77
78 async fn find_by_nonce(
80 &self,
81 relayer_id: &str,
82 nonce: u64,
83 ) -> Result<Option<TransactionRepoModel>, RepositoryError>;
84
85 async fn update_status(
87 &self,
88 tx_id: String,
89 status: TransactionStatus,
90 ) -> Result<TransactionRepoModel, RepositoryError>;
91
92 async fn partial_update(
94 &self,
95 tx_id: String,
96 update: TransactionUpdateRequest,
97 ) -> Result<TransactionRepoModel, RepositoryError>;
98
99 async fn update_network_data(
101 &self,
102 tx_id: String,
103 network_data: NetworkTransactionData,
104 ) -> Result<TransactionRepoModel, RepositoryError>;
105
106 async fn set_sent_at(
108 &self,
109 tx_id: String,
110 sent_at: String,
111 ) -> Result<TransactionRepoModel, RepositoryError>;
112
113 async fn set_confirmed_at(
115 &self,
116 tx_id: String,
117 confirmed_at: String,
118 ) -> Result<TransactionRepoModel, RepositoryError>;
119
120 async fn count_by_status(
123 &self,
124 relayer_id: &str,
125 statuses: &[TransactionStatus],
126 ) -> Result<u64, RepositoryError>;
127
128 async fn delete_by_ids(&self, ids: Vec<String>) -> Result<BatchDeleteResult, RepositoryError>;
143
144 async fn delete_by_requests(
156 &self,
157 requests: Vec<TransactionDeleteRequest>,
158 ) -> Result<BatchDeleteResult, RepositoryError>;
159}
160
161#[cfg(test)]
162mockall::mock! {
163 pub TransactionRepository {}
164
165 #[async_trait]
166 impl Repository<TransactionRepoModel, String> for TransactionRepository {
167 async fn create(&self, entity: TransactionRepoModel) -> Result<TransactionRepoModel, RepositoryError>;
168 async fn get_by_id(&self, id: String) -> Result<TransactionRepoModel, RepositoryError>;
169 async fn list_all(&self) -> Result<Vec<TransactionRepoModel>, RepositoryError>;
170 async fn list_paginated(&self, query: PaginationQuery) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError>;
171 async fn update(&self, id: String, entity: TransactionRepoModel) -> Result<TransactionRepoModel, RepositoryError>;
172 async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError>;
173 async fn count(&self) -> Result<usize, RepositoryError>;
174 async fn has_entries(&self) -> Result<bool, RepositoryError>;
175 async fn drop_all_entries(&self) -> Result<(), RepositoryError>;
176 }
177
178 #[async_trait]
179 impl TransactionRepository for TransactionRepository {
180 async fn find_by_relayer_id(&self, relayer_id: &str, query: PaginationQuery) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError>;
181 async fn find_by_status(&self, relayer_id: &str, statuses: &[TransactionStatus]) -> Result<Vec<TransactionRepoModel>, RepositoryError>;
182 async fn find_by_status_paginated(&self, relayer_id: &str, statuses: &[TransactionStatus], query: PaginationQuery, oldest_first: bool) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError>;
183 async fn find_by_nonce(&self, relayer_id: &str, nonce: u64) -> Result<Option<TransactionRepoModel>, RepositoryError>;
184 async fn update_status(&self, tx_id: String, status: TransactionStatus) -> Result<TransactionRepoModel, RepositoryError>;
185 async fn partial_update(&self, tx_id: String, update: TransactionUpdateRequest) -> Result<TransactionRepoModel, RepositoryError>;
186 async fn update_network_data(&self, tx_id: String, network_data: NetworkTransactionData) -> Result<TransactionRepoModel, RepositoryError>;
187 async fn set_sent_at(&self, tx_id: String, sent_at: String) -> Result<TransactionRepoModel, RepositoryError>;
188 async fn set_confirmed_at(&self, tx_id: String, confirmed_at: String) -> Result<TransactionRepoModel, RepositoryError>;
189 async fn count_by_status(&self, relayer_id: &str, statuses: &[TransactionStatus]) -> Result<u64, RepositoryError>;
190 async fn delete_by_ids(&self, ids: Vec<String>) -> Result<BatchDeleteResult, RepositoryError>;
191 async fn delete_by_requests(&self, requests: Vec<TransactionDeleteRequest>) -> Result<BatchDeleteResult, RepositoryError>;
192 }
193}
194
195#[derive(Debug, Clone)]
197pub enum TransactionRepositoryStorage {
198 InMemory(InMemoryTransactionRepository),
199 Redis(RedisTransactionRepository),
200}
201
202impl TransactionRepositoryStorage {
203 pub fn new_in_memory() -> Self {
204 Self::InMemory(InMemoryTransactionRepository::new())
205 }
206 pub fn new_redis(
207 connections: Arc<RedisConnections>,
208 key_prefix: String,
209 ) -> Result<Self, RepositoryError> {
210 Ok(Self::Redis(RedisTransactionRepository::new(
211 connections,
212 key_prefix,
213 )?))
214 }
215
216 pub fn connection_info(&self) -> Option<(Arc<Pool>, &str)> {
226 match self {
227 TransactionRepositoryStorage::InMemory(_) => None,
228 TransactionRepositoryStorage::Redis(repo) => {
229 Some((repo.connections.primary().clone(), &repo.key_prefix))
230 }
231 }
232 }
233}
234
235#[async_trait]
236impl TransactionRepository for TransactionRepositoryStorage {
237 async fn find_by_relayer_id(
238 &self,
239 relayer_id: &str,
240 query: PaginationQuery,
241 ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
242 match self {
243 TransactionRepositoryStorage::InMemory(repo) => {
244 repo.find_by_relayer_id(relayer_id, query).await
245 }
246 TransactionRepositoryStorage::Redis(repo) => {
247 repo.find_by_relayer_id(relayer_id, query).await
248 }
249 }
250 }
251
252 async fn find_by_status(
253 &self,
254 relayer_id: &str,
255 statuses: &[TransactionStatus],
256 ) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
257 match self {
258 TransactionRepositoryStorage::InMemory(repo) => {
259 repo.find_by_status(relayer_id, statuses).await
260 }
261 TransactionRepositoryStorage::Redis(repo) => {
262 repo.find_by_status(relayer_id, statuses).await
263 }
264 }
265 }
266
267 async fn find_by_status_paginated(
268 &self,
269 relayer_id: &str,
270 statuses: &[TransactionStatus],
271 query: PaginationQuery,
272 oldest_first: bool,
273 ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
274 match self {
275 TransactionRepositoryStorage::InMemory(repo) => {
276 repo.find_by_status_paginated(relayer_id, statuses, query, oldest_first)
277 .await
278 }
279 TransactionRepositoryStorage::Redis(repo) => {
280 repo.find_by_status_paginated(relayer_id, statuses, query, oldest_first)
281 .await
282 }
283 }
284 }
285
286 async fn find_by_nonce(
287 &self,
288 relayer_id: &str,
289 nonce: u64,
290 ) -> Result<Option<TransactionRepoModel>, RepositoryError> {
291 match self {
292 TransactionRepositoryStorage::InMemory(repo) => {
293 repo.find_by_nonce(relayer_id, nonce).await
294 }
295 TransactionRepositoryStorage::Redis(repo) => {
296 repo.find_by_nonce(relayer_id, nonce).await
297 }
298 }
299 }
300
301 async fn update_status(
302 &self,
303 tx_id: String,
304 status: TransactionStatus,
305 ) -> Result<TransactionRepoModel, RepositoryError> {
306 match self {
307 TransactionRepositoryStorage::InMemory(repo) => repo.update_status(tx_id, status).await,
308 TransactionRepositoryStorage::Redis(repo) => repo.update_status(tx_id, status).await,
309 }
310 }
311
312 async fn partial_update(
313 &self,
314 tx_id: String,
315 update: TransactionUpdateRequest,
316 ) -> Result<TransactionRepoModel, RepositoryError> {
317 match self {
318 TransactionRepositoryStorage::InMemory(repo) => {
319 repo.partial_update(tx_id, update).await
320 }
321 TransactionRepositoryStorage::Redis(repo) => repo.partial_update(tx_id, update).await,
322 }
323 }
324
325 async fn update_network_data(
326 &self,
327 tx_id: String,
328 network_data: NetworkTransactionData,
329 ) -> Result<TransactionRepoModel, RepositoryError> {
330 match self {
331 TransactionRepositoryStorage::InMemory(repo) => {
332 repo.update_network_data(tx_id, network_data).await
333 }
334 TransactionRepositoryStorage::Redis(repo) => {
335 repo.update_network_data(tx_id, network_data).await
336 }
337 }
338 }
339
340 async fn set_sent_at(
341 &self,
342 tx_id: String,
343 sent_at: String,
344 ) -> Result<TransactionRepoModel, RepositoryError> {
345 match self {
346 TransactionRepositoryStorage::InMemory(repo) => repo.set_sent_at(tx_id, sent_at).await,
347 TransactionRepositoryStorage::Redis(repo) => repo.set_sent_at(tx_id, sent_at).await,
348 }
349 }
350
351 async fn set_confirmed_at(
352 &self,
353 tx_id: String,
354 confirmed_at: String,
355 ) -> Result<TransactionRepoModel, RepositoryError> {
356 match self {
357 TransactionRepositoryStorage::InMemory(repo) => {
358 repo.set_confirmed_at(tx_id, confirmed_at).await
359 }
360 TransactionRepositoryStorage::Redis(repo) => {
361 repo.set_confirmed_at(tx_id, confirmed_at).await
362 }
363 }
364 }
365
366 async fn count_by_status(
367 &self,
368 relayer_id: &str,
369 statuses: &[TransactionStatus],
370 ) -> Result<u64, RepositoryError> {
371 match self {
372 TransactionRepositoryStorage::InMemory(repo) => {
373 repo.count_by_status(relayer_id, statuses).await
374 }
375 TransactionRepositoryStorage::Redis(repo) => {
376 repo.count_by_status(relayer_id, statuses).await
377 }
378 }
379 }
380
381 async fn delete_by_ids(&self, ids: Vec<String>) -> Result<BatchDeleteResult, RepositoryError> {
382 match self {
383 TransactionRepositoryStorage::InMemory(repo) => repo.delete_by_ids(ids).await,
384 TransactionRepositoryStorage::Redis(repo) => repo.delete_by_ids(ids).await,
385 }
386 }
387
388 async fn delete_by_requests(
389 &self,
390 requests: Vec<TransactionDeleteRequest>,
391 ) -> Result<BatchDeleteResult, RepositoryError> {
392 match self {
393 TransactionRepositoryStorage::InMemory(repo) => repo.delete_by_requests(requests).await,
394 TransactionRepositoryStorage::Redis(repo) => repo.delete_by_requests(requests).await,
395 }
396 }
397}
398
399#[async_trait]
400impl Repository<TransactionRepoModel, String> for TransactionRepositoryStorage {
401 async fn create(
402 &self,
403 entity: TransactionRepoModel,
404 ) -> Result<TransactionRepoModel, RepositoryError> {
405 match self {
406 TransactionRepositoryStorage::InMemory(repo) => repo.create(entity).await,
407 TransactionRepositoryStorage::Redis(repo) => repo.create(entity).await,
408 }
409 }
410
411 async fn get_by_id(&self, id: String) -> Result<TransactionRepoModel, RepositoryError> {
412 match self {
413 TransactionRepositoryStorage::InMemory(repo) => repo.get_by_id(id).await,
414 TransactionRepositoryStorage::Redis(repo) => repo.get_by_id(id).await,
415 }
416 }
417
418 async fn list_all(&self) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
419 match self {
420 TransactionRepositoryStorage::InMemory(repo) => repo.list_all().await,
421 TransactionRepositoryStorage::Redis(repo) => repo.list_all().await,
422 }
423 }
424
425 async fn list_paginated(
426 &self,
427 query: PaginationQuery,
428 ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
429 match self {
430 TransactionRepositoryStorage::InMemory(repo) => repo.list_paginated(query).await,
431 TransactionRepositoryStorage::Redis(repo) => repo.list_paginated(query).await,
432 }
433 }
434
435 async fn update(
436 &self,
437 id: String,
438 entity: TransactionRepoModel,
439 ) -> Result<TransactionRepoModel, RepositoryError> {
440 match self {
441 TransactionRepositoryStorage::InMemory(repo) => repo.update(id, entity).await,
442 TransactionRepositoryStorage::Redis(repo) => repo.update(id, entity).await,
443 }
444 }
445
446 async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError> {
447 match self {
448 TransactionRepositoryStorage::InMemory(repo) => repo.delete_by_id(id).await,
449 TransactionRepositoryStorage::Redis(repo) => repo.delete_by_id(id).await,
450 }
451 }
452
453 async fn count(&self) -> Result<usize, RepositoryError> {
454 match self {
455 TransactionRepositoryStorage::InMemory(repo) => repo.count().await,
456 TransactionRepositoryStorage::Redis(repo) => repo.count().await,
457 }
458 }
459
460 async fn has_entries(&self) -> Result<bool, RepositoryError> {
461 match self {
462 TransactionRepositoryStorage::InMemory(repo) => repo.has_entries().await,
463 TransactionRepositoryStorage::Redis(repo) => repo.has_entries().await,
464 }
465 }
466
467 async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
468 match self {
469 TransactionRepositoryStorage::InMemory(repo) => repo.drop_all_entries().await,
470 TransactionRepositoryStorage::Redis(repo) => repo.drop_all_entries().await,
471 }
472 }
473}
474
475#[cfg(test)]
476mod tests {
477 use chrono::Utc;
478 use color_eyre::Result;
479 use deadpool_redis::{Config, Runtime};
480
481 use super::*;
482 use crate::models::{
483 EvmTransactionData, NetworkTransactionData, TransactionStatus, TransactionUpdateRequest,
484 };
485 use crate::repositories::PaginationQuery;
486 use crate::utils::mocks::mockutils::create_mock_transaction;
487
488 fn create_test_transaction(id: &str, relayer_id: &str) -> TransactionRepoModel {
489 let mut transaction = create_mock_transaction();
490 transaction.id = id.to_string();
491 transaction.relayer_id = relayer_id.to_string();
492 transaction
493 }
494
495 fn create_test_transaction_with_status(
496 id: &str,
497 relayer_id: &str,
498 status: TransactionStatus,
499 ) -> TransactionRepoModel {
500 let mut transaction = create_test_transaction(id, relayer_id);
501 transaction.status = status;
502 transaction
503 }
504
505 fn create_test_transaction_with_nonce(
506 id: &str,
507 relayer_id: &str,
508 nonce: u64,
509 ) -> TransactionRepoModel {
510 let mut transaction = create_test_transaction(id, relayer_id);
511 if let NetworkTransactionData::Evm(ref mut evm_data) = transaction.network_data {
512 evm_data.nonce = Some(nonce);
513 }
514 transaction
515 }
516
517 fn create_test_update_request() -> TransactionUpdateRequest {
518 TransactionUpdateRequest {
519 status: Some(TransactionStatus::Sent),
520 status_reason: Some("Test reason".to_string()),
521 sent_at: Some(Utc::now().to_string()),
522 confirmed_at: None,
523 network_data: None,
524 priced_at: None,
525 hashes: Some(vec!["test_hash".to_string()]),
526 noop_count: None,
527 is_canceled: None,
528 delete_at: None,
529 }
530 }
531
532 #[tokio::test]
533 async fn test_new_in_memory() {
534 let storage = TransactionRepositoryStorage::new_in_memory();
535
536 match storage {
537 TransactionRepositoryStorage::InMemory(_) => {
538 }
540 TransactionRepositoryStorage::Redis(_) => {
541 panic!("Expected InMemory variant, got Redis");
542 }
543 }
544 }
545
546 #[tokio::test]
547 async fn test_connection_info_returns_none_for_in_memory() {
548 let storage = TransactionRepositoryStorage::new_in_memory();
549
550 assert!(storage.connection_info().is_none());
552 }
553
554 #[tokio::test]
555 #[ignore = "Requires active Redis instance"]
556 async fn test_connection_info_returns_some_for_redis() -> Result<()> {
557 let redis_url = std::env::var("REDIS_TEST_URL")
558 .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
559 let cfg = Config::from_url(&redis_url);
560 let pool = Arc::new(
561 cfg.builder()
562 .map_err(|e| eyre::eyre!("Failed to create Redis pool builder: {}", e))?
563 .max_size(16)
564 .runtime(Runtime::Tokio1)
565 .build()
566 .map_err(|e| eyre::eyre!("Failed to build Redis pool: {}", e))?,
567 );
568 let connections = Arc::new(RedisConnections::new_single_pool(pool.clone()));
569 let key_prefix = "test_prefix".to_string();
570
571 let storage = TransactionRepositoryStorage::new_redis(connections, key_prefix.clone())?;
572
573 let (returned_connection, returned_prefix) = storage
574 .connection_info()
575 .expect("Expected Redis connection info");
576
577 assert!(Arc::ptr_eq(&pool, &returned_connection));
578 assert_eq!(returned_prefix, key_prefix);
579
580 Ok(())
581 }
582
583 #[tokio::test]
584 async fn test_create_in_memory() -> Result<()> {
585 let storage = TransactionRepositoryStorage::new_in_memory();
586 let transaction = create_test_transaction("test-tx", "test-relayer");
587
588 let created = storage.create(transaction.clone()).await?;
589 assert_eq!(created.id, transaction.id);
590 assert_eq!(created.relayer_id, transaction.relayer_id);
591 assert_eq!(created.status, transaction.status);
592
593 Ok(())
594 }
595
596 #[tokio::test]
597 async fn test_get_by_id_in_memory() -> Result<()> {
598 let storage = TransactionRepositoryStorage::new_in_memory();
599 let transaction = create_test_transaction("test-tx", "test-relayer");
600
601 storage.create(transaction.clone()).await?;
603
604 let retrieved = storage.get_by_id("test-tx".to_string()).await?;
606 assert_eq!(retrieved.id, transaction.id);
607 assert_eq!(retrieved.relayer_id, transaction.relayer_id);
608 assert_eq!(retrieved.status, transaction.status);
609
610 Ok(())
611 }
612
613 #[tokio::test]
614 async fn test_get_by_id_not_found_in_memory() -> Result<()> {
615 let storage = TransactionRepositoryStorage::new_in_memory();
616
617 let result = storage.get_by_id("non-existent".to_string()).await;
618 assert!(result.is_err());
619
620 Ok(())
621 }
622
623 #[tokio::test]
624 async fn test_list_all_in_memory() -> Result<()> {
625 let storage = TransactionRepositoryStorage::new_in_memory();
626
627 let transactions = storage.list_all().await?;
629 assert!(transactions.is_empty());
630
631 let tx1 = create_test_transaction("tx-1", "relayer-1");
633 let tx2 = create_test_transaction("tx-2", "relayer-2");
634
635 storage.create(tx1.clone()).await?;
636 storage.create(tx2.clone()).await?;
637
638 let all_transactions = storage.list_all().await?;
639 assert_eq!(all_transactions.len(), 2);
640
641 let ids: Vec<&str> = all_transactions.iter().map(|t| t.id.as_str()).collect();
642 assert!(ids.contains(&"tx-1"));
643 assert!(ids.contains(&"tx-2"));
644
645 Ok(())
646 }
647
648 #[tokio::test]
649 async fn test_list_paginated_in_memory() -> Result<()> {
650 let storage = TransactionRepositoryStorage::new_in_memory();
651
652 for i in 1..=5 {
654 let tx = create_test_transaction(&format!("tx-{}", i), "test-relayer");
655 storage.create(tx).await?;
656 }
657
658 let query = PaginationQuery {
660 page: 1,
661 per_page: 2,
662 };
663 let page = storage.list_paginated(query).await?;
664
665 assert_eq!(page.items.len(), 2);
666 assert_eq!(page.total, 5);
667 assert_eq!(page.page, 1);
668 assert_eq!(page.per_page, 2);
669
670 let query2 = PaginationQuery {
672 page: 2,
673 per_page: 2,
674 };
675 let page2 = storage.list_paginated(query2).await?;
676
677 assert_eq!(page2.items.len(), 2);
678 assert_eq!(page2.total, 5);
679 assert_eq!(page2.page, 2);
680 assert_eq!(page2.per_page, 2);
681
682 Ok(())
683 }
684
685 #[tokio::test]
686 async fn test_update_in_memory() -> Result<()> {
687 let storage = TransactionRepositoryStorage::new_in_memory();
688 let transaction = create_test_transaction("test-tx", "test-relayer");
689
690 storage.create(transaction.clone()).await?;
692
693 let mut updated_transaction = transaction.clone();
695 updated_transaction.status = TransactionStatus::Sent;
696 updated_transaction.status_reason = Some("Updated reason".to_string());
697
698 let result = storage
699 .update("test-tx".to_string(), updated_transaction.clone())
700 .await?;
701 assert_eq!(result.id, "test-tx");
702 assert_eq!(result.status, TransactionStatus::Sent);
703 assert_eq!(result.status_reason, Some("Updated reason".to_string()));
704
705 let retrieved = storage.get_by_id("test-tx".to_string()).await?;
707 assert_eq!(retrieved.status, TransactionStatus::Sent);
708 assert_eq!(retrieved.status_reason, Some("Updated reason".to_string()));
709
710 Ok(())
711 }
712
713 #[tokio::test]
714 async fn test_update_not_found_in_memory() -> Result<()> {
715 let storage = TransactionRepositoryStorage::new_in_memory();
716 let transaction = create_test_transaction("non-existent", "test-relayer");
717
718 let result = storage
719 .update("non-existent".to_string(), transaction)
720 .await;
721 assert!(result.is_err());
722
723 Ok(())
724 }
725
726 #[tokio::test]
727 async fn test_delete_by_id_in_memory() -> Result<()> {
728 let storage = TransactionRepositoryStorage::new_in_memory();
729 let transaction = create_test_transaction("test-tx", "test-relayer");
730
731 storage.create(transaction.clone()).await?;
733
734 let retrieved = storage.get_by_id("test-tx".to_string()).await?;
736 assert_eq!(retrieved.id, "test-tx");
737
738 storage.delete_by_id("test-tx".to_string()).await?;
740
741 let result = storage.get_by_id("test-tx".to_string()).await;
743 assert!(result.is_err());
744
745 Ok(())
746 }
747
748 #[tokio::test]
749 async fn test_delete_by_id_not_found_in_memory() -> Result<()> {
750 let storage = TransactionRepositoryStorage::new_in_memory();
751
752 let result = storage.delete_by_id("non-existent".to_string()).await;
753 assert!(result.is_err());
754
755 Ok(())
756 }
757
758 #[tokio::test]
759 async fn test_count_in_memory() -> Result<()> {
760 let storage = TransactionRepositoryStorage::new_in_memory();
761
762 let count = storage.count().await?;
764 assert_eq!(count, 0);
765
766 let tx1 = create_test_transaction("tx-1", "relayer-1");
768 let tx2 = create_test_transaction("tx-2", "relayer-2");
769
770 storage.create(tx1).await?;
771 let count_after_one = storage.count().await?;
772 assert_eq!(count_after_one, 1);
773
774 storage.create(tx2).await?;
775 let count_after_two = storage.count().await?;
776 assert_eq!(count_after_two, 2);
777
778 storage.delete_by_id("tx-1".to_string()).await?;
780 let count_after_delete = storage.count().await?;
781 assert_eq!(count_after_delete, 1);
782
783 Ok(())
784 }
785
786 #[tokio::test]
787 async fn test_has_entries_in_memory() -> Result<()> {
788 let storage = TransactionRepositoryStorage::new_in_memory();
789
790 let has_entries = storage.has_entries().await?;
792 assert!(!has_entries);
793
794 let transaction = create_test_transaction("test-tx", "test-relayer");
796 storage.create(transaction).await?;
797
798 let has_entries_after_create = storage.has_entries().await?;
799 assert!(has_entries_after_create);
800
801 storage.delete_by_id("test-tx".to_string()).await?;
803
804 let has_entries_after_delete = storage.has_entries().await?;
805 assert!(!has_entries_after_delete);
806
807 Ok(())
808 }
809
810 #[tokio::test]
811 async fn test_drop_all_entries_in_memory() -> Result<()> {
812 let storage = TransactionRepositoryStorage::new_in_memory();
813
814 for i in 1..=5 {
816 let tx = create_test_transaction(&format!("tx-{}", i), "test-relayer");
817 storage.create(tx).await?;
818 }
819
820 let count_before = storage.count().await?;
822 assert_eq!(count_before, 5);
823
824 let has_entries_before = storage.has_entries().await?;
825 assert!(has_entries_before);
826
827 storage.drop_all_entries().await?;
829
830 let count_after = storage.count().await?;
832 assert_eq!(count_after, 0);
833
834 let has_entries_after = storage.has_entries().await?;
835 assert!(!has_entries_after);
836
837 let all_transactions = storage.list_all().await?;
838 assert!(all_transactions.is_empty());
839
840 Ok(())
841 }
842
843 #[tokio::test]
844 async fn test_find_by_relayer_id_in_memory() -> Result<()> {
845 let storage = TransactionRepositoryStorage::new_in_memory();
846
847 let tx1 = create_test_transaction("tx-1", "relayer-1");
849 let tx2 = create_test_transaction("tx-2", "relayer-1");
850 let tx3 = create_test_transaction("tx-3", "relayer-2");
851
852 storage.create(tx1).await?;
853 storage.create(tx2).await?;
854 storage.create(tx3).await?;
855
856 let query = PaginationQuery {
858 page: 1,
859 per_page: 10,
860 };
861 let result = storage.find_by_relayer_id("relayer-1", query).await?;
862
863 assert_eq!(result.items.len(), 2);
864 assert_eq!(result.total, 2);
865
866 for tx in result.items {
868 assert_eq!(tx.relayer_id, "relayer-1");
869 }
870
871 Ok(())
872 }
873
874 #[tokio::test]
875 async fn test_find_by_status_in_memory() -> Result<()> {
876 let storage = TransactionRepositoryStorage::new_in_memory();
877
878 let tx1 =
880 create_test_transaction_with_status("tx-1", "relayer-1", TransactionStatus::Pending);
881 let tx2 = create_test_transaction_with_status("tx-2", "relayer-1", TransactionStatus::Sent);
882 let tx3 =
883 create_test_transaction_with_status("tx-3", "relayer-1", TransactionStatus::Pending);
884 let tx4 =
885 create_test_transaction_with_status("tx-4", "relayer-2", TransactionStatus::Pending);
886
887 storage.create(tx1).await?;
888 storage.create(tx2).await?;
889 storage.create(tx3).await?;
890 storage.create(tx4).await?;
891
892 let statuses = vec![TransactionStatus::Pending];
894 let result = storage.find_by_status("relayer-1", &statuses).await?;
895
896 assert_eq!(result.len(), 2);
897
898 for tx in result {
900 assert_eq!(tx.status, TransactionStatus::Pending);
901 assert_eq!(tx.relayer_id, "relayer-1");
902 }
903
904 Ok(())
905 }
906
907 #[tokio::test]
908 async fn test_find_by_nonce_in_memory() -> Result<()> {
909 let storage = TransactionRepositoryStorage::new_in_memory();
910
911 let tx1 = create_test_transaction_with_nonce("tx-1", "relayer-1", 10);
913 let tx2 = create_test_transaction_with_nonce("tx-2", "relayer-1", 20);
914 let tx3 = create_test_transaction_with_nonce("tx-3", "relayer-2", 10);
915
916 storage.create(tx1).await?;
917 storage.create(tx2).await?;
918 storage.create(tx3).await?;
919
920 let result = storage.find_by_nonce("relayer-1", 10).await?;
922
923 assert!(result.is_some());
924 let found_tx = result.unwrap();
925 assert_eq!(found_tx.id, "tx-1");
926 assert_eq!(found_tx.relayer_id, "relayer-1");
927
928 if let NetworkTransactionData::Evm(evm_data) = found_tx.network_data {
930 assert_eq!(evm_data.nonce, Some(10));
931 }
932
933 let not_found = storage.find_by_nonce("relayer-1", 99).await?;
935 assert!(not_found.is_none());
936
937 Ok(())
938 }
939
940 #[tokio::test]
941 async fn test_update_status_in_memory() -> Result<()> {
942 let storage = TransactionRepositoryStorage::new_in_memory();
943 let transaction = create_test_transaction("test-tx", "test-relayer");
944
945 storage.create(transaction).await?;
947
948 let updated = storage
950 .update_status("test-tx".to_string(), TransactionStatus::Sent)
951 .await?;
952
953 assert_eq!(updated.id, "test-tx");
954 assert_eq!(updated.status, TransactionStatus::Sent);
955
956 let retrieved = storage.get_by_id("test-tx".to_string()).await?;
958 assert_eq!(retrieved.status, TransactionStatus::Sent);
959
960 Ok(())
961 }
962
963 #[tokio::test]
964 async fn test_partial_update_in_memory() -> Result<()> {
965 let storage = TransactionRepositoryStorage::new_in_memory();
966 let transaction = create_test_transaction("test-tx", "test-relayer");
967
968 storage.create(transaction).await?;
970
971 let update_request = create_test_update_request();
973 let updated = storage
974 .partial_update("test-tx".to_string(), update_request)
975 .await?;
976
977 assert_eq!(updated.id, "test-tx");
978 assert_eq!(updated.status, TransactionStatus::Sent);
979 assert_eq!(updated.status_reason, Some("Test reason".to_string()));
980 assert!(updated.sent_at.is_some());
981 assert_eq!(updated.hashes, vec!["test_hash".to_string()]);
982
983 Ok(())
984 }
985
986 #[tokio::test]
987 async fn test_update_network_data_in_memory() -> Result<()> {
988 let storage = TransactionRepositoryStorage::new_in_memory();
989 let transaction = create_test_transaction("test-tx", "test-relayer");
990
991 storage.create(transaction).await?;
993
994 let new_evm_data = EvmTransactionData {
996 nonce: Some(42),
997 gas_limit: Some(21000),
998 ..Default::default()
999 };
1000 let new_network_data = NetworkTransactionData::Evm(new_evm_data);
1001
1002 let updated = storage
1003 .update_network_data("test-tx".to_string(), new_network_data)
1004 .await?;
1005
1006 assert_eq!(updated.id, "test-tx");
1007 if let NetworkTransactionData::Evm(evm_data) = updated.network_data {
1008 assert_eq!(evm_data.nonce, Some(42));
1009 assert_eq!(evm_data.gas_limit, Some(21000));
1010 } else {
1011 panic!("Expected EVM network data");
1012 }
1013
1014 Ok(())
1015 }
1016
1017 #[tokio::test]
1018 async fn test_set_sent_at_in_memory() -> Result<()> {
1019 let storage = TransactionRepositoryStorage::new_in_memory();
1020 let transaction = create_test_transaction("test-tx", "test-relayer");
1021
1022 storage.create(transaction).await?;
1024
1025 let sent_at = Utc::now().to_string();
1027 let updated = storage
1028 .set_sent_at("test-tx".to_string(), sent_at.clone())
1029 .await?;
1030
1031 assert_eq!(updated.id, "test-tx");
1032 assert_eq!(updated.sent_at, Some(sent_at));
1033
1034 Ok(())
1035 }
1036
1037 #[tokio::test]
1038 async fn test_set_confirmed_at_in_memory() -> Result<()> {
1039 let storage = TransactionRepositoryStorage::new_in_memory();
1040 let transaction = create_test_transaction("test-tx", "test-relayer");
1041
1042 storage.create(transaction).await?;
1044
1045 let confirmed_at = Utc::now().to_string();
1047 let updated = storage
1048 .set_confirmed_at("test-tx".to_string(), confirmed_at.clone())
1049 .await?;
1050
1051 assert_eq!(updated.id, "test-tx");
1052 assert_eq!(updated.confirmed_at, Some(confirmed_at));
1053
1054 Ok(())
1055 }
1056
1057 #[tokio::test]
1058 async fn test_create_duplicate_id_in_memory() -> Result<()> {
1059 let storage = TransactionRepositoryStorage::new_in_memory();
1060 let transaction = create_test_transaction("duplicate-id", "test-relayer");
1061
1062 storage.create(transaction.clone()).await?;
1064
1065 let result = storage.create(transaction.clone()).await;
1067 assert!(result.is_err());
1068
1069 Ok(())
1070 }
1071
1072 #[tokio::test]
1073 async fn test_workflow_in_memory() -> Result<()> {
1074 let storage = TransactionRepositoryStorage::new_in_memory();
1075
1076 assert!(!storage.has_entries().await?);
1078 assert_eq!(storage.count().await?, 0);
1079
1080 let transaction = create_test_transaction("workflow-test", "test-relayer");
1082 let created = storage.create(transaction.clone()).await?;
1083 assert_eq!(created.id, "workflow-test");
1084
1085 assert!(storage.has_entries().await?);
1087 assert_eq!(storage.count().await?, 1);
1088
1089 let retrieved = storage.get_by_id("workflow-test".to_string()).await?;
1091 assert_eq!(retrieved.id, "workflow-test");
1092
1093 let updated = storage
1095 .update_status("workflow-test".to_string(), TransactionStatus::Sent)
1096 .await?;
1097 assert_eq!(updated.status, TransactionStatus::Sent);
1098
1099 let retrieved_updated = storage.get_by_id("workflow-test".to_string()).await?;
1101 assert_eq!(retrieved_updated.status, TransactionStatus::Sent);
1102
1103 storage.delete_by_id("workflow-test".to_string()).await?;
1105
1106 assert!(!storage.has_entries().await?);
1108 assert_eq!(storage.count().await?, 0);
1109
1110 let result = storage.get_by_id("workflow-test".to_string()).await;
1111 assert!(result.is_err());
1112
1113 Ok(())
1114 }
1115
1116 #[tokio::test]
1117 async fn test_multiple_relayers_workflow() -> Result<()> {
1118 let storage = TransactionRepositoryStorage::new_in_memory();
1119
1120 let tx1 =
1122 create_test_transaction_with_status("tx-1", "relayer-1", TransactionStatus::Pending);
1123 let tx2 = create_test_transaction_with_status("tx-2", "relayer-1", TransactionStatus::Sent);
1124 let tx3 =
1125 create_test_transaction_with_status("tx-3", "relayer-2", TransactionStatus::Pending);
1126
1127 storage.create(tx1).await?;
1128 storage.create(tx2).await?;
1129 storage.create(tx3).await?;
1130
1131 let query = PaginationQuery {
1133 page: 1,
1134 per_page: 10,
1135 };
1136 let relayer1_txs = storage.find_by_relayer_id("relayer-1", query).await?;
1137 assert_eq!(relayer1_txs.items.len(), 2);
1138
1139 let pending_txs = storage
1141 .find_by_status("relayer-1", &[TransactionStatus::Pending])
1142 .await?;
1143 assert_eq!(pending_txs.len(), 1);
1144 assert_eq!(pending_txs[0].id, "tx-1");
1145
1146 assert_eq!(storage.count().await?, 3);
1148
1149 Ok(())
1150 }
1151
1152 #[tokio::test]
1153 async fn test_pagination_edge_cases_in_memory() -> Result<()> {
1154 let storage = TransactionRepositoryStorage::new_in_memory();
1155
1156 let query = PaginationQuery {
1158 page: 1,
1159 per_page: 10,
1160 };
1161 let page = storage.list_paginated(query).await?;
1162 assert_eq!(page.items.len(), 0);
1163 assert_eq!(page.total, 0);
1164 assert_eq!(page.page, 1);
1165 assert_eq!(page.per_page, 10);
1166
1167 let transaction = create_test_transaction("single-tx", "test-relayer");
1169 storage.create(transaction).await?;
1170
1171 let query = PaginationQuery {
1173 page: 1,
1174 per_page: 10,
1175 };
1176 let page = storage.list_paginated(query).await?;
1177 assert_eq!(page.items.len(), 1);
1178 assert_eq!(page.total, 1);
1179 assert_eq!(page.page, 1);
1180 assert_eq!(page.per_page, 10);
1181
1182 let query = PaginationQuery {
1184 page: 3,
1185 per_page: 10,
1186 };
1187 let page = storage.list_paginated(query).await?;
1188 assert_eq!(page.items.len(), 0);
1189 assert_eq!(page.total, 1);
1190 assert_eq!(page.page, 3);
1191 assert_eq!(page.per_page, 10);
1192
1193 Ok(())
1194 }
1195
1196 #[tokio::test]
1197 async fn test_find_by_relayer_id_pagination() -> Result<()> {
1198 let storage = TransactionRepositoryStorage::new_in_memory();
1199
1200 for i in 1..=10 {
1202 let tx = create_test_transaction(&format!("tx-{}", i), "test-relayer");
1203 storage.create(tx).await?;
1204 }
1205
1206 let query = PaginationQuery {
1208 page: 1,
1209 per_page: 3,
1210 };
1211 let page1 = storage.find_by_relayer_id("test-relayer", query).await?;
1212 assert_eq!(page1.items.len(), 3);
1213 assert_eq!(page1.total, 10);
1214 assert_eq!(page1.page, 1);
1215 assert_eq!(page1.per_page, 3);
1216
1217 let query = PaginationQuery {
1219 page: 2,
1220 per_page: 3,
1221 };
1222 let page2 = storage.find_by_relayer_id("test-relayer", query).await?;
1223 assert_eq!(page2.items.len(), 3);
1224 assert_eq!(page2.total, 10);
1225 assert_eq!(page2.page, 2);
1226 assert_eq!(page2.per_page, 3);
1227
1228 Ok(())
1229 }
1230
1231 #[tokio::test]
1232 async fn test_find_by_multiple_statuses() -> Result<()> {
1233 let storage = TransactionRepositoryStorage::new_in_memory();
1234
1235 let tx1 =
1237 create_test_transaction_with_status("tx-1", "test-relayer", TransactionStatus::Pending);
1238 let tx2 =
1239 create_test_transaction_with_status("tx-2", "test-relayer", TransactionStatus::Sent);
1240 let tx3 = create_test_transaction_with_status(
1241 "tx-3",
1242 "test-relayer",
1243 TransactionStatus::Confirmed,
1244 );
1245 let tx4 =
1246 create_test_transaction_with_status("tx-4", "test-relayer", TransactionStatus::Failed);
1247
1248 storage.create(tx1).await?;
1249 storage.create(tx2).await?;
1250 storage.create(tx3).await?;
1251 storage.create(tx4).await?;
1252
1253 let statuses = vec![TransactionStatus::Pending, TransactionStatus::Sent];
1255 let result = storage.find_by_status("test-relayer", &statuses).await?;
1256
1257 assert_eq!(result.len(), 2);
1258
1259 let found_statuses: Vec<TransactionStatus> =
1261 result.iter().map(|tx| tx.status.clone()).collect();
1262 assert!(found_statuses.contains(&TransactionStatus::Pending));
1263 assert!(found_statuses.contains(&TransactionStatus::Sent));
1264
1265 Ok(())
1266 }
1267}