openzeppelin_relayer/jobs/handlers/
transaction_status_handler.rs

1//! Transaction status monitoring handler.
2//!
3//! Monitors the status of submitted transactions by:
4//! - Checking transaction status on the network
5//! - Updating transaction status in storage
6//! - Tracking failure counts for circuit breaker decisions (stored in Redis by tx_id)
7use actix_web::web::ThinData;
8use apalis::prelude::{Attempt, Data, TaskId, *};
9use apalis_redis::RedisContext;
10use deadpool_redis::Pool;
11use eyre::Result;
12use redis::AsyncCommands;
13use std::sync::Arc;
14use tracing::{debug, info, instrument, warn};
15
16use crate::{
17    config::ServerConfig,
18    constants::{get_max_consecutive_status_failures, get_max_total_status_failures},
19    domain::{get_relayer_transaction, get_transaction_by_id, is_final_state, Transaction},
20    jobs::{Job, JobProducerTrait, StatusCheckContext, TransactionStatusCheck},
21    models::{ApiError, DefaultAppState, TransactionRepoModel},
22    observability::request_id::set_request_id,
23};
24
/// Redis key prefix for transaction status check metadata (failure counters).
/// Stored separately from Apalis job data to persist across retries.
/// Full key shape: `{redis_key_prefix}:{this prefix}:{tx_id}` — see `get_metadata_key`.
const TX_STATUS_CHECK_METADATA_PREFIX: &str = "queue:tx_status_check_metadata";
28
29#[instrument(
30    level = "debug",
31    skip(job, state, _ctx),
32    fields(
33        request_id = ?job.request_id,
34        job_id = %job.message_id,
35        job_type = %job.job_type.to_string(),
36        attempt = %attempt.current(),
37        tx_id = %job.data.transaction_id,
38        relayer_id = %job.data.relayer_id,
39        task_id = %task_id.to_string(),
40    )
41)]
42pub async fn transaction_status_handler(
43    job: Job<TransactionStatusCheck>,
44    state: Data<ThinData<DefaultAppState>>,
45    attempt: Attempt,
46    task_id: TaskId,
47    _ctx: RedisContext,
48) -> Result<(), Error> {
49    if let Some(request_id) = job.request_id.clone() {
50        set_request_id(request_id);
51    }
52
53    // Get Redis pool from queue - uses deadpool for connection management
54    let queue = state
55        .job_producer()
56        .get_queue()
57        .await
58        .map_err(|e| Error::Failed(Arc::new(format!("Failed to get queue: {e}").into())))?;
59
60    let redis_pool = queue.redis_connections().primary().clone();
61
62    // Execute status check - all logic moved here so errors go through handle_result
63    let req_result =
64        handle_request(&job.data, &state, &redis_pool, attempt.current(), &task_id).await;
65
66    let tx_id = &job.data.transaction_id;
67
68    // Handle result and update counters in Redis
69    handle_result(
70        req_result.result,
71        &redis_pool,
72        tx_id,
73        req_result.consecutive_failures,
74        req_result.total_failures,
75        req_result.should_retry_on_error,
76    )
77    .await
78}
79
/// Handles status check results with circuit breaker tracking.
///
/// # Strategy
/// - If transaction is in final state → Clean up counters, return Ok (job completes)
/// - If success but not final → Reset consecutive to 0 in Redis, return Err (Apalis retries)
/// - If error with should_retry=true → Increment counters in Redis, return Err (Apalis retries)
/// - If error with should_retry=false → Return Ok (job completes, e.g., transaction not found)
/// - If counters are None (early failure) → Skip counter updates
///
/// Counters are stored in a separate Redis key by tx_id, independent of Apalis job data.
///
/// # Arguments
/// * `result` - Outcome of the status check (the refreshed transaction model on success).
/// * `redis_pool` - Pool used to read/write the failure-counter hash.
/// * `tx_id` - Transaction id; keys the counter metadata in Redis.
/// * `consecutive_failures` / `total_failures` - Counter values read earlier in
///   `handle_request`; `None` when the check failed before counters could be read.
/// * `should_retry_on_error` - When false, an `Err` result completes the job instead
///   of retrying (permanent failures such as "not found").
async fn handle_result(
    result: Result<TransactionRepoModel>,
    redis_pool: &Arc<Pool>,
    tx_id: &str,
    consecutive_failures: Option<u32>,
    total_failures: Option<u32>,
    should_retry_on_error: bool,
) -> Result<(), Error> {
    match result {
        Ok(tx) if is_final_state(&tx.status) => {
            // Transaction reached final state - job complete, clean up counters
            debug!(
                tx_id = %tx.id,
                relayer_id = %tx.relayer_id,
                status = ?tx.status,
                consecutive_failures = ?consecutive_failures,
                total_failures = ?total_failures,
                "transaction in final state, status check complete"
            );

            // Clean up the counters from Redis.
            // Best-effort: on failure the key still expires via its TTL, so we
            // only log a warning rather than failing the (already complete) job.
            if let Err(e) = delete_counters_from_redis(redis_pool, tx_id).await {
                warn!(error = %e, tx_id = %tx_id, "failed to clean up counters from Redis");
            }

            Ok(())
        }
        Ok(tx) => {
            // Success but not final - RESET consecutive counter, keep total unchanged
            debug!(
                tx_id = %tx.id,
                relayer_id = %tx.relayer_id,
                status = ?tx.status,
                "transaction not in final state"
            );

            // Reset consecutive counter in Redis only if there were previous failures
            // This avoids creating Redis entries for transactions that never failed
            match (consecutive_failures, total_failures) {
                (Some(consecutive), Some(total)) if consecutive > 0 || total > 0 => {
                    // Write (0, total): consecutive streak is broken, lifetime total preserved.
                    if let Err(e) = update_counters_in_redis(redis_pool, tx_id, 0, total).await {
                        warn!(error = %e, tx_id = %tx_id, relayer_id = %tx.relayer_id, "failed to reset consecutive counter in Redis");
                    }
                }
                _ => {
                    // No previous failures or counters not available - nothing to reset
                }
            }

            // Return error to trigger Apalis retry
            Err(Error::Failed(Arc::new(
                format!(
                    "transaction status: {:?} - not in final state, retrying",
                    tx.status
                )
                .into(),
            )))
        }
        Err(e) => {
            // Check if this is a permanent failure that shouldn't retry
            if !should_retry_on_error {
                info!(
                    error = %e,
                    tx_id = %tx_id,
                    "status check failed with permanent error, completing job without retry"
                );
                return Ok(());
            }

            // Transient error - INCREMENT both counters (only if we have values)
            match (consecutive_failures, total_failures) {
                (Some(consecutive), Some(total)) => {
                    // saturating_add: counters pin at u32::MAX instead of wrapping.
                    let new_consecutive = consecutive.saturating_add(1);
                    let new_total = total.saturating_add(1);

                    warn!(
                        error = %e,
                        tx_id = %tx_id,
                        consecutive_failures = new_consecutive,
                        total_failures = new_total,
                        "status check failed, incrementing failure counters"
                    );

                    // Update counters in Redis; a failed write only warns so the
                    // retry below still happens.
                    if let Err(update_err) =
                        update_counters_in_redis(redis_pool, tx_id, new_consecutive, new_total)
                            .await
                    {
                        warn!(error = %update_err, tx_id = %tx_id, "failed to update counters in Redis");
                    }
                }
                _ => {
                    // Early failure before counters were read - skip counter update
                    warn!(
                        error = %e,
                        tx_id = %tx_id,
                        "status check failed early, counters not available"
                    );
                }
            }

            // Return error to trigger Apalis retry
            Err(Error::Failed(Arc::new(format!("{e}").into())))
        }
    }
}
196
197/// Builds the Redis key for storing status check metadata for a transaction.
198fn get_metadata_key(tx_id: &str) -> String {
199    let redis_key_prefix = ServerConfig::get_redis_key_prefix();
200    format!("{redis_key_prefix}:{TX_STATUS_CHECK_METADATA_PREFIX}:{tx_id}")
201}
202
203/// Reads failure counters from Redis for a given transaction.
204///
205/// Returns (consecutive_failures, total_failures), defaulting to (0, 0) if not found.
206async fn read_counters_from_redis(redis_pool: &Arc<Pool>, tx_id: &str) -> (u32, u32) {
207    let key = get_metadata_key(tx_id);
208
209    let result: Result<(u32, u32)> = async {
210        let mut conn = redis_pool
211            .get()
212            .await
213            .map_err(|e| eyre::eyre!("Failed to get Redis connection: {e}"))?;
214
215        let values: Vec<Option<String>> = conn
216            .hget(&key, &["consecutive", "total"])
217            .await
218            .map_err(|e| eyre::eyre!("Failed to read counters from Redis: {e}"))?;
219
220        let consecutive = values
221            .first()
222            .and_then(|v| v.as_ref())
223            .and_then(|v| v.parse().ok())
224            .unwrap_or(0);
225        let total = values
226            .get(1)
227            .and_then(|v| v.as_ref())
228            .and_then(|v| v.parse().ok())
229            .unwrap_or(0);
230
231        Ok((consecutive, total))
232    }
233    .await;
234
235    match result {
236        Ok(counters) => counters,
237        Err(e) => {
238            warn!(error = %e, tx_id = %tx_id, "failed to read counters from Redis, using defaults");
239            (0, 0)
240        }
241    }
242}
243
244/// Updates failure counters in Redis for a given transaction.
245///
246/// TTL is refreshed on every update to act as an "inactivity timeout".
247/// If no status checks happen for 12 hours, the metadata is considered stale.
248/// Active transactions keep their metadata fresh.
249async fn update_counters_in_redis(
250    redis_pool: &Arc<Pool>,
251    tx_id: &str,
252    consecutive: u32,
253    total: u32,
254) -> Result<()> {
255    let key = get_metadata_key(tx_id);
256
257    // Use pipeline to atomically set values and TTL
258    // hset_multiple returns "OK", expire returns 1 if TTL was set
259    let mut conn = redis_pool
260        .get()
261        .await
262        .map_err(|e| eyre::eyre!("Failed to get Redis connection: {e}"))?;
263
264    let (ttl_result,): (i64,) = redis::pipe()
265        .hset_multiple(
266            &key,
267            &[
268                ("consecutive", consecutive.to_string()),
269                ("total", total.to_string()),
270            ],
271        )
272        .ignore()
273        .expire(&key, 43200) // 12 hours TTL
274        .query_async(&mut *conn)
275        .await
276        .map_err(|e| eyre::eyre!("Failed to update counters in Redis: {e}"))?;
277
278    let ttl_set = ttl_result == 1;
279
280    debug!(
281        tx_id = %tx_id,
282        consecutive,
283        total,
284        key,
285        ttl_set,
286        "updated status check counters in Redis"
287    );
288
289    Ok(())
290}
291
292/// Deletes failure counters from Redis when transaction reaches final state.
293async fn delete_counters_from_redis(redis_pool: &Arc<Pool>, tx_id: &str) -> Result<()> {
294    let key = get_metadata_key(tx_id);
295
296    let mut conn = redis_pool
297        .get()
298        .await
299        .map_err(|e| eyre::eyre!("Failed to get Redis connection: {e}"))?;
300
301    conn.del::<_, ()>(&key)
302        .await
303        .map_err(|e| eyre::eyre!("Failed to delete counters from Redis: {e}"))?;
304
305    debug!(tx_id = %tx_id, key, "deleted status check counters from Redis");
306
307    Ok(())
308}
309
/// Result of `handle_request`, bundling the check outcome with the counter
/// values read from Redis and the retry decision for `handle_result`.
struct HandleRequestResult {
    /// The status-check outcome; `Ok` carries the refreshed transaction model.
    result: Result<TransactionRepoModel>,
    /// Consecutive-failure counter read from Redis; `None` if the check failed
    /// before the counters could be read (e.g., transaction fetch failed).
    consecutive_failures: Option<u32>,
    /// Total-failure counter read from Redis; `None` under the same early-failure
    /// conditions as `consecutive_failures`.
    total_failures: Option<u32>,
    /// If false, errors should not trigger retry (e.g., transaction not found)
    should_retry_on_error: bool,
}
318
319/// Executes the status check logic and returns the result with counter values.
320/// Returns None for counters if they couldn't be read (e.g., transaction fetch failed early).
321/// Sets should_retry_on_error=false for permanent failures like transaction not found.
322async fn handle_request(
323    status_request: &TransactionStatusCheck,
324    state: &Data<ThinData<DefaultAppState>>,
325    redis_pool: &Arc<Pool>,
326    attempt: usize,
327    task_id: &TaskId,
328) -> HandleRequestResult {
329    let tx_id = &status_request.transaction_id;
330    debug!(
331        tx_id = %tx_id,
332        relayer_id = %status_request.relayer_id,
333        "handling transaction status check"
334    );
335
336    // Fetch transaction - if this fails, we can't read counters yet
337    let transaction = match get_transaction_by_id(tx_id.clone(), state).await {
338        Ok(tx) => tx,
339        Err(ApiError::NotFound(msg)) => {
340            // Transaction not found - permanent failure, don't retry
341            warn!(tx_id = %tx_id, "transaction not found, completing job without retry: {}", msg);
342            return HandleRequestResult {
343                result: Err(eyre::eyre!("Transaction not found: {}", msg)),
344                consecutive_failures: None,
345                total_failures: None,
346                should_retry_on_error: false,
347            };
348        }
349        Err(e) => {
350            // Other errors - should retry
351            return HandleRequestResult {
352                result: Err(e.into()),
353                consecutive_failures: None,
354                total_failures: None,
355                should_retry_on_error: true,
356            };
357        }
358    };
359
360    // Read failure counters from separate Redis key (not job metadata)
361    // This persists across job retries since we store it independently
362    let (consecutive_failures, total_failures) = read_counters_from_redis(redis_pool, tx_id).await;
363
364    // Get network type from transaction (authoritative source)
365    let network_type = transaction.network_type;
366    let max_consecutive = get_max_consecutive_status_failures(network_type);
367    let max_total = get_max_total_status_failures(network_type);
368
369    debug!(
370        tx_id = %tx_id,
371        consecutive_failures,
372        total_failures,
373        max_consecutive,
374        max_total,
375        attempt,
376        task_id = %task_id.to_string(),
377        "handling transaction status check"
378    );
379
380    // Build circuit breaker context
381    let context = StatusCheckContext::new(
382        consecutive_failures,
383        total_failures,
384        attempt as u32,
385        max_consecutive,
386        max_total,
387        network_type,
388    );
389
390    // Get relayer transaction handler
391    let relayer_transaction =
392        match get_relayer_transaction(status_request.relayer_id.clone(), state).await {
393            Ok(rt) => rt,
394            Err(ApiError::NotFound(msg)) => {
395                // Relayer or signer not found - permanent failure, don't retry
396                warn!(
397                    tx_id = %tx_id,
398                    relayer_id = %status_request.relayer_id,
399                    "relayer or signer not found, completing job without retry: {}", msg
400                );
401                return HandleRequestResult {
402                    result: Err(eyre::eyre!("Relayer or signer not found: {}", msg)),
403                    consecutive_failures: Some(consecutive_failures),
404                    total_failures: Some(total_failures),
405                    should_retry_on_error: false,
406                };
407            }
408            Err(e) => {
409                // Other errors - should retry
410                return HandleRequestResult {
411                    result: Err(e.into()),
412                    consecutive_failures: Some(consecutive_failures),
413                    total_failures: Some(total_failures),
414                    should_retry_on_error: true,
415                };
416            }
417        };
418
419    // Execute status check
420    let result = relayer_transaction
421        .handle_transaction_status(transaction, Some(context))
422        .await
423        .map_err(|e| e.into());
424
425    if let Ok(tx) = result.as_ref() {
426        debug!(
427            tx_id = %tx.id,
428            status = ?tx.status,
429            "status check handled successfully"
430        );
431    }
432
433    HandleRequestResult {
434        result,
435        consecutive_failures: Some(consecutive_failures),
436        total_failures: Some(total_failures),
437        should_retry_on_error: true,
438    }
439}
440
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{NetworkType, TransactionStatus};
    use std::collections::HashMap;

    /// The metadata key must embed both the shared prefix and the tx id.
    #[test]
    fn test_get_metadata_key() {
        // Note: This test assumes default redis key prefix
        let key = get_metadata_key("tx123");
        assert!(key.contains(TX_STATUS_CHECK_METADATA_PREFIX));
        assert!(key.contains("tx123"));
    }

    /// A freshly built job carries the ids it was constructed with and no metadata.
    #[tokio::test]
    async fn test_status_check_job_validation() {
        let check_job = TransactionStatusCheck::new("tx123", "relayer-1", NetworkType::Evm);
        let job = Job::new(crate::jobs::JobType::TransactionStatusCheck, check_job);

        assert_eq!(job.data.transaction_id, "tx123");
        assert_eq!(job.data.relayer_id, "relayer-1");
        assert!(job.data.metadata.is_none());
    }

    /// `with_metadata` attaches the provided key/value map verbatim.
    #[tokio::test]
    async fn test_status_check_with_metadata() {
        let mut metadata = HashMap::new();
        metadata.insert("retry_count".to_string(), "2".to_string());
        metadata.insert("last_status".to_string(), "pending".to_string());

        let check_job = TransactionStatusCheck::new("tx123", "relayer-1", NetworkType::Evm)
            .with_metadata(metadata.clone());

        assert!(check_job.metadata.is_some());
        let job_metadata = check_job.metadata.unwrap();
        assert_eq!(job_metadata.get("retry_count").unwrap(), "2");
        assert_eq!(job_metadata.get("last_status").unwrap(), "pending");
    }

    /// The constructor must always record the network type it was given.
    #[test]
    fn test_status_check_network_type_required() {
        // Jobs should always have network_type set
        let check_job = TransactionStatusCheck::new("tx123", "relayer-1", NetworkType::Evm);
        assert!(check_job.network_type.is_some());

        // Verify different network types are preserved
        let solana_job = TransactionStatusCheck::new("tx456", "relayer-2", NetworkType::Solana);
        assert_eq!(solana_job.network_type, Some(NetworkType::Solana));

        let stellar_job = TransactionStatusCheck::new("tx789", "relayer-3", NetworkType::Stellar);
        assert_eq!(stellar_job.network_type, Some(NetworkType::Stellar));
    }

    /// Circuit-breaker threshold tests. `StatusCheckContext::new` arguments are
    /// (consecutive, total, attempt, max_consecutive, max_total, network_type),
    /// matching the call in `handle_request`.
    mod context_tests {
        use super::*;

        #[test]
        fn test_context_should_force_finalize_below_threshold() {
            // Both counters below their maxima -> no forced finalization.
            let ctx = StatusCheckContext::new(5, 10, 15, 25, 75, NetworkType::Evm);
            assert!(!ctx.should_force_finalize());
        }

        #[test]
        fn test_context_should_force_finalize_consecutive_at_threshold() {
            // Consecutive counter hits its max (25) -> force finalize.
            let ctx = StatusCheckContext::new(25, 30, 35, 25, 75, NetworkType::Evm);
            assert!(ctx.should_force_finalize());
        }

        #[test]
        fn test_context_should_force_finalize_total_at_threshold() {
            // Total counter hits its max (75) -> force finalize.
            let ctx = StatusCheckContext::new(10, 75, 80, 25, 75, NetworkType::Evm);
            assert!(ctx.should_force_finalize());
        }
    }

    /// Exhaustive checks of `is_final_state` over the status enum, since the
    /// handler's complete/retry decision hinges on it.
    mod final_state_tests {
        use super::*;

        // Helper: assert the status IS terminal.
        fn verify_final_state(status: TransactionStatus) {
            assert!(is_final_state(&status));
        }

        // Helper: assert the status is NOT terminal.
        fn verify_not_final_state(status: TransactionStatus) {
            assert!(!is_final_state(&status));
        }

        #[test]
        fn test_confirmed_is_final() {
            verify_final_state(TransactionStatus::Confirmed);
        }

        #[test]
        fn test_failed_is_final() {
            verify_final_state(TransactionStatus::Failed);
        }

        #[test]
        fn test_canceled_is_final() {
            verify_final_state(TransactionStatus::Canceled);
        }

        #[test]
        fn test_expired_is_final() {
            verify_final_state(TransactionStatus::Expired);
        }

        #[test]
        fn test_pending_is_not_final() {
            verify_not_final_state(TransactionStatus::Pending);
        }

        #[test]
        fn test_sent_is_not_final() {
            verify_not_final_state(TransactionStatus::Sent);
        }

        #[test]
        fn test_submitted_is_not_final() {
            verify_not_final_state(TransactionStatus::Submitted);
        }

        #[test]
        fn test_mined_is_not_final() {
            verify_not_final_state(TransactionStatus::Mined);
        }
    }

    /// Unit tests for the counter arithmetic and state classification used by
    /// `handle_result` (the Redis interactions themselves need integration tests).
    mod handle_result_tests {
        use super::*;

        /// Tests that counter increment uses saturating_add to prevent overflow
        #[test]
        fn test_counter_increment_saturating() {
            let consecutive: u32 = u32::MAX;
            let total: u32 = u32::MAX;

            let new_consecutive = consecutive.saturating_add(1);
            let new_total = total.saturating_add(1);

            // Should not overflow, stays at MAX
            assert_eq!(new_consecutive, u32::MAX);
            assert_eq!(new_total, u32::MAX);
        }

        /// Tests normal counter increment
        #[test]
        fn test_counter_increment_normal() {
            let consecutive: u32 = 5;
            let total: u32 = 10;

            let new_consecutive = consecutive.saturating_add(1);
            let new_total = total.saturating_add(1);

            assert_eq!(new_consecutive, 6);
            assert_eq!(new_total, 11);
        }

        /// Tests that consecutive counter resets to 0 on success (non-final)
        #[test]
        fn test_consecutive_reset_on_success() {
            // When status check succeeds but tx is not final,
            // consecutive should reset to 0, total stays unchanged
            let total: u32 = 20;

            // On success, consecutive resets
            let new_consecutive = 0;
            let new_total = total; // unchanged

            assert_eq!(new_consecutive, 0);
            assert_eq!(new_total, 20);
        }

        /// Tests that final states are correctly identified for cleanup
        #[test]
        fn test_final_state_triggers_cleanup() {
            let final_states = vec![
                TransactionStatus::Confirmed,
                TransactionStatus::Failed,
                TransactionStatus::Canceled,
                TransactionStatus::Expired,
            ];

            for status in final_states {
                assert!(
                    is_final_state(&status),
                    "Expected {:?} to be a final state",
                    status
                );
            }
        }

        /// Tests that non-final states trigger retry
        #[test]
        fn test_non_final_state_triggers_retry() {
            let non_final_states = vec![
                TransactionStatus::Pending,
                TransactionStatus::Sent,
                TransactionStatus::Submitted,
                TransactionStatus::Mined,
            ];

            for status in non_final_states {
                assert!(
                    !is_final_state(&status),
                    "Expected {:?} to NOT be a final state",
                    status
                );
            }
        }
    }

    /// Construction-level tests for `HandleRequestResult`, covering the
    /// counter-present, counter-absent, and retry-flag combinations.
    mod handle_request_result_tests {
        use super::*;

        #[test]
        fn test_handle_request_result_with_counters() {
            let result = HandleRequestResult {
                result: Ok(TransactionRepoModel::default()),
                consecutive_failures: Some(5),
                total_failures: Some(10),
                should_retry_on_error: true,
            };

            assert!(result.result.is_ok());
            assert_eq!(result.consecutive_failures, Some(5));
            assert_eq!(result.total_failures, Some(10));
            assert!(result.should_retry_on_error);
        }

        #[test]
        fn test_handle_request_result_without_counters() {
            // Early failure before counters could be read
            let result = HandleRequestResult {
                result: Err(eyre::eyre!("Transaction not found")),
                consecutive_failures: None,
                total_failures: None,
                should_retry_on_error: false,
            };

            assert!(result.result.is_err());
            assert_eq!(result.consecutive_failures, None);
            assert_eq!(result.total_failures, None);
            assert!(!result.should_retry_on_error);
        }

        #[test]
        fn test_permanent_error_should_not_retry() {
            // NotFound errors are permanent - should not retry
            let result = HandleRequestResult {
                result: Err(eyre::eyre!("Transaction not found")),
                consecutive_failures: None,
                total_failures: None,
                should_retry_on_error: false,
            };

            // Permanent errors have should_retry_on_error = false
            assert!(!result.should_retry_on_error);
        }

        #[test]
        fn test_transient_error_should_retry() {
            // Network/connection errors are transient - should retry
            let result = HandleRequestResult {
                result: Err(eyre::eyre!("Connection timeout")),
                consecutive_failures: Some(3),
                total_failures: Some(7),
                should_retry_on_error: true,
            };

            // Transient errors have should_retry_on_error = true
            assert!(result.should_retry_on_error);
        }
    }
}