openzeppelin_relayer/jobs/handlers/transaction_cleanup_handler.rs

1//! Transaction cleanup worker implementation.
2//!
3//! This module implements the transaction cleanup worker that processes
4//! expired transactions marked for deletion. It runs as a cron job to
5//! automatically clean up transactions that have passed their delete_at timestamp.
6//!
7//! ## Distributed Lock
8//!
9//! Since this runs on multiple service instances simultaneously (each with its own
10//! CronStream), a distributed lock is used to ensure only one instance processes
11//! the cleanup at a time. The lock has a 9-minute TTL (the cron runs every 10 minutes),
12//! ensuring the lock expires before the next scheduled run.
13
14use actix_web::web::ThinData;
15use apalis::prelude::{Attempt, Data, *};
16use chrono::{DateTime, Utc};
17use eyre::Result;
18use std::sync::Arc;
19use std::time::Duration;
20use tracing::{debug, error, info, instrument, warn};
21
22use crate::{
23    constants::{FINAL_TRANSACTION_STATUSES, WORKER_TRANSACTION_CLEANUP_RETRIES},
24    jobs::handle_result,
25    models::{
26        DefaultAppState, NetworkTransactionData, PaginationQuery, RelayerRepoModel,
27        TransactionRepoModel, TransactionStatus,
28    },
29    repositories::{Repository, TransactionDeleteRequest, TransactionRepository},
30    utils::DistributedLock,
31};
32
/// Maximum number of relayers to process concurrently
const MAX_CONCURRENT_RELAYERS: usize = 10;

/// Number of transactions to fetch per page during cleanup
const CLEANUP_PAGE_SIZE: u32 = 100;

/// Maximum number of transactions to delete in a single batch operation.
/// This prevents overwhelming Redis with very large pipelines.
/// Kept equal to CLEANUP_PAGE_SIZE so one fetched page deletes in a single batch.
const DELETE_BATCH_SIZE: usize = 100;

/// Maximum page iterations per status before stopping.
/// Prevents unbounded cleanup from exceeding the lock TTL.
/// With CLEANUP_PAGE_SIZE=100, allows up to 150,000 transactions per status per run.
const MAX_CLEANUP_ITERATIONS_PER_STATUS: u32 = 1500;

/// Distributed lock name for transaction cleanup.
/// Only one instance across the cluster should run cleanup at a time.
const CLEANUP_LOCK_NAME: &str = "transaction_cleanup";

/// TTL for the distributed lock (9 minutes).
///
/// This value should be:
/// 1. Greater than the worst-case cleanup runtime to prevent concurrent execution
/// 2. Less than the cron interval (10 minutes) to ensure availability for the next run
///
/// If cleanup consistently takes longer than this TTL, another instance may acquire
/// the lock and run concurrently. In that case, consider:
/// - Increasing the TTL (and cron interval accordingly)
/// - Optimizing the cleanup process
/// - Implementing lock refresh during long-running operations
///
/// The lock is automatically released when processing completes (via Drop),
/// so the TTL primarily serves as a safety net for crashed instances.
const CLEANUP_LOCK_TTL_SECS: u64 = 9 * 60;
67
68/// Handles periodic transaction cleanup jobs from the queue.
69///
70/// This function processes expired transactions by:
71/// 1. Fetching all relayers from the system
72/// 2. For each relayer, finding transactions with final statuses
73/// 3. Checking if their delete_at timestamp has passed
74/// 4. Validating transactions are in final states before deletion
75/// 5. Deleting transactions that have expired (in parallel)
76///
77/// # Arguments
78/// * `job` - The cron reminder job triggering the cleanup
79/// * `data` - Application state containing repositories
80/// * `attempt` - Current attempt number for retry logic
81///
82/// # Returns
83/// * `Result<(), Error>` - Success or failure of cleanup processing
84#[instrument(
85    level = "debug",
86    skip(job, data),
87    fields(
88        job_type = "transaction_cleanup",
89        attempt = %attempt.current(),
90    ),
91    err
92)]
93pub async fn transaction_cleanup_handler(
94    job: TransactionCleanupCronReminder,
95    data: Data<ThinData<DefaultAppState>>,
96    attempt: Attempt,
97) -> Result<(), Error> {
98    let result = handle_request(job, data, attempt.clone()).await;
99
100    handle_result(
101        result,
102        attempt,
103        "TransactionCleanup",
104        WORKER_TRANSACTION_CLEANUP_RETRIES,
105    )
106}
107
/// Represents a cron reminder job for triggering cleanup operations.
///
/// Carries no payload — the cron tick itself is the only signal needed.
/// Derives `Default`, `Debug`, and `Clone` so the job system can construct,
/// log, and duplicate instances as required.
#[derive(Default, Debug, Clone)]
pub struct TransactionCleanupCronReminder();
111
/// Handles the actual transaction cleanup request logic.
///
/// This function first attempts to acquire a distributed lock to ensure only
/// one instance processes cleanup at a time. If the lock is already held by
/// another instance, this returns early without doing any work.
///
/// # Arguments
/// * `_job` - The cron reminder job (currently unused)
/// * `data` - Application state containing repositories
/// * `_attempt` - Current attempt number (currently unused)
///
/// # Returns
/// * `Result<()>` - Success or failure of the cleanup operation
async fn handle_request(
    _job: TransactionCleanupCronReminder,
    data: Data<ThinData<DefaultAppState>>,
    _attempt: Attempt,
) -> Result<()> {
    let transaction_repo = data.transaction_repository();

    // Attempt to acquire distributed lock to prevent multiple instances from
    // running cleanup simultaneously. This is necessary because CronStream
    // is local to each instance, not distributed via Redis queues.
    // The lock key includes the relayer prefix to support multi-tenant deployments.
    // Key format: {prefix}:lock:{name} (e.g., "oz-relayer:lock:transaction_cleanup")
    let lock_guard = if let Some((conn, prefix)) = transaction_repo.connection_info() {
        let lock_key = format!("{prefix}:lock:{CLEANUP_LOCK_NAME}");
        let lock =
            DistributedLock::new(conn, &lock_key, Duration::from_secs(CLEANUP_LOCK_TTL_SECS));

        match lock.try_acquire().await {
            Ok(Some(guard)) => {
                debug!(lock_key = %lock_key, "acquired distributed lock for transaction cleanup");
                Some(guard)
            }
            Ok(None) => {
                // Another instance already holds the lock: skip this run entirely.
                info!(lock_key = %lock_key, "transaction cleanup skipped - another instance is processing");
                return Ok(());
            }
            Err(e) => {
                // If we can't communicate with Redis for locking, log warning but proceed
                // This maintains backwards compatibility and handles Redis connection issues
                warn!(
                    error = %e,
                    lock_key = %lock_key,
                    "failed to acquire distributed lock, proceeding with cleanup anyway"
                );
                None
            }
        }
    } else {
        // No connection info means an in-memory repository, where cross-instance
        // coordination is unnecessary.
        debug!("in-memory repository detected, skipping distributed lock");
        None
    };

    let now = Utc::now();
    info!(
        timestamp = %now.to_rfc3339(),
        "executing transaction cleanup from storage"
    );

    let relayer_repo = data.relayer_repository();

    // Fetch all relayers
    let relayers = relayer_repo.list_all().await.map_err(|e| {
        error!(
            error = %e,
            "failed to fetch relayers for cleanup"
        );
        eyre::eyre!("Failed to fetch relayers: {}", e)
    })?;

    info!(
        relayer_count = relayers.len(),
        "found relayers to process for cleanup"
    );

    // Process relayers in parallel batches
    let cleanup_results = process_relayers_in_batches(relayers, transaction_repo, now).await;

    // Aggregate and report results
    let result = report_cleanup_results(cleanup_results).await;

    // Lock guard is automatically released when dropped (via Drop impl).
    // This happens regardless of whether we exit normally or via early return/error.
    // The explicit drop also documents that the guard must stay alive until
    // all processing above has finished.
    drop(lock_guard);

    result
}
201
202/// Processes multiple relayers in parallel batches for cleanup.
203///
204/// # Arguments
205/// * `relayers` - List of relayers to process
206/// * `transaction_repo` - Reference to the transaction repository
207/// * `now` - Current UTC timestamp for comparison
208///
209/// # Returns
210/// * `Vec<RelayerCleanupResult>` - Results from processing each relayer
211async fn process_relayers_in_batches(
212    relayers: Vec<RelayerRepoModel>,
213    transaction_repo: Arc<impl TransactionRepository>,
214    now: DateTime<Utc>,
215) -> Vec<RelayerCleanupResult> {
216    use futures::stream::{self, StreamExt};
217
218    // Process relayers with limited concurrency to avoid overwhelming the system
219    let results: Vec<RelayerCleanupResult> = stream::iter(relayers)
220        .map(|relayer| {
221            let repo_clone = Arc::clone(&transaction_repo);
222            async move { process_single_relayer(relayer, repo_clone, now).await }
223        })
224        .buffer_unordered(MAX_CONCURRENT_RELAYERS)
225        .collect()
226        .await;
227
228    results
229}
230
/// Result of processing a single relayer's transactions.
#[derive(Debug)]
struct RelayerCleanupResult {
    // ID of the relayer that was processed (used for logging/aggregation).
    relayer_id: String,
    // Number of transactions successfully deleted for this relayer.
    cleaned_count: usize,
    // First error encountered (processing stops at the first failing status);
    // `None` when every status was processed without error.
    error: Option<String>,
}
238
239/// Processes cleanup for a single relayer by iterating over each final status independently.
240///
241/// Each status is processed separately to use efficient single-key Redis ZRANGE pagination
242/// instead of the multi-status merge path which fetches all IDs into memory.
243///
244/// # Arguments
245/// * `relayer` - The relayer to process
246/// * `transaction_repo` - Reference to the transaction repository
247/// * `now` - Current UTC timestamp for comparison
248///
249/// # Returns
250/// * `RelayerCleanupResult` - Result of processing this relayer
251async fn process_single_relayer(
252    relayer: RelayerRepoModel,
253    transaction_repo: Arc<impl TransactionRepository>,
254    now: DateTime<Utc>,
255) -> RelayerCleanupResult {
256    debug!(
257        relayer_id = %relayer.id,
258        "processing cleanup for relayer"
259    );
260
261    let mut total_cleaned = 0usize;
262
263    for status in FINAL_TRANSACTION_STATUSES {
264        match process_status_cleanup(&relayer.id, status, &transaction_repo, now).await {
265            Ok(cleaned) => total_cleaned += cleaned,
266            Err(e) => {
267                error!(
268                    error = %e,
269                    relayer_id = %relayer.id,
270                    status = ?status,
271                    "failed to cleanup transactions for status"
272                );
273                return RelayerCleanupResult {
274                    relayer_id: relayer.id,
275                    cleaned_count: total_cleaned,
276                    error: Some(e.to_string()),
277                };
278            }
279        }
280    }
281
282    if total_cleaned > 0 {
283        info!(
284            cleaned_count = total_cleaned,
285            relayer_id = %relayer.id,
286            "cleaned up expired transactions"
287        );
288    }
289
290    RelayerCleanupResult {
291        relayer_id: relayer.id,
292        cleaned_count: total_cleaned,
293        error: None,
294    }
295}
296
/// Processes cleanup for a single status of a single relayer.
///
/// Uses stable pagination: when items are deleted, the same page is re-queried
/// because deletions shift subsequent items into the current page's range.
/// Only advances the page when no deletions occurred (items remain in place).
///
/// # Arguments
/// * `relayer_id` - ID of the relayer
/// * `status` - The transaction status to process
/// * `transaction_repo` - Reference to the transaction repository
/// * `now` - Current UTC timestamp for comparison
///
/// # Returns
/// * `Result<usize>` - Number of transactions cleaned up for this status
async fn process_status_cleanup(
    relayer_id: &str,
    status: &TransactionStatus,
    transaction_repo: &Arc<impl TransactionRepository>,
    now: DateTime<Utc>,
) -> Result<usize> {
    let mut current_page = 1u32;
    let mut total_cleaned = 0usize;
    let mut iterations = 0u32;

    loop {
        // Hard cap on iterations so an unusually large backlog cannot run
        // past the distributed lock TTL (see MAX_CLEANUP_ITERATIONS_PER_STATUS).
        if iterations >= MAX_CLEANUP_ITERATIONS_PER_STATUS {
            warn!(
                relayer_id = %relayer_id,
                status = ?status,
                iterations,
                total_cleaned,
                "reached max cleanup iterations, stopping"
            );
            break;
        }
        iterations += 1;

        let query = PaginationQuery {
            page: current_page,
            per_page: CLEANUP_PAGE_SIZE,
        };

        // Single-status query keeps the repository on the efficient
        // single-key pagination path.
        let page_result = transaction_repo
            .find_by_status_paginated(relayer_id, &[status.clone()], query, true)
            .await
            .map_err(|e| {
                eyre::eyre!(
                    "Failed to fetch {:?} transactions for relayer {}: {}",
                    status,
                    relayer_id,
                    e
                )
            })?;

        // An empty page means we've walked past the last item for this status.
        if page_result.items.is_empty() {
            break;
        }

        debug!(
            page = current_page,
            page_count = page_result.items.len(),
            total = page_result.total,
            relayer_id = %relayer_id,
            status = ?status,
            "processing page of transactions for cleanup"
        );

        let cleaned_count =
            process_transactions_for_cleanup(page_result.items, transaction_repo, relayer_id, now)
                .await;

        total_cleaned += cleaned_count;

        if cleaned_count == 0 {
            // No items were deleted on this page, so items remain in place.
            // Advance to the next page to check further items.
            current_page += 1;
        }
        // When items were deleted, stay on the same page: deletions shift
        // subsequent items into the current page range, so re-querying
        // the same page picks up previously-unreachable items.
    }

    if total_cleaned > 0 {
        debug!(
            total_cleaned,
            relayer_id = %relayer_id,
            status = ?status,
            "status cleanup completed"
        );
    }

    Ok(total_cleaned)
}
391
/// Fetches a page of transactions with final statuses for a specific relayer.
/// Used in tests to verify pagination behavior across all final statuses.
#[cfg(test)]
async fn fetch_final_transactions_paginated(
    relayer_id: &str,
    transaction_repo: &Arc<impl TransactionRepository>,
    query: PaginationQuery,
) -> Result<crate::repositories::PaginatedResult<TransactionRepoModel>> {
    // Query all final statuses at once (multi-status merge path); wrap any
    // repository error with relayer context for the caller.
    let page = transaction_repo
        .find_by_status_paginated(relayer_id, FINAL_TRANSACTION_STATUSES, query, true)
        .await;

    page.map_err(|e| {
        eyre::eyre!(
            "Failed to fetch final transactions for relayer {}: {}",
            relayer_id,
            e
        )
    })
}
411
412/// Processes a list of transactions for cleanup using batch delete, deleting expired ones.
413///
414/// This function validates that transactions are in final states before deletion,
415/// ensuring data integrity by preventing accidental deletion of active transactions.
416/// Uses batch deletion for improved performance with large numbers of transactions.
417///
418/// # Arguments
419/// * `transactions` - List of transactions to process
420/// * `transaction_repo` - Reference to the transaction repository
421/// * `relayer_id` - ID of the relayer (for logging)
422/// * `now` - Current UTC timestamp for comparison
423///
424/// # Returns
425/// * `usize` - Number of transactions successfully cleaned up
426async fn process_transactions_for_cleanup(
427    transactions: Vec<TransactionRepoModel>,
428    transaction_repo: &Arc<impl TransactionRepository>,
429    relayer_id: &str,
430    now: DateTime<Utc>,
431) -> usize {
432    if transactions.is_empty() {
433        return 0;
434    }
435
436    debug!(
437        transaction_count = transactions.len(),
438        relayer_id = %relayer_id,
439        "processing transactions for cleanup"
440    );
441
442    // Filter expired transactions and validate they are in final states,
443    // then convert to delete requests with pre-extracted data
444    let delete_requests: Vec<TransactionDeleteRequest> = transactions
445        .into_iter()
446        .filter(|tx| {
447            // Must be in a final state
448            if !FINAL_TRANSACTION_STATUSES.contains(&tx.status) {
449                warn!(
450                    tx_id = %tx.id,
451                    status = ?tx.status,
452                    "skipping transaction not in final state"
453                );
454                return false;
455            }
456            // Must be expired
457            should_delete_transaction(tx, now)
458        })
459        .map(|tx| {
460            // Extract nonce from network data for index cleanup
461            let nonce = extract_nonce_from_network_data(&tx.network_data);
462            TransactionDeleteRequest::new(tx.id, tx.relayer_id, nonce)
463        })
464        .collect();
465
466    if delete_requests.is_empty() {
467        debug!(
468            relayer_id = %relayer_id,
469            "no expired transactions found"
470        );
471        return 0;
472    }
473
474    let total_expired = delete_requests.len();
475    debug!(
476        expired_count = total_expired,
477        relayer_id = %relayer_id,
478        "found expired transactions to delete"
479    );
480
481    // Process deletions in batches to avoid overwhelming Redis with large pipelines
482    let mut total_deleted = 0;
483    let mut total_failed = 0;
484
485    for (batch_idx, batch) in delete_requests.chunks(DELETE_BATCH_SIZE).enumerate() {
486        let batch_requests: Vec<TransactionDeleteRequest> = batch.to_vec();
487        let batch_size = batch_requests.len();
488
489        debug!(
490            batch = batch_idx + 1,
491            batch_size = batch_size,
492            relayer_id = %relayer_id,
493            "processing delete batch"
494        );
495
496        match transaction_repo.delete_by_requests(batch_requests).await {
497            Ok(result) => {
498                if !result.failed.is_empty() {
499                    for (id, error) in &result.failed {
500                        error!(
501                            tx_id = %id,
502                            error = %error,
503                            relayer_id = %relayer_id,
504                            "failed to delete expired transaction in batch"
505                        );
506                    }
507                }
508
509                total_deleted += result.deleted_count;
510                total_failed += result.failed.len();
511            }
512            Err(e) => {
513                error!(
514                    error = %e,
515                    relayer_id = %relayer_id,
516                    batch = batch_idx + 1,
517                    batch_size = batch_size,
518                    "batch delete failed completely"
519                );
520                total_failed += batch_size;
521            }
522        }
523    }
524
525    debug!(
526        total_deleted,
527        total_failed,
528        total_expired,
529        relayer_id = %relayer_id,
530        "batch delete completed"
531    );
532
533    total_deleted
534}
535
536/// Extracts the nonce from network transaction data if available.
537/// This is used for cleaning up nonce indexes during deletion.
538fn extract_nonce_from_network_data(network_data: &NetworkTransactionData) -> Option<u64> {
539    match network_data {
540        NetworkTransactionData::Evm(evm_data) => evm_data.nonce,
541        _ => None,
542    }
543}
544
545/// Determines if a transaction should be deleted based on its delete_at timestamp.
546///
547/// # Arguments
548/// * `transaction` - The transaction to check
549/// * `now` - Current UTC timestamp for comparison
550///
551/// # Returns
552/// * `bool` - True if the transaction should be deleted, false otherwise
553fn should_delete_transaction(transaction: &TransactionRepoModel, now: DateTime<Utc>) -> bool {
554    transaction
555        .delete_at
556        .as_ref()
557        .and_then(|delete_at_str| DateTime::parse_from_rfc3339(delete_at_str).ok())
558        .map(|delete_at| {
559            let is_expired = now >= delete_at.with_timezone(&Utc);
560            if is_expired {
561                debug!(
562                    tx_id = %transaction.id,
563                    expired_at = %delete_at.to_rfc3339(),
564                    "transaction is expired"
565                );
566            }
567            is_expired
568        })
569        .unwrap_or_else(|| {
570            if transaction.delete_at.is_some() {
571                warn!(
572                    tx_id = %transaction.id,
573                    "transaction has invalid delete_at timestamp"
574                );
575            }
576            false
577        })
578}
579
580/// Reports the aggregated results of the cleanup operation.
581///
582/// # Arguments
583/// * `cleanup_results` - Results from processing all relayers
584///
585/// # Returns
586/// * `Result<()>` - Success if all went well, error if there were failures
587async fn report_cleanup_results(cleanup_results: Vec<RelayerCleanupResult>) -> Result<()> {
588    let total_cleaned: usize = cleanup_results.iter().map(|r| r.cleaned_count).sum();
589    let total_errors = cleanup_results.iter().filter(|r| r.error.is_some()).count();
590    let total_relayers = cleanup_results.len();
591
592    // Log detailed results for relayers with errors
593    for result in &cleanup_results {
594        if let Some(error) = &result.error {
595            error!(
596                relayer_id = %result.relayer_id,
597                error = %error,
598                "failed to cleanup transactions for relayer"
599            );
600        }
601    }
602
603    if total_errors > 0 {
604        warn!(
605            total_errors,
606            total_relayers, total_cleaned, "transaction cleanup completed with errors"
607        );
608
609        // Return error if there were failures, but don't fail the entire job
610        // This allows for partial success and retry of failed relayers
611        Err(eyre::eyre!(
612            "Cleanup completed with {} errors out of {} relayers",
613            total_errors,
614            total_relayers
615        ))
616    } else {
617        info!(
618            total_cleaned,
619            total_relayers, "transaction cleanup completed successfully"
620        );
621        Ok(())
622    }
623}
624
625#[cfg(test)]
626mod tests {
627
628    use super::*;
629    use crate::{
630        models::{
631            NetworkType, RelayerEvmPolicy, RelayerNetworkPolicy, RelayerRepoModel,
632            TransactionRepoModel, TransactionStatus,
633        },
634        repositories::{InMemoryTransactionRepository, Repository},
635        utils::mocks::mockutils::create_mock_transaction,
636    };
637    use chrono::{Duration, Utc};
638
    /// Builds a `TransactionRepoModel` for tests from the shared mock template,
    /// overriding id, relayer id, status, and the optional `delete_at` timestamp.
    fn create_test_transaction(
        id: &str,
        relayer_id: &str,
        status: TransactionStatus,
        delete_at: Option<String>,
    ) -> TransactionRepoModel {
        let mut tx = create_mock_transaction();
        tx.id = id.to_string();
        tx.relayer_id = relayer_id.to_string();
        tx.status = status;
        tx.delete_at = delete_at;
        tx
    }
652
653    #[tokio::test]
654    async fn test_should_delete_transaction_expired() {
655        let now = Utc::now();
656        let expired_delete_at = (now - Duration::hours(1)).to_rfc3339();
657
658        let transaction = create_test_transaction(
659            "test-tx",
660            "test-relayer",
661            TransactionStatus::Confirmed,
662            Some(expired_delete_at),
663        );
664
665        assert!(should_delete_transaction(&transaction, now));
666    }
667
668    #[tokio::test]
669    async fn test_should_delete_transaction_not_expired() {
670        let now = Utc::now();
671        let future_delete_at = (now + Duration::hours(1)).to_rfc3339();
672
673        let transaction = create_test_transaction(
674            "test-tx",
675            "test-relayer",
676            TransactionStatus::Confirmed,
677            Some(future_delete_at),
678        );
679
680        assert!(!should_delete_transaction(&transaction, now));
681    }
682
683    #[tokio::test]
684    async fn test_should_delete_transaction_no_delete_at() {
685        let now = Utc::now();
686
687        let transaction = create_test_transaction(
688            "test-tx",
689            "test-relayer",
690            TransactionStatus::Confirmed,
691            None,
692        );
693
694        assert!(!should_delete_transaction(&transaction, now));
695    }
696
697    #[tokio::test]
698    async fn test_should_delete_transaction_invalid_timestamp() {
699        let now = Utc::now();
700
701        let transaction = create_test_transaction(
702            "test-tx",
703            "test-relayer",
704            TransactionStatus::Confirmed,
705            Some("invalid-timestamp".to_string()),
706        );
707
708        assert!(!should_delete_transaction(&transaction, now));
709    }
710
    /// End-to-end check of `process_transactions_for_cleanup`: only the
    /// transaction with an expired `delete_at` is deleted; a future-dated one
    /// and one without `delete_at` survive in the repository.
    #[tokio::test]
    async fn test_process_transactions_for_cleanup_parallel() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer_id = "test-relayer";
        let now = Utc::now();

        // Create test transactions
        let expired_delete_at = (now - Duration::hours(1)).to_rfc3339();
        let future_delete_at = (now + Duration::hours(1)).to_rfc3339();

        let expired_tx = create_test_transaction(
            "expired-tx",
            relayer_id,
            TransactionStatus::Confirmed,
            Some(expired_delete_at),
        );
        let future_tx = create_test_transaction(
            "future-tx",
            relayer_id,
            TransactionStatus::Failed,
            Some(future_delete_at),
        );
        let no_delete_tx = create_test_transaction(
            "no-delete-tx",
            relayer_id,
            TransactionStatus::Canceled,
            None,
        );

        // Store transactions
        transaction_repo.create(expired_tx.clone()).await.unwrap();
        transaction_repo.create(future_tx.clone()).await.unwrap();
        transaction_repo.create(no_delete_tx.clone()).await.unwrap();

        let transactions = vec![expired_tx, future_tx, no_delete_tx];

        // Process transactions
        let cleaned_count =
            process_transactions_for_cleanup(transactions, &transaction_repo, relayer_id, now)
                .await;

        // Should have cleaned up 1 expired transaction
        assert_eq!(cleaned_count, 1);

        // Verify expired transaction was deleted (lookup now errors)
        assert!(transaction_repo
            .get_by_id("expired-tx".to_string())
            .await
            .is_err());

        // Verify non-expired transactions still exist
        assert!(transaction_repo
            .get_by_id("future-tx".to_string())
            .await
            .is_ok());
        assert!(transaction_repo
            .get_by_id("no-delete-tx".to_string())
            .await
            .is_ok());
    }
771
    /// `delete_by_ids` removes every requested transaction and reports no
    /// failures when all ids exist in the repository.
    #[tokio::test]
    async fn test_batch_delete_expired_transactions() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer_id = "test-relayer";
        let now = Utc::now();

        // Create multiple expired transactions
        let expired_delete_at = (now - Duration::hours(1)).to_rfc3339();

        for i in 0..5 {
            let tx = create_test_transaction(
                &format!("expired-tx-{}", i),
                relayer_id,
                TransactionStatus::Confirmed,
                Some(expired_delete_at.clone()),
            );
            transaction_repo.create(tx).await.unwrap();
        }

        // Verify they exist
        assert_eq!(transaction_repo.count().await.unwrap(), 5);

        // Delete them using batch delete
        let ids: Vec<String> = (0..5).map(|i| format!("expired-tx-{}", i)).collect();
        let result = transaction_repo.delete_by_ids(ids).await.unwrap();

        // Every id deleted, nothing in the failed list
        assert_eq!(result.deleted_count, 5);
        assert!(result.failed.is_empty());

        // Verify they were deleted
        assert_eq!(transaction_repo.count().await.unwrap(), 0);
    }
804
    /// `delete_by_ids` is a partial-success operation: existing ids are
    /// deleted while missing ids are reported in `failed` rather than
    /// aborting the whole batch.
    #[tokio::test]
    async fn test_batch_delete_with_nonexistent_ids() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer_id = "test-relayer";

        // Create one transaction
        let tx = create_test_transaction(
            "existing-tx",
            relayer_id,
            TransactionStatus::Confirmed,
            Some(Utc::now().to_rfc3339()),
        );
        transaction_repo.create(tx).await.unwrap();

        // Try to delete existing and non-existing transactions
        let ids = vec![
            "existing-tx".to_string(),
            "nonexistent-1".to_string(),
            "nonexistent-2".to_string(),
        ];
        let result = transaction_repo.delete_by_ids(ids).await.unwrap();

        // Should delete the existing one and report failures for the others
        assert_eq!(result.deleted_count, 1);
        assert_eq!(result.failed.len(), 2);

        // Verify the existing one was deleted
        assert!(transaction_repo
            .get_by_id("existing-tx".to_string())
            .await
            .is_err());
    }
837
    /// The final-state guard in `process_transactions_for_cleanup` must keep
    /// a transaction with an expired `delete_at` if its status is not final
    /// (e.g. still Pending).
    #[tokio::test]
    async fn test_process_transactions_skips_non_final_status() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer_id = "test-relayer";
        let now = Utc::now();

        // Create a transaction with non-final status but expired delete_at
        let expired_delete_at = (now - Duration::hours(1)).to_rfc3339();
        let pending_tx = create_test_transaction(
            "pending-tx",
            relayer_id,
            TransactionStatus::Pending, // Non-final status
            Some(expired_delete_at),
        );
        transaction_repo.create(pending_tx.clone()).await.unwrap();

        let transactions = vec![pending_tx];

        // Process should skip non-final status transactions
        let cleaned_count =
            process_transactions_for_cleanup(transactions, &transaction_repo, relayer_id, now)
                .await;

        // Should not have cleaned any transactions
        assert_eq!(cleaned_count, 0);

        // Transaction should still exist
        assert!(transaction_repo
            .get_by_id("pending-tx".to_string())
            .await
            .is_ok());
    }
870
    /// `fetch_final_transactions_paginated` returns only transactions whose
    /// status is in `FINAL_TRANSACTION_STATUSES` (Confirmed and Failed here),
    /// excluding Pending.
    #[tokio::test]
    async fn test_fetch_final_transactions_paginated() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer_id = "test-relayer";

        // Create transactions with different statuses
        let confirmed_tx = create_test_transaction(
            "confirmed-tx",
            relayer_id,
            TransactionStatus::Confirmed,
            None,
        );
        let pending_tx =
            create_test_transaction("pending-tx", relayer_id, TransactionStatus::Pending, None);
        let failed_tx =
            create_test_transaction("failed-tx", relayer_id, TransactionStatus::Failed, None);

        // Store transactions
        transaction_repo.create(confirmed_tx).await.unwrap();
        transaction_repo.create(pending_tx).await.unwrap();
        transaction_repo.create(failed_tx).await.unwrap();

        // Fetch final transactions with pagination
        let query = PaginationQuery {
            page: 1,
            per_page: 10,
        };
        let result = fetch_final_transactions_paginated(relayer_id, &transaction_repo, query)
            .await
            .unwrap();

        // Should only return transactions with final statuses (Confirmed, Failed)
        assert_eq!(result.total, 2);
        assert_eq!(result.items.len(), 2);
        let final_ids: Vec<&String> = result.items.iter().map(|tx| &tx.id).collect();
        assert!(final_ids.contains(&&"confirmed-tx".to_string()));
        assert!(final_ids.contains(&&"failed-tx".to_string()));
        assert!(!final_ids.contains(&&"pending-tx".to_string()));
    }
910
911    #[tokio::test]
912    async fn test_report_cleanup_results_success() {
913        let results = vec![
914            RelayerCleanupResult {
915                relayer_id: "relayer-1".to_string(),
916                cleaned_count: 2,
917                error: None,
918            },
919            RelayerCleanupResult {
920                relayer_id: "relayer-2".to_string(),
921                cleaned_count: 1,
922                error: None,
923            },
924        ];
925
926        let result = report_cleanup_results(results).await;
927        assert!(result.is_ok());
928    }
929
930    #[tokio::test]
931    async fn test_report_cleanup_results_with_errors() {
932        let results = vec![
933            RelayerCleanupResult {
934                relayer_id: "relayer-1".to_string(),
935                cleaned_count: 2,
936                error: None,
937            },
938            RelayerCleanupResult {
939                relayer_id: "relayer-2".to_string(),
940                cleaned_count: 0,
941                error: Some("Database error".to_string()),
942            },
943        ];
944
945        let result = report_cleanup_results(results).await;
946        assert!(result.is_err());
947    }
948
949    #[tokio::test]
950    async fn test_process_single_relayer_success() {
951        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
952        let relayer = RelayerRepoModel {
953            id: "test-relayer".to_string(),
954            name: "Test Relayer".to_string(),
955            network: "ethereum".to_string(),
956            paused: false,
957            network_type: NetworkType::Evm,
958            signer_id: "test-signer".to_string(),
959            policies: RelayerNetworkPolicy::Evm(RelayerEvmPolicy::default()),
960            address: "0x1234567890123456789012345678901234567890".to_string(),
961            notification_id: None,
962            system_disabled: false,
963            custom_rpc_urls: None,
964            ..Default::default()
965        };
966        let now = Utc::now();
967
968        // Create expired and non-expired transactions
969        let expired_tx = create_test_transaction(
970            "expired-tx",
971            &relayer.id,
972            TransactionStatus::Confirmed,
973            Some((now - Duration::hours(1)).to_rfc3339()),
974        );
975        let future_tx = create_test_transaction(
976            "future-tx",
977            &relayer.id,
978            TransactionStatus::Failed,
979            Some((now + Duration::hours(1)).to_rfc3339()),
980        );
981
982        transaction_repo.create(expired_tx).await.unwrap();
983        transaction_repo.create(future_tx).await.unwrap();
984
985        let result = process_single_relayer(relayer.clone(), transaction_repo.clone(), now).await;
986
987        assert_eq!(result.relayer_id, relayer.id);
988        assert_eq!(result.cleaned_count, 1);
989        assert!(result.error.is_none());
990    }
991
992    #[tokio::test]
993    async fn test_process_single_relayer_no_transactions() {
994        // Create a relayer with no transactions in the repo
995        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
996        let relayer = RelayerRepoModel {
997            id: "empty-relayer".to_string(),
998            name: "Empty Relayer".to_string(),
999            network: "ethereum".to_string(),
1000            paused: false,
1001            network_type: NetworkType::Evm,
1002            signer_id: "test-signer".to_string(),
1003            policies: RelayerNetworkPolicy::Evm(RelayerEvmPolicy::default()),
1004            address: "0x1234567890123456789012345678901234567890".to_string(),
1005            notification_id: None,
1006            system_disabled: false,
1007            custom_rpc_urls: None,
1008            ..Default::default()
1009        };
1010        let now = Utc::now();
1011
1012        // This should succeed but find no transactions
1013        let result = process_single_relayer(relayer.clone(), transaction_repo, now).await;
1014
1015        assert_eq!(result.relayer_id, relayer.id);
1016        assert_eq!(result.cleaned_count, 0);
1017        assert!(result.error.is_none()); // No error, just no transactions found
1018    }
1019
1020    #[tokio::test]
1021    async fn test_process_transactions_with_empty_list() {
1022        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
1023        let relayer_id = "test-relayer";
1024        let now = Utc::now();
1025        let transactions = vec![];
1026
1027        let cleaned_count =
1028            process_transactions_for_cleanup(transactions, &transaction_repo, relayer_id, now)
1029                .await;
1030
1031        assert_eq!(cleaned_count, 0);
1032    }
1033
1034    #[tokio::test]
1035    async fn test_process_transactions_with_no_expired() {
1036        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
1037        let relayer_id = "test-relayer";
1038        let now = Utc::now();
1039
1040        // Create only non-expired transactions
1041        let future_tx1 = create_test_transaction(
1042            "future-tx-1",
1043            relayer_id,
1044            TransactionStatus::Confirmed,
1045            Some((now + Duration::hours(1)).to_rfc3339()),
1046        );
1047        let future_tx2 = create_test_transaction(
1048            "future-tx-2",
1049            relayer_id,
1050            TransactionStatus::Failed,
1051            Some((now + Duration::hours(2)).to_rfc3339()),
1052        );
1053        let no_delete_tx = create_test_transaction(
1054            "no-delete-tx",
1055            relayer_id,
1056            TransactionStatus::Canceled,
1057            None,
1058        );
1059
1060        let transactions = vec![future_tx1, future_tx2, no_delete_tx];
1061
1062        let cleaned_count =
1063            process_transactions_for_cleanup(transactions, &transaction_repo, relayer_id, now)
1064                .await;
1065
1066        assert_eq!(cleaned_count, 0);
1067    }
1068
1069    #[tokio::test]
1070    async fn test_should_delete_transaction_exactly_at_expiry_time() {
1071        let now = Utc::now();
1072        let exact_expiry_time = now.to_rfc3339();
1073
1074        let transaction = create_test_transaction(
1075            "test-tx",
1076            "test-relayer",
1077            TransactionStatus::Confirmed,
1078            Some(exact_expiry_time),
1079        );
1080
1081        // Should be considered expired when exactly at expiry time
1082        assert!(should_delete_transaction(&transaction, now));
1083    }
1084
1085    #[tokio::test]
1086    async fn test_parallel_processing_with_mixed_results() {
1087        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
1088        let relayer_id = "test-relayer";
1089        let now = Utc::now();
1090
1091        // Create multiple expired transactions
1092        let expired_tx1 = create_test_transaction(
1093            "expired-tx-1",
1094            relayer_id,
1095            TransactionStatus::Confirmed,
1096            Some((now - Duration::hours(1)).to_rfc3339()),
1097        );
1098        let expired_tx2 = create_test_transaction(
1099            "expired-tx-2",
1100            relayer_id,
1101            TransactionStatus::Failed,
1102            Some((now - Duration::hours(2)).to_rfc3339()),
1103        );
1104        let expired_tx3 = create_test_transaction(
1105            "expired-tx-3",
1106            relayer_id,
1107            TransactionStatus::Canceled,
1108            Some((now - Duration::hours(3)).to_rfc3339()),
1109        );
1110
1111        // Store only some transactions (others will fail deletion due to NotFound)
1112        transaction_repo.create(expired_tx1.clone()).await.unwrap();
1113        transaction_repo.create(expired_tx2.clone()).await.unwrap();
1114        // Don't store expired_tx3 - it will fail deletion
1115
1116        let transactions = vec![expired_tx1, expired_tx2, expired_tx3];
1117
1118        let cleaned_count =
1119            process_transactions_for_cleanup(transactions, &transaction_repo, relayer_id, now)
1120                .await;
1121
1122        // Should have cleaned 2 out of 3 transactions (one failed due to NotFound)
1123        assert_eq!(cleaned_count, 2);
1124    }
1125
1126    #[tokio::test]
1127    async fn test_report_cleanup_results_empty() {
1128        let results = vec![];
1129        let result = report_cleanup_results(results).await;
1130        assert!(result.is_ok());
1131    }
1132
1133    #[tokio::test]
1134    async fn test_fetch_final_transactions_paginated_with_mixed_statuses() {
1135        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
1136        let relayer_id = "test-relayer";
1137
1138        // Create transactions with all possible statuses
1139        let confirmed_tx = create_test_transaction(
1140            "confirmed-tx",
1141            relayer_id,
1142            TransactionStatus::Confirmed,
1143            None,
1144        );
1145        let failed_tx =
1146            create_test_transaction("failed-tx", relayer_id, TransactionStatus::Failed, None);
1147        let canceled_tx =
1148            create_test_transaction("canceled-tx", relayer_id, TransactionStatus::Canceled, None);
1149        let expired_tx =
1150            create_test_transaction("expired-tx", relayer_id, TransactionStatus::Expired, None);
1151        let pending_tx =
1152            create_test_transaction("pending-tx", relayer_id, TransactionStatus::Pending, None);
1153        let sent_tx = create_test_transaction("sent-tx", relayer_id, TransactionStatus::Sent, None);
1154
1155        // Store all transactions
1156        transaction_repo.create(confirmed_tx).await.unwrap();
1157        transaction_repo.create(failed_tx).await.unwrap();
1158        transaction_repo.create(canceled_tx).await.unwrap();
1159        transaction_repo.create(expired_tx).await.unwrap();
1160        transaction_repo.create(pending_tx).await.unwrap();
1161        transaction_repo.create(sent_tx).await.unwrap();
1162
1163        // Fetch final transactions with pagination
1164        let query = PaginationQuery {
1165            page: 1,
1166            per_page: 10,
1167        };
1168        let result = fetch_final_transactions_paginated(relayer_id, &transaction_repo, query)
1169            .await
1170            .unwrap();
1171
1172        // Should only return the 4 final status transactions
1173        assert_eq!(result.total, 4);
1174        assert_eq!(result.items.len(), 4);
1175        let final_ids: Vec<&String> = result.items.iter().map(|tx| &tx.id).collect();
1176        assert!(final_ids.contains(&&"confirmed-tx".to_string()));
1177        assert!(final_ids.contains(&&"failed-tx".to_string()));
1178        assert!(final_ids.contains(&&"canceled-tx".to_string()));
1179        assert!(final_ids.contains(&&"expired-tx".to_string()));
1180        assert!(!final_ids.contains(&&"pending-tx".to_string()));
1181        assert!(!final_ids.contains(&&"sent-tx".to_string()));
1182    }
1183
1184    #[tokio::test]
1185    async fn test_fetch_final_transactions_paginated_pagination() {
1186        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
1187        let relayer_id = "test-relayer";
1188
1189        // Create 5 confirmed transactions
1190        for i in 1..=5 {
1191            let mut tx = create_test_transaction(
1192                &format!("tx-{}", i),
1193                relayer_id,
1194                TransactionStatus::Confirmed,
1195                None,
1196            );
1197            tx.created_at = format!("2025-01-27T{:02}:00:00.000000+00:00", 10 + i);
1198            transaction_repo.create(tx).await.unwrap();
1199        }
1200
1201        // Test first page with 2 items
1202        let query = PaginationQuery {
1203            page: 1,
1204            per_page: 2,
1205        };
1206        let result = fetch_final_transactions_paginated(relayer_id, &transaction_repo, query)
1207            .await
1208            .unwrap();
1209
1210        assert_eq!(result.total, 5);
1211        assert_eq!(result.items.len(), 2);
1212        assert_eq!(result.page, 1);
1213
1214        // Test second page
1215        let query = PaginationQuery {
1216            page: 2,
1217            per_page: 2,
1218        };
1219        let result = fetch_final_transactions_paginated(relayer_id, &transaction_repo, query)
1220            .await
1221            .unwrap();
1222
1223        assert_eq!(result.total, 5);
1224        assert_eq!(result.items.len(), 2);
1225        assert_eq!(result.page, 2);
1226
1227        // Test last page (partial)
1228        let query = PaginationQuery {
1229            page: 3,
1230            per_page: 2,
1231        };
1232        let result = fetch_final_transactions_paginated(relayer_id, &transaction_repo, query)
1233            .await
1234            .unwrap();
1235
1236        assert_eq!(result.total, 5);
1237        assert_eq!(result.items.len(), 1);
1238        assert_eq!(result.page, 3);
1239    }
1240
1241    #[tokio::test]
1242    async fn test_process_status_cleanup_deletes_expired() {
1243        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
1244        let relayer_id = "test-relayer";
1245        let now = Utc::now();
1246
1247        // Create expired and non-expired transactions of the same status
1248        let expired_tx = create_test_transaction(
1249            "expired-tx",
1250            relayer_id,
1251            TransactionStatus::Confirmed,
1252            Some((now - Duration::hours(1)).to_rfc3339()),
1253        );
1254        let future_tx = create_test_transaction(
1255            "future-tx",
1256            relayer_id,
1257            TransactionStatus::Confirmed,
1258            Some((now + Duration::hours(1)).to_rfc3339()),
1259        );
1260
1261        transaction_repo.create(expired_tx).await.unwrap();
1262        transaction_repo.create(future_tx).await.unwrap();
1263
1264        let cleaned = process_status_cleanup(
1265            relayer_id,
1266            &TransactionStatus::Confirmed,
1267            &transaction_repo,
1268            now,
1269        )
1270        .await
1271        .unwrap();
1272
1273        assert_eq!(cleaned, 1);
1274
1275        // Expired one deleted, future one remains
1276        assert!(transaction_repo
1277            .get_by_id("expired-tx".to_string())
1278            .await
1279            .is_err());
1280        assert!(transaction_repo
1281            .get_by_id("future-tx".to_string())
1282            .await
1283            .is_ok());
1284    }
1285
1286    #[tokio::test]
1287    async fn test_process_status_cleanup_no_transactions() {
1288        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
1289        let relayer_id = "test-relayer";
1290        let now = Utc::now();
1291
1292        let cleaned = process_status_cleanup(
1293            relayer_id,
1294            &TransactionStatus::Confirmed,
1295            &transaction_repo,
1296            now,
1297        )
1298        .await
1299        .unwrap();
1300
1301        assert_eq!(cleaned, 0);
1302    }
1303
1304    #[tokio::test]
1305    async fn test_process_status_cleanup_skips_other_statuses() {
1306        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
1307        let relayer_id = "test-relayer";
1308        let now = Utc::now();
1309
1310        // Create expired transaction with Failed status
1311        let tx = create_test_transaction(
1312            "failed-tx",
1313            relayer_id,
1314            TransactionStatus::Failed,
1315            Some((now - Duration::hours(1)).to_rfc3339()),
1316        );
1317        transaction_repo.create(tx).await.unwrap();
1318
1319        // Cleanup for Confirmed status should not touch Failed transactions
1320        let cleaned = process_status_cleanup(
1321            relayer_id,
1322            &TransactionStatus::Confirmed,
1323            &transaction_repo,
1324            now,
1325        )
1326        .await
1327        .unwrap();
1328
1329        assert_eq!(cleaned, 0);
1330        assert!(transaction_repo
1331            .get_by_id("failed-tx".to_string())
1332            .await
1333            .is_ok());
1334    }
1335
1336    #[tokio::test]
1337    async fn test_process_single_relayer_processes_all_final_statuses() {
1338        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
1339        let relayer = RelayerRepoModel {
1340            id: "test-relayer".to_string(),
1341            name: "Test Relayer".to_string(),
1342            network: "ethereum".to_string(),
1343            paused: false,
1344            network_type: NetworkType::Evm,
1345            signer_id: "test-signer".to_string(),
1346            policies: RelayerNetworkPolicy::Evm(RelayerEvmPolicy::default()),
1347            address: "0x1234567890123456789012345678901234567890".to_string(),
1348            notification_id: None,
1349            system_disabled: false,
1350            custom_rpc_urls: None,
1351            ..Default::default()
1352        };
1353        let now = Utc::now();
1354        let expired_at = (now - Duration::hours(1)).to_rfc3339();
1355
1356        // Create one expired transaction per final status
1357        for (i, status) in [
1358            TransactionStatus::Confirmed,
1359            TransactionStatus::Failed,
1360            TransactionStatus::Canceled,
1361            TransactionStatus::Expired,
1362        ]
1363        .iter()
1364        .enumerate()
1365        {
1366            let tx = create_test_transaction(
1367                &format!("tx-{}", i),
1368                &relayer.id,
1369                status.clone(),
1370                Some(expired_at.clone()),
1371            );
1372            transaction_repo.create(tx).await.unwrap();
1373        }
1374
1375        let result = process_single_relayer(relayer.clone(), transaction_repo.clone(), now).await;
1376
1377        assert_eq!(result.relayer_id, relayer.id);
1378        assert_eq!(result.cleaned_count, 4);
1379        assert!(result.error.is_none());
1380
1381        // All should be deleted
1382        assert_eq!(transaction_repo.count().await.unwrap(), 0);
1383    }
1384}