1use actix_web::web::ThinData;
15use apalis::prelude::{Attempt, Data, *};
16use chrono::{DateTime, Utc};
17use eyre::Result;
18use std::sync::Arc;
19use std::time::Duration;
20use tracing::{debug, error, info, instrument, warn};
21
22use crate::{
23 constants::{FINAL_TRANSACTION_STATUSES, WORKER_TRANSACTION_CLEANUP_RETRIES},
24 jobs::handle_result,
25 models::{
26 DefaultAppState, NetworkTransactionData, PaginationQuery, RelayerRepoModel,
27 TransactionRepoModel, TransactionStatus,
28 },
29 repositories::{Repository, TransactionDeleteRequest, TransactionRepository},
30 utils::DistributedLock,
31};
32
/// Upper bound on how many relayers are cleaned up concurrently
/// (used as the `buffer_unordered` limit in `process_relayers_in_batches`).
const MAX_CONCURRENT_RELAYERS: usize = 10;

/// Number of transactions requested per page when scanning a relayer's transactions.
const CLEANUP_PAGE_SIZE: u32 = 100;

/// Maximum number of delete requests passed to the repository in one batch call.
const DELETE_BATCH_SIZE: usize = 100;

/// Cap on page-fetch iterations per relayer and status, so the pagination
/// loop in `process_status_cleanup` always terminates.
const MAX_CLEANUP_ITERATIONS_PER_STATUS: u32 = 1500;

/// Name segment of the distributed lock key, which is built as `{prefix}:lock:{name}`.
const CLEANUP_LOCK_NAME: &str = "transaction_cleanup";

/// Duration handed to the distributed cleanup lock, in seconds (9 minutes).
const CLEANUP_LOCK_TTL_SECS: u64 = 9 * 60;
67
68#[instrument(
85 level = "debug",
86 skip(job, data),
87 fields(
88 job_type = "transaction_cleanup",
89 attempt = %attempt.current(),
90 ),
91 err
92)]
93pub async fn transaction_cleanup_handler(
94 job: TransactionCleanupCronReminder,
95 data: Data<ThinData<DefaultAppState>>,
96 attempt: Attempt,
97) -> Result<(), Error> {
98 let result = handle_request(job, data, attempt.clone()).await;
99
100 handle_result(
101 result,
102 attempt,
103 "TransactionCleanup",
104 WORKER_TRANSACTION_CLEANUP_RETRIES,
105 )
106}
107
/// Payload type for the transaction cleanup cron job; it carries no data.
#[derive(Default, Debug, Clone)]
pub struct TransactionCleanupCronReminder();
111
112async fn handle_request(
126 _job: TransactionCleanupCronReminder,
127 data: Data<ThinData<DefaultAppState>>,
128 _attempt: Attempt,
129) -> Result<()> {
130 let transaction_repo = data.transaction_repository();
131
132 let lock_guard = if let Some((conn, prefix)) = transaction_repo.connection_info() {
138 let lock_key = format!("{prefix}:lock:{CLEANUP_LOCK_NAME}");
139 let lock =
140 DistributedLock::new(conn, &lock_key, Duration::from_secs(CLEANUP_LOCK_TTL_SECS));
141
142 match lock.try_acquire().await {
143 Ok(Some(guard)) => {
144 debug!(lock_key = %lock_key, "acquired distributed lock for transaction cleanup");
145 Some(guard)
146 }
147 Ok(None) => {
148 info!(lock_key = %lock_key, "transaction cleanup skipped - another instance is processing");
149 return Ok(());
150 }
151 Err(e) => {
152 warn!(
155 error = %e,
156 lock_key = %lock_key,
157 "failed to acquire distributed lock, proceeding with cleanup anyway"
158 );
159 None
160 }
161 }
162 } else {
163 debug!("in-memory repository detected, skipping distributed lock");
164 None
165 };
166
167 let now = Utc::now();
168 info!(
169 timestamp = %now.to_rfc3339(),
170 "executing transaction cleanup from storage"
171 );
172
173 let relayer_repo = data.relayer_repository();
174
175 let relayers = relayer_repo.list_all().await.map_err(|e| {
177 error!(
178 error = %e,
179 "failed to fetch relayers for cleanup"
180 );
181 eyre::eyre!("Failed to fetch relayers: {}", e)
182 })?;
183
184 info!(
185 relayer_count = relayers.len(),
186 "found relayers to process for cleanup"
187 );
188
189 let cleanup_results = process_relayers_in_batches(relayers, transaction_repo, now).await;
191
192 let result = report_cleanup_results(cleanup_results).await;
194
195 drop(lock_guard);
198
199 result
200}
201
202async fn process_relayers_in_batches(
212 relayers: Vec<RelayerRepoModel>,
213 transaction_repo: Arc<impl TransactionRepository>,
214 now: DateTime<Utc>,
215) -> Vec<RelayerCleanupResult> {
216 use futures::stream::{self, StreamExt};
217
218 let results: Vec<RelayerCleanupResult> = stream::iter(relayers)
220 .map(|relayer| {
221 let repo_clone = Arc::clone(&transaction_repo);
222 async move { process_single_relayer(relayer, repo_clone, now).await }
223 })
224 .buffer_unordered(MAX_CONCURRENT_RELAYERS)
225 .collect()
226 .await;
227
228 results
229}
230
/// Outcome of cleaning up a single relayer's transactions.
#[derive(Debug)]
struct RelayerCleanupResult {
    /// ID of the relayer this result belongs to.
    relayer_id: String,
    /// Number of transactions deleted for the relayer (on failure: the count reached before the error).
    cleaned_count: usize,
    /// Error message when cleanup for this relayer stopped with an error; `None` otherwise.
    error: Option<String>,
}
238
239async fn process_single_relayer(
252 relayer: RelayerRepoModel,
253 transaction_repo: Arc<impl TransactionRepository>,
254 now: DateTime<Utc>,
255) -> RelayerCleanupResult {
256 debug!(
257 relayer_id = %relayer.id,
258 "processing cleanup for relayer"
259 );
260
261 let mut total_cleaned = 0usize;
262
263 for status in FINAL_TRANSACTION_STATUSES {
264 match process_status_cleanup(&relayer.id, status, &transaction_repo, now).await {
265 Ok(cleaned) => total_cleaned += cleaned,
266 Err(e) => {
267 error!(
268 error = %e,
269 relayer_id = %relayer.id,
270 status = ?status,
271 "failed to cleanup transactions for status"
272 );
273 return RelayerCleanupResult {
274 relayer_id: relayer.id,
275 cleaned_count: total_cleaned,
276 error: Some(e.to_string()),
277 };
278 }
279 }
280 }
281
282 if total_cleaned > 0 {
283 info!(
284 cleaned_count = total_cleaned,
285 relayer_id = %relayer.id,
286 "cleaned up expired transactions"
287 );
288 }
289
290 RelayerCleanupResult {
291 relayer_id: relayer.id,
292 cleaned_count: total_cleaned,
293 error: None,
294 }
295}
296
297async fn process_status_cleanup(
312 relayer_id: &str,
313 status: &TransactionStatus,
314 transaction_repo: &Arc<impl TransactionRepository>,
315 now: DateTime<Utc>,
316) -> Result<usize> {
317 let mut current_page = 1u32;
318 let mut total_cleaned = 0usize;
319 let mut iterations = 0u32;
320
321 loop {
322 if iterations >= MAX_CLEANUP_ITERATIONS_PER_STATUS {
323 warn!(
324 relayer_id = %relayer_id,
325 status = ?status,
326 iterations,
327 total_cleaned,
328 "reached max cleanup iterations, stopping"
329 );
330 break;
331 }
332 iterations += 1;
333
334 let query = PaginationQuery {
335 page: current_page,
336 per_page: CLEANUP_PAGE_SIZE,
337 };
338
339 let page_result = transaction_repo
340 .find_by_status_paginated(relayer_id, &[status.clone()], query, true)
341 .await
342 .map_err(|e| {
343 eyre::eyre!(
344 "Failed to fetch {:?} transactions for relayer {}: {}",
345 status,
346 relayer_id,
347 e
348 )
349 })?;
350
351 if page_result.items.is_empty() {
352 break;
353 }
354
355 debug!(
356 page = current_page,
357 page_count = page_result.items.len(),
358 total = page_result.total,
359 relayer_id = %relayer_id,
360 status = ?status,
361 "processing page of transactions for cleanup"
362 );
363
364 let cleaned_count =
365 process_transactions_for_cleanup(page_result.items, transaction_repo, relayer_id, now)
366 .await;
367
368 total_cleaned += cleaned_count;
369
370 if cleaned_count == 0 {
371 current_page += 1;
374 }
375 }
379
380 if total_cleaned > 0 {
381 debug!(
382 total_cleaned,
383 relayer_id = %relayer_id,
384 status = ?status,
385 "status cleanup completed"
386 );
387 }
388
389 Ok(total_cleaned)
390}
391
/// Test-only helper: fetches one page of a relayer's transactions whose
/// status is in `FINAL_TRANSACTION_STATUSES`.
///
/// # Errors
/// Wraps any repository error in an `eyre` report that names the relayer.
#[cfg(test)]
async fn fetch_final_transactions_paginated(
    relayer_id: &str,
    transaction_repo: &Arc<impl TransactionRepository>,
    query: PaginationQuery,
) -> Result<crate::repositories::PaginatedResult<TransactionRepoModel>> {
    let fetched = transaction_repo
        .find_by_status_paginated(relayer_id, FINAL_TRANSACTION_STATUSES, query, true)
        .await;

    match fetched {
        Ok(page) => Ok(page),
        Err(e) => Err(eyre::eyre!(
            "Failed to fetch final transactions for relayer {}: {}",
            relayer_id,
            e
        )),
    }
}
411
412async fn process_transactions_for_cleanup(
427 transactions: Vec<TransactionRepoModel>,
428 transaction_repo: &Arc<impl TransactionRepository>,
429 relayer_id: &str,
430 now: DateTime<Utc>,
431) -> usize {
432 if transactions.is_empty() {
433 return 0;
434 }
435
436 debug!(
437 transaction_count = transactions.len(),
438 relayer_id = %relayer_id,
439 "processing transactions for cleanup"
440 );
441
442 let delete_requests: Vec<TransactionDeleteRequest> = transactions
445 .into_iter()
446 .filter(|tx| {
447 if !FINAL_TRANSACTION_STATUSES.contains(&tx.status) {
449 warn!(
450 tx_id = %tx.id,
451 status = ?tx.status,
452 "skipping transaction not in final state"
453 );
454 return false;
455 }
456 should_delete_transaction(tx, now)
458 })
459 .map(|tx| {
460 let nonce = extract_nonce_from_network_data(&tx.network_data);
462 TransactionDeleteRequest::new(tx.id, tx.relayer_id, nonce)
463 })
464 .collect();
465
466 if delete_requests.is_empty() {
467 debug!(
468 relayer_id = %relayer_id,
469 "no expired transactions found"
470 );
471 return 0;
472 }
473
474 let total_expired = delete_requests.len();
475 debug!(
476 expired_count = total_expired,
477 relayer_id = %relayer_id,
478 "found expired transactions to delete"
479 );
480
481 let mut total_deleted = 0;
483 let mut total_failed = 0;
484
485 for (batch_idx, batch) in delete_requests.chunks(DELETE_BATCH_SIZE).enumerate() {
486 let batch_requests: Vec<TransactionDeleteRequest> = batch.to_vec();
487 let batch_size = batch_requests.len();
488
489 debug!(
490 batch = batch_idx + 1,
491 batch_size = batch_size,
492 relayer_id = %relayer_id,
493 "processing delete batch"
494 );
495
496 match transaction_repo.delete_by_requests(batch_requests).await {
497 Ok(result) => {
498 if !result.failed.is_empty() {
499 for (id, error) in &result.failed {
500 error!(
501 tx_id = %id,
502 error = %error,
503 relayer_id = %relayer_id,
504 "failed to delete expired transaction in batch"
505 );
506 }
507 }
508
509 total_deleted += result.deleted_count;
510 total_failed += result.failed.len();
511 }
512 Err(e) => {
513 error!(
514 error = %e,
515 relayer_id = %relayer_id,
516 batch = batch_idx + 1,
517 batch_size = batch_size,
518 "batch delete failed completely"
519 );
520 total_failed += batch_size;
521 }
522 }
523 }
524
525 debug!(
526 total_deleted,
527 total_failed,
528 total_expired,
529 relayer_id = %relayer_id,
530 "batch delete completed"
531 );
532
533 total_deleted
534}
535
536fn extract_nonce_from_network_data(network_data: &NetworkTransactionData) -> Option<u64> {
539 match network_data {
540 NetworkTransactionData::Evm(evm_data) => evm_data.nonce,
541 _ => None,
542 }
543}
544
545fn should_delete_transaction(transaction: &TransactionRepoModel, now: DateTime<Utc>) -> bool {
554 transaction
555 .delete_at
556 .as_ref()
557 .and_then(|delete_at_str| DateTime::parse_from_rfc3339(delete_at_str).ok())
558 .map(|delete_at| {
559 let is_expired = now >= delete_at.with_timezone(&Utc);
560 if is_expired {
561 debug!(
562 tx_id = %transaction.id,
563 expired_at = %delete_at.to_rfc3339(),
564 "transaction is expired"
565 );
566 }
567 is_expired
568 })
569 .unwrap_or_else(|| {
570 if transaction.delete_at.is_some() {
571 warn!(
572 tx_id = %transaction.id,
573 "transaction has invalid delete_at timestamp"
574 );
575 }
576 false
577 })
578}
579
580async fn report_cleanup_results(cleanup_results: Vec<RelayerCleanupResult>) -> Result<()> {
588 let total_cleaned: usize = cleanup_results.iter().map(|r| r.cleaned_count).sum();
589 let total_errors = cleanup_results.iter().filter(|r| r.error.is_some()).count();
590 let total_relayers = cleanup_results.len();
591
592 for result in &cleanup_results {
594 if let Some(error) = &result.error {
595 error!(
596 relayer_id = %result.relayer_id,
597 error = %error,
598 "failed to cleanup transactions for relayer"
599 );
600 }
601 }
602
603 if total_errors > 0 {
604 warn!(
605 total_errors,
606 total_relayers, total_cleaned, "transaction cleanup completed with errors"
607 );
608
609 Err(eyre::eyre!(
612 "Cleanup completed with {} errors out of {} relayers",
613 total_errors,
614 total_relayers
615 ))
616 } else {
617 info!(
618 total_cleaned,
619 total_relayers, "transaction cleanup completed successfully"
620 );
621 Ok(())
622 }
623}
624
#[cfg(test)]
mod tests {
    //! Unit tests for the transaction cleanup job, run against the in-memory
    //! transaction repository.

    use super::*;
    use crate::{
        models::{
            NetworkType, RelayerEvmPolicy, RelayerNetworkPolicy, RelayerRepoModel,
            TransactionRepoModel, TransactionStatus,
        },
        repositories::{InMemoryTransactionRepository, Repository},
        utils::mocks::mockutils::create_mock_transaction,
    };
    use chrono::{Duration, Utc};

    /// Builds a mock transaction with the given id, relayer, status and `delete_at` value.
    fn create_test_transaction(
        id: &str,
        relayer_id: &str,
        status: TransactionStatus,
        delete_at: Option<String>,
    ) -> TransactionRepoModel {
        let mut tx = create_mock_transaction();
        tx.id = id.to_string();
        tx.relayer_id = relayer_id.to_string();
        tx.status = status;
        tx.delete_at = delete_at;
        tx
    }

    /// Builds the EVM relayer fixture shared by the `process_single_relayer` tests.
    fn create_test_relayer(id: &str, name: &str) -> RelayerRepoModel {
        RelayerRepoModel {
            id: id.to_string(),
            name: name.to_string(),
            network: "ethereum".to_string(),
            paused: false,
            network_type: NetworkType::Evm,
            signer_id: "test-signer".to_string(),
            policies: RelayerNetworkPolicy::Evm(RelayerEvmPolicy::default()),
            address: "0x1234567890123456789012345678901234567890".to_string(),
            notification_id: None,
            system_disabled: false,
            custom_rpc_urls: None,
            ..Default::default()
        }
    }

    /// Returns `true` when `items` contains a transaction with the given id.
    fn contains_tx(items: &[TransactionRepoModel], id: &str) -> bool {
        items.iter().any(|tx| tx.id == id)
    }

    // `should_delete_transaction` is synchronous, so its tests need no async runtime.

    #[test]
    fn test_should_delete_transaction_expired() {
        let now = Utc::now();
        let expired_delete_at = (now - Duration::hours(1)).to_rfc3339();

        let transaction = create_test_transaction(
            "test-tx",
            "test-relayer",
            TransactionStatus::Confirmed,
            Some(expired_delete_at),
        );

        assert!(should_delete_transaction(&transaction, now));
    }

    #[test]
    fn test_should_delete_transaction_not_expired() {
        let now = Utc::now();
        let future_delete_at = (now + Duration::hours(1)).to_rfc3339();

        let transaction = create_test_transaction(
            "test-tx",
            "test-relayer",
            TransactionStatus::Confirmed,
            Some(future_delete_at),
        );

        assert!(!should_delete_transaction(&transaction, now));
    }

    #[test]
    fn test_should_delete_transaction_no_delete_at() {
        let now = Utc::now();

        let transaction = create_test_transaction(
            "test-tx",
            "test-relayer",
            TransactionStatus::Confirmed,
            None,
        );

        assert!(!should_delete_transaction(&transaction, now));
    }

    #[test]
    fn test_should_delete_transaction_invalid_timestamp() {
        let now = Utc::now();

        let transaction = create_test_transaction(
            "test-tx",
            "test-relayer",
            TransactionStatus::Confirmed,
            Some("invalid-timestamp".to_string()),
        );

        assert!(!should_delete_transaction(&transaction, now));
    }

    #[tokio::test]
    async fn test_process_transactions_for_cleanup_parallel() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer_id = "test-relayer";
        let now = Utc::now();

        let expired_delete_at = (now - Duration::hours(1)).to_rfc3339();
        let future_delete_at = (now + Duration::hours(1)).to_rfc3339();

        let expired_tx = create_test_transaction(
            "expired-tx",
            relayer_id,
            TransactionStatus::Confirmed,
            Some(expired_delete_at),
        );
        let future_tx = create_test_transaction(
            "future-tx",
            relayer_id,
            TransactionStatus::Failed,
            Some(future_delete_at),
        );
        let no_delete_tx = create_test_transaction(
            "no-delete-tx",
            relayer_id,
            TransactionStatus::Canceled,
            None,
        );

        transaction_repo.create(expired_tx.clone()).await.unwrap();
        transaction_repo.create(future_tx.clone()).await.unwrap();
        transaction_repo.create(no_delete_tx.clone()).await.unwrap();

        let transactions = vec![expired_tx, future_tx, no_delete_tx];

        let cleaned_count =
            process_transactions_for_cleanup(transactions, &transaction_repo, relayer_id, now)
                .await;

        // Only the expired transaction is deleted.
        assert_eq!(cleaned_count, 1);

        assert!(transaction_repo
            .get_by_id("expired-tx".to_string())
            .await
            .is_err());

        assert!(transaction_repo
            .get_by_id("future-tx".to_string())
            .await
            .is_ok());
        assert!(transaction_repo
            .get_by_id("no-delete-tx".to_string())
            .await
            .is_ok());
    }

    #[tokio::test]
    async fn test_batch_delete_expired_transactions() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer_id = "test-relayer";
        let now = Utc::now();

        let expired_delete_at = (now - Duration::hours(1)).to_rfc3339();

        for i in 0..5 {
            let tx = create_test_transaction(
                &format!("expired-tx-{}", i),
                relayer_id,
                TransactionStatus::Confirmed,
                Some(expired_delete_at.clone()),
            );
            transaction_repo.create(tx).await.unwrap();
        }

        assert_eq!(transaction_repo.count().await.unwrap(), 5);

        let ids: Vec<String> = (0..5).map(|i| format!("expired-tx-{}", i)).collect();
        let result = transaction_repo.delete_by_ids(ids).await.unwrap();

        assert_eq!(result.deleted_count, 5);
        assert!(result.failed.is_empty());

        assert_eq!(transaction_repo.count().await.unwrap(), 0);
    }

    #[tokio::test]
    async fn test_batch_delete_with_nonexistent_ids() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer_id = "test-relayer";

        let tx = create_test_transaction(
            "existing-tx",
            relayer_id,
            TransactionStatus::Confirmed,
            Some(Utc::now().to_rfc3339()),
        );
        transaction_repo.create(tx).await.unwrap();

        let ids = vec![
            "existing-tx".to_string(),
            "nonexistent-1".to_string(),
            "nonexistent-2".to_string(),
        ];
        let result = transaction_repo.delete_by_ids(ids).await.unwrap();

        // The existing transaction is deleted; the unknown ids are reported as failed.
        assert_eq!(result.deleted_count, 1);
        assert_eq!(result.failed.len(), 2);

        assert!(transaction_repo
            .get_by_id("existing-tx".to_string())
            .await
            .is_err());
    }

    #[tokio::test]
    async fn test_process_transactions_skips_non_final_status() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer_id = "test-relayer";
        let now = Utc::now();

        let expired_delete_at = (now - Duration::hours(1)).to_rfc3339();
        let pending_tx = create_test_transaction(
            "pending-tx",
            relayer_id,
            TransactionStatus::Pending,
            Some(expired_delete_at),
        );
        transaction_repo.create(pending_tx.clone()).await.unwrap();

        let transactions = vec![pending_tx];

        let cleaned_count =
            process_transactions_for_cleanup(transactions, &transaction_repo, relayer_id, now)
                .await;

        // A non-final transaction is never deleted, even with an elapsed `delete_at`.
        assert_eq!(cleaned_count, 0);

        assert!(transaction_repo
            .get_by_id("pending-tx".to_string())
            .await
            .is_ok());
    }

    #[tokio::test]
    async fn test_fetch_final_transactions_paginated() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer_id = "test-relayer";

        let confirmed_tx = create_test_transaction(
            "confirmed-tx",
            relayer_id,
            TransactionStatus::Confirmed,
            None,
        );
        let pending_tx =
            create_test_transaction("pending-tx", relayer_id, TransactionStatus::Pending, None);
        let failed_tx =
            create_test_transaction("failed-tx", relayer_id, TransactionStatus::Failed, None);

        transaction_repo.create(confirmed_tx).await.unwrap();
        transaction_repo.create(pending_tx).await.unwrap();
        transaction_repo.create(failed_tx).await.unwrap();

        let query = PaginationQuery {
            page: 1,
            per_page: 10,
        };
        let result = fetch_final_transactions_paginated(relayer_id, &transaction_repo, query)
            .await
            .unwrap();

        assert_eq!(result.total, 2);
        assert_eq!(result.items.len(), 2);
        assert!(contains_tx(&result.items, "confirmed-tx"));
        assert!(contains_tx(&result.items, "failed-tx"));
        assert!(!contains_tx(&result.items, "pending-tx"));
    }

    #[tokio::test]
    async fn test_report_cleanup_results_success() {
        let results = vec![
            RelayerCleanupResult {
                relayer_id: "relayer-1".to_string(),
                cleaned_count: 2,
                error: None,
            },
            RelayerCleanupResult {
                relayer_id: "relayer-2".to_string(),
                cleaned_count: 1,
                error: None,
            },
        ];

        let result = report_cleanup_results(results).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_report_cleanup_results_with_errors() {
        let results = vec![
            RelayerCleanupResult {
                relayer_id: "relayer-1".to_string(),
                cleaned_count: 2,
                error: None,
            },
            RelayerCleanupResult {
                relayer_id: "relayer-2".to_string(),
                cleaned_count: 0,
                error: Some("Database error".to_string()),
            },
        ];

        let result = report_cleanup_results(results).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_process_single_relayer_success() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer = create_test_relayer("test-relayer", "Test Relayer");
        let now = Utc::now();

        let expired_tx = create_test_transaction(
            "expired-tx",
            &relayer.id,
            TransactionStatus::Confirmed,
            Some((now - Duration::hours(1)).to_rfc3339()),
        );
        let future_tx = create_test_transaction(
            "future-tx",
            &relayer.id,
            TransactionStatus::Failed,
            Some((now + Duration::hours(1)).to_rfc3339()),
        );

        transaction_repo.create(expired_tx).await.unwrap();
        transaction_repo.create(future_tx).await.unwrap();

        let result = process_single_relayer(relayer.clone(), transaction_repo.clone(), now).await;

        assert_eq!(result.relayer_id, relayer.id);
        assert_eq!(result.cleaned_count, 1);
        assert!(result.error.is_none());
    }

    #[tokio::test]
    async fn test_process_single_relayer_no_transactions() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer = create_test_relayer("empty-relayer", "Empty Relayer");
        let now = Utc::now();

        let result = process_single_relayer(relayer.clone(), transaction_repo, now).await;

        assert_eq!(result.relayer_id, relayer.id);
        assert_eq!(result.cleaned_count, 0);
        assert!(result.error.is_none());
    }

    #[tokio::test]
    async fn test_process_transactions_with_empty_list() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer_id = "test-relayer";
        let now = Utc::now();
        let transactions = vec![];

        let cleaned_count =
            process_transactions_for_cleanup(transactions, &transaction_repo, relayer_id, now)
                .await;

        assert_eq!(cleaned_count, 0);
    }

    #[tokio::test]
    async fn test_process_transactions_with_no_expired() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer_id = "test-relayer";
        let now = Utc::now();

        let future_tx1 = create_test_transaction(
            "future-tx-1",
            relayer_id,
            TransactionStatus::Confirmed,
            Some((now + Duration::hours(1)).to_rfc3339()),
        );
        let future_tx2 = create_test_transaction(
            "future-tx-2",
            relayer_id,
            TransactionStatus::Failed,
            Some((now + Duration::hours(2)).to_rfc3339()),
        );
        let no_delete_tx = create_test_transaction(
            "no-delete-tx",
            relayer_id,
            TransactionStatus::Canceled,
            None,
        );

        let transactions = vec![future_tx1, future_tx2, no_delete_tx];

        let cleaned_count =
            process_transactions_for_cleanup(transactions, &transaction_repo, relayer_id, now)
                .await;

        assert_eq!(cleaned_count, 0);
    }

    #[test]
    fn test_should_delete_transaction_exactly_at_expiry_time() {
        let now = Utc::now();
        let exact_expiry_time = now.to_rfc3339();

        let transaction = create_test_transaction(
            "test-tx",
            "test-relayer",
            TransactionStatus::Confirmed,
            Some(exact_expiry_time),
        );

        // A `delete_at` equal to `now` counts as expired.
        assert!(should_delete_transaction(&transaction, now));
    }

    #[tokio::test]
    async fn test_parallel_processing_with_mixed_results() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer_id = "test-relayer";
        let now = Utc::now();

        let expired_tx1 = create_test_transaction(
            "expired-tx-1",
            relayer_id,
            TransactionStatus::Confirmed,
            Some((now - Duration::hours(1)).to_rfc3339()),
        );
        let expired_tx2 = create_test_transaction(
            "expired-tx-2",
            relayer_id,
            TransactionStatus::Failed,
            Some((now - Duration::hours(2)).to_rfc3339()),
        );
        let expired_tx3 = create_test_transaction(
            "expired-tx-3",
            relayer_id,
            TransactionStatus::Canceled,
            Some((now - Duration::hours(3)).to_rfc3339()),
        );

        // `expired_tx3` is deliberately not stored, so its deletion cannot succeed.
        transaction_repo.create(expired_tx1.clone()).await.unwrap();
        transaction_repo.create(expired_tx2.clone()).await.unwrap();
        let transactions = vec![expired_tx1, expired_tx2, expired_tx3];

        let cleaned_count =
            process_transactions_for_cleanup(transactions, &transaction_repo, relayer_id, now)
                .await;

        assert_eq!(cleaned_count, 2);
    }

    #[tokio::test]
    async fn test_report_cleanup_results_empty() {
        let results = vec![];
        let result = report_cleanup_results(results).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_fetch_final_transactions_paginated_with_mixed_statuses() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer_id = "test-relayer";

        let confirmed_tx = create_test_transaction(
            "confirmed-tx",
            relayer_id,
            TransactionStatus::Confirmed,
            None,
        );
        let failed_tx =
            create_test_transaction("failed-tx", relayer_id, TransactionStatus::Failed, None);
        let canceled_tx =
            create_test_transaction("canceled-tx", relayer_id, TransactionStatus::Canceled, None);
        let expired_tx =
            create_test_transaction("expired-tx", relayer_id, TransactionStatus::Expired, None);
        let pending_tx =
            create_test_transaction("pending-tx", relayer_id, TransactionStatus::Pending, None);
        let sent_tx = create_test_transaction("sent-tx", relayer_id, TransactionStatus::Sent, None);

        transaction_repo.create(confirmed_tx).await.unwrap();
        transaction_repo.create(failed_tx).await.unwrap();
        transaction_repo.create(canceled_tx).await.unwrap();
        transaction_repo.create(expired_tx).await.unwrap();
        transaction_repo.create(pending_tx).await.unwrap();
        transaction_repo.create(sent_tx).await.unwrap();

        let query = PaginationQuery {
            page: 1,
            per_page: 10,
        };
        let result = fetch_final_transactions_paginated(relayer_id, &transaction_repo, query)
            .await
            .unwrap();

        assert_eq!(result.total, 4);
        assert_eq!(result.items.len(), 4);
        assert!(contains_tx(&result.items, "confirmed-tx"));
        assert!(contains_tx(&result.items, "failed-tx"));
        assert!(contains_tx(&result.items, "canceled-tx"));
        assert!(contains_tx(&result.items, "expired-tx"));
        assert!(!contains_tx(&result.items, "pending-tx"));
        assert!(!contains_tx(&result.items, "sent-tx"));
    }

    #[tokio::test]
    async fn test_fetch_final_transactions_paginated_pagination() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer_id = "test-relayer";

        for i in 1..=5 {
            let mut tx = create_test_transaction(
                &format!("tx-{}", i),
                relayer_id,
                TransactionStatus::Confirmed,
                None,
            );
            tx.created_at = format!("2025-01-27T{:02}:00:00.000000+00:00", 10 + i);
            transaction_repo.create(tx).await.unwrap();
        }

        let query = PaginationQuery {
            page: 1,
            per_page: 2,
        };
        let result = fetch_final_transactions_paginated(relayer_id, &transaction_repo, query)
            .await
            .unwrap();

        assert_eq!(result.total, 5);
        assert_eq!(result.items.len(), 2);
        assert_eq!(result.page, 1);

        let query = PaginationQuery {
            page: 2,
            per_page: 2,
        };
        let result = fetch_final_transactions_paginated(relayer_id, &transaction_repo, query)
            .await
            .unwrap();

        assert_eq!(result.total, 5);
        assert_eq!(result.items.len(), 2);
        assert_eq!(result.page, 2);

        let query = PaginationQuery {
            page: 3,
            per_page: 2,
        };
        let result = fetch_final_transactions_paginated(relayer_id, &transaction_repo, query)
            .await
            .unwrap();

        // The last page holds the single remaining transaction.
        assert_eq!(result.total, 5);
        assert_eq!(result.items.len(), 1);
        assert_eq!(result.page, 3);
    }

    #[tokio::test]
    async fn test_process_status_cleanup_deletes_expired() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer_id = "test-relayer";
        let now = Utc::now();

        let expired_tx = create_test_transaction(
            "expired-tx",
            relayer_id,
            TransactionStatus::Confirmed,
            Some((now - Duration::hours(1)).to_rfc3339()),
        );
        let future_tx = create_test_transaction(
            "future-tx",
            relayer_id,
            TransactionStatus::Confirmed,
            Some((now + Duration::hours(1)).to_rfc3339()),
        );

        transaction_repo.create(expired_tx).await.unwrap();
        transaction_repo.create(future_tx).await.unwrap();

        let cleaned = process_status_cleanup(
            relayer_id,
            &TransactionStatus::Confirmed,
            &transaction_repo,
            now,
        )
        .await
        .unwrap();

        assert_eq!(cleaned, 1);

        assert!(transaction_repo
            .get_by_id("expired-tx".to_string())
            .await
            .is_err());
        assert!(transaction_repo
            .get_by_id("future-tx".to_string())
            .await
            .is_ok());
    }

    #[tokio::test]
    async fn test_process_status_cleanup_no_transactions() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer_id = "test-relayer";
        let now = Utc::now();

        let cleaned = process_status_cleanup(
            relayer_id,
            &TransactionStatus::Confirmed,
            &transaction_repo,
            now,
        )
        .await
        .unwrap();

        assert_eq!(cleaned, 0);
    }

    #[tokio::test]
    async fn test_process_status_cleanup_skips_other_statuses() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer_id = "test-relayer";
        let now = Utc::now();

        let tx = create_test_transaction(
            "failed-tx",
            relayer_id,
            TransactionStatus::Failed,
            Some((now - Duration::hours(1)).to_rfc3339()),
        );
        transaction_repo.create(tx).await.unwrap();

        // Cleaning up `Confirmed` must leave the expired `Failed` transaction alone.
        let cleaned = process_status_cleanup(
            relayer_id,
            &TransactionStatus::Confirmed,
            &transaction_repo,
            now,
        )
        .await
        .unwrap();

        assert_eq!(cleaned, 0);
        assert!(transaction_repo
            .get_by_id("failed-tx".to_string())
            .await
            .is_ok());
    }

    #[tokio::test]
    async fn test_process_single_relayer_processes_all_final_statuses() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer = create_test_relayer("test-relayer", "Test Relayer");
        let now = Utc::now();
        let expired_at = (now - Duration::hours(1)).to_rfc3339();

        for (i, status) in [
            TransactionStatus::Confirmed,
            TransactionStatus::Failed,
            TransactionStatus::Canceled,
            TransactionStatus::Expired,
        ]
        .iter()
        .enumerate()
        {
            let tx = create_test_transaction(
                &format!("tx-{}", i),
                &relayer.id,
                status.clone(),
                Some(expired_at.clone()),
            );
            transaction_repo.create(tx).await.unwrap();
        }

        let result = process_single_relayer(relayer.clone(), transaction_repo.clone(), now).await;

        assert_eq!(result.relayer_id, relayer.id);
        assert_eq!(result.cleaned_count, 4);
        assert!(result.error.is_none());

        assert_eq!(transaction_repo.count().await.unwrap(), 0);
    }
}