1use actix_web::web::ThinData;
8use apalis::prelude::{Attempt, Data, TaskId, *};
9use apalis_redis::RedisContext;
10use deadpool_redis::Pool;
11use eyre::Result;
12use redis::AsyncCommands;
13use std::sync::Arc;
14use tracing::{debug, info, instrument, warn};
15
16use crate::{
17 config::ServerConfig,
18 constants::{get_max_consecutive_status_failures, get_max_total_status_failures},
19 domain::{get_relayer_transaction, get_transaction_by_id, is_final_state, Transaction},
20 jobs::{Job, JobProducerTrait, StatusCheckContext, TransactionStatusCheck},
21 models::{ApiError, DefaultAppState, TransactionRepoModel},
22 observability::request_id::set_request_id,
23};
24
/// Key segment placed between the configured Redis key prefix and the transaction id
/// when `get_metadata_key` builds the key of the per-transaction counter hash.
const TX_STATUS_CHECK_METADATA_PREFIX: &str = "queue:tx_status_check_metadata";
28
29#[instrument(
30 level = "debug",
31 skip(job, state, _ctx),
32 fields(
33 request_id = ?job.request_id,
34 job_id = %job.message_id,
35 job_type = %job.job_type.to_string(),
36 attempt = %attempt.current(),
37 tx_id = %job.data.transaction_id,
38 relayer_id = %job.data.relayer_id,
39 task_id = %task_id.to_string(),
40 )
41)]
42pub async fn transaction_status_handler(
43 job: Job<TransactionStatusCheck>,
44 state: Data<ThinData<DefaultAppState>>,
45 attempt: Attempt,
46 task_id: TaskId,
47 _ctx: RedisContext,
48) -> Result<(), Error> {
49 if let Some(request_id) = job.request_id.clone() {
50 set_request_id(request_id);
51 }
52
53 let queue = state
55 .job_producer()
56 .get_queue()
57 .await
58 .map_err(|e| Error::Failed(Arc::new(format!("Failed to get queue: {e}").into())))?;
59
60 let redis_pool = queue.redis_connections().primary().clone();
61
62 let req_result =
64 handle_request(&job.data, &state, &redis_pool, attempt.current(), &task_id).await;
65
66 let tx_id = &job.data.transaction_id;
67
68 handle_result(
70 req_result.result,
71 &redis_pool,
72 tx_id,
73 req_result.consecutive_failures,
74 req_result.total_failures,
75 req_result.should_retry_on_error,
76 )
77 .await
78}
79
80async fn handle_result(
91 result: Result<TransactionRepoModel>,
92 redis_pool: &Arc<Pool>,
93 tx_id: &str,
94 consecutive_failures: Option<u32>,
95 total_failures: Option<u32>,
96 should_retry_on_error: bool,
97) -> Result<(), Error> {
98 match result {
99 Ok(tx) if is_final_state(&tx.status) => {
100 debug!(
102 tx_id = %tx.id,
103 relayer_id = %tx.relayer_id,
104 status = ?tx.status,
105 consecutive_failures = ?consecutive_failures,
106 total_failures = ?total_failures,
107 "transaction in final state, status check complete"
108 );
109
110 if let Err(e) = delete_counters_from_redis(redis_pool, tx_id).await {
112 warn!(error = %e, tx_id = %tx_id, "failed to clean up counters from Redis");
113 }
114
115 Ok(())
116 }
117 Ok(tx) => {
118 debug!(
120 tx_id = %tx.id,
121 relayer_id = %tx.relayer_id,
122 status = ?tx.status,
123 "transaction not in final state"
124 );
125
126 match (consecutive_failures, total_failures) {
129 (Some(consecutive), Some(total)) if consecutive > 0 || total > 0 => {
130 if let Err(e) = update_counters_in_redis(redis_pool, tx_id, 0, total).await {
131 warn!(error = %e, tx_id = %tx_id, relayer_id = %tx.relayer_id, "failed to reset consecutive counter in Redis");
132 }
133 }
134 _ => {
135 }
137 }
138
139 Err(Error::Failed(Arc::new(
141 format!(
142 "transaction status: {:?} - not in final state, retrying",
143 tx.status
144 )
145 .into(),
146 )))
147 }
148 Err(e) => {
149 if !should_retry_on_error {
151 info!(
152 error = %e,
153 tx_id = %tx_id,
154 "status check failed with permanent error, completing job without retry"
155 );
156 return Ok(());
157 }
158
159 match (consecutive_failures, total_failures) {
161 (Some(consecutive), Some(total)) => {
162 let new_consecutive = consecutive.saturating_add(1);
163 let new_total = total.saturating_add(1);
164
165 warn!(
166 error = %e,
167 tx_id = %tx_id,
168 consecutive_failures = new_consecutive,
169 total_failures = new_total,
170 "status check failed, incrementing failure counters"
171 );
172
173 if let Err(update_err) =
175 update_counters_in_redis(redis_pool, tx_id, new_consecutive, new_total)
176 .await
177 {
178 warn!(error = %update_err, tx_id = %tx_id, "failed to update counters in Redis");
179 }
180 }
181 _ => {
182 warn!(
184 error = %e,
185 tx_id = %tx_id,
186 "status check failed early, counters not available"
187 );
188 }
189 }
190
191 Err(Error::Failed(Arc::new(format!("{e}").into())))
193 }
194 }
195}
196
197fn get_metadata_key(tx_id: &str) -> String {
199 let redis_key_prefix = ServerConfig::get_redis_key_prefix();
200 format!("{redis_key_prefix}:{TX_STATUS_CHECK_METADATA_PREFIX}:{tx_id}")
201}
202
203async fn read_counters_from_redis(redis_pool: &Arc<Pool>, tx_id: &str) -> (u32, u32) {
207 let key = get_metadata_key(tx_id);
208
209 let result: Result<(u32, u32)> = async {
210 let mut conn = redis_pool
211 .get()
212 .await
213 .map_err(|e| eyre::eyre!("Failed to get Redis connection: {e}"))?;
214
215 let values: Vec<Option<String>> = conn
216 .hget(&key, &["consecutive", "total"])
217 .await
218 .map_err(|e| eyre::eyre!("Failed to read counters from Redis: {e}"))?;
219
220 let consecutive = values
221 .first()
222 .and_then(|v| v.as_ref())
223 .and_then(|v| v.parse().ok())
224 .unwrap_or(0);
225 let total = values
226 .get(1)
227 .and_then(|v| v.as_ref())
228 .and_then(|v| v.parse().ok())
229 .unwrap_or(0);
230
231 Ok((consecutive, total))
232 }
233 .await;
234
235 match result {
236 Ok(counters) => counters,
237 Err(e) => {
238 warn!(error = %e, tx_id = %tx_id, "failed to read counters from Redis, using defaults");
239 (0, 0)
240 }
241 }
242}
243
244async fn update_counters_in_redis(
250 redis_pool: &Arc<Pool>,
251 tx_id: &str,
252 consecutive: u32,
253 total: u32,
254) -> Result<()> {
255 let key = get_metadata_key(tx_id);
256
257 let mut conn = redis_pool
260 .get()
261 .await
262 .map_err(|e| eyre::eyre!("Failed to get Redis connection: {e}"))?;
263
264 let (ttl_result,): (i64,) = redis::pipe()
265 .hset_multiple(
266 &key,
267 &[
268 ("consecutive", consecutive.to_string()),
269 ("total", total.to_string()),
270 ],
271 )
272 .ignore()
273 .expire(&key, 43200) .query_async(&mut *conn)
275 .await
276 .map_err(|e| eyre::eyre!("Failed to update counters in Redis: {e}"))?;
277
278 let ttl_set = ttl_result == 1;
279
280 debug!(
281 tx_id = %tx_id,
282 consecutive,
283 total,
284 key,
285 ttl_set,
286 "updated status check counters in Redis"
287 );
288
289 Ok(())
290}
291
292async fn delete_counters_from_redis(redis_pool: &Arc<Pool>, tx_id: &str) -> Result<()> {
294 let key = get_metadata_key(tx_id);
295
296 let mut conn = redis_pool
297 .get()
298 .await
299 .map_err(|e| eyre::eyre!("Failed to get Redis connection: {e}"))?;
300
301 conn.del::<_, ()>(&key)
302 .await
303 .map_err(|e| eyre::eyre!("Failed to delete counters from Redis: {e}"))?;
304
305 debug!(tx_id = %tx_id, key, "deleted status check counters from Redis");
306
307 Ok(())
308}
309
/// Outcome of `handle_request`, bundled with the data `handle_result` needs to
/// decide between completing the job and failing it.
struct HandleRequestResult {
    /// The updated transaction on success, or the error that stopped the check.
    result: Result<TransactionRepoModel>,
    /// Consecutive-failure counter as read from Redis; `None` when the request
    /// failed before the counters were read.
    consecutive_failures: Option<u32>,
    /// Total-failure counter as read from Redis; `None` when the request failed
    /// before the counters were read.
    total_failures: Option<u32>,
    /// When `false`, `handle_result` completes the job with `Ok(())` even though
    /// `result` is an error.
    should_retry_on_error: bool,
}
318
319async fn handle_request(
323 status_request: &TransactionStatusCheck,
324 state: &Data<ThinData<DefaultAppState>>,
325 redis_pool: &Arc<Pool>,
326 attempt: usize,
327 task_id: &TaskId,
328) -> HandleRequestResult {
329 let tx_id = &status_request.transaction_id;
330 debug!(
331 tx_id = %tx_id,
332 relayer_id = %status_request.relayer_id,
333 "handling transaction status check"
334 );
335
336 let transaction = match get_transaction_by_id(tx_id.clone(), state).await {
338 Ok(tx) => tx,
339 Err(ApiError::NotFound(msg)) => {
340 warn!(tx_id = %tx_id, "transaction not found, completing job without retry: {}", msg);
342 return HandleRequestResult {
343 result: Err(eyre::eyre!("Transaction not found: {}", msg)),
344 consecutive_failures: None,
345 total_failures: None,
346 should_retry_on_error: false,
347 };
348 }
349 Err(e) => {
350 return HandleRequestResult {
352 result: Err(e.into()),
353 consecutive_failures: None,
354 total_failures: None,
355 should_retry_on_error: true,
356 };
357 }
358 };
359
360 let (consecutive_failures, total_failures) = read_counters_from_redis(redis_pool, tx_id).await;
363
364 let network_type = transaction.network_type;
366 let max_consecutive = get_max_consecutive_status_failures(network_type);
367 let max_total = get_max_total_status_failures(network_type);
368
369 debug!(
370 tx_id = %tx_id,
371 consecutive_failures,
372 total_failures,
373 max_consecutive,
374 max_total,
375 attempt,
376 task_id = %task_id.to_string(),
377 "handling transaction status check"
378 );
379
380 let context = StatusCheckContext::new(
382 consecutive_failures,
383 total_failures,
384 attempt as u32,
385 max_consecutive,
386 max_total,
387 network_type,
388 );
389
390 let relayer_transaction =
392 match get_relayer_transaction(status_request.relayer_id.clone(), state).await {
393 Ok(rt) => rt,
394 Err(ApiError::NotFound(msg)) => {
395 warn!(
397 tx_id = %tx_id,
398 relayer_id = %status_request.relayer_id,
399 "relayer or signer not found, completing job without retry: {}", msg
400 );
401 return HandleRequestResult {
402 result: Err(eyre::eyre!("Relayer or signer not found: {}", msg)),
403 consecutive_failures: Some(consecutive_failures),
404 total_failures: Some(total_failures),
405 should_retry_on_error: false,
406 };
407 }
408 Err(e) => {
409 return HandleRequestResult {
411 result: Err(e.into()),
412 consecutive_failures: Some(consecutive_failures),
413 total_failures: Some(total_failures),
414 should_retry_on_error: true,
415 };
416 }
417 };
418
419 let result = relayer_transaction
421 .handle_transaction_status(transaction, Some(context))
422 .await
423 .map_err(|e| e.into());
424
425 if let Ok(tx) = result.as_ref() {
426 debug!(
427 tx_id = %tx.id,
428 status = ?tx.status,
429 "status check handled successfully"
430 );
431 }
432
433 HandleRequestResult {
434 result,
435 consecutive_failures: Some(consecutive_failures),
436 total_failures: Some(total_failures),
437 should_retry_on_error: true,
438 }
439}
440
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{NetworkType, TransactionStatus};
    use std::collections::HashMap;

    /// The metadata key embeds both the shared prefix and the transaction id.
    #[test]
    fn test_get_metadata_key() {
        let key = get_metadata_key("tx123");
        for fragment in [TX_STATUS_CHECK_METADATA_PREFIX, "tx123"] {
            assert!(key.contains(fragment));
        }
    }

    /// A freshly built job exposes the ids it was created with and no metadata.
    #[tokio::test]
    async fn test_status_check_job_validation() {
        let payload = TransactionStatusCheck::new("tx123", "relayer-1", NetworkType::Evm);
        let job = Job::new(crate::jobs::JobType::TransactionStatusCheck, payload);
        let data = &job.data;

        assert_eq!(data.transaction_id, "tx123");
        assert_eq!(data.relayer_id, "relayer-1");
        assert!(data.metadata.is_none());
    }

    /// Metadata attached with `with_metadata` is stored unchanged.
    #[tokio::test]
    async fn test_status_check_with_metadata() {
        let entries = [("retry_count", "2"), ("last_status", "pending")];
        let metadata: HashMap<String, String> = entries
            .iter()
            .map(|(name, value)| (name.to_string(), value.to_string()))
            .collect();

        let check_job = TransactionStatusCheck::new("tx123", "relayer-1", NetworkType::Evm)
            .with_metadata(metadata);

        assert!(check_job.metadata.is_some());
        let stored = check_job.metadata.unwrap();
        for (name, expected) in entries {
            assert_eq!(stored.get(name).map(String::as_str), Some(expected));
        }
    }

    /// The constructor always records the network type it was given.
    #[test]
    fn test_status_check_network_type_required() {
        let evm_job = TransactionStatusCheck::new("tx123", "relayer-1", NetworkType::Evm);
        assert!(evm_job.network_type.is_some());

        assert_eq!(
            TransactionStatusCheck::new("tx456", "relayer-2", NetworkType::Solana).network_type,
            Some(NetworkType::Solana)
        );
        assert_eq!(
            TransactionStatusCheck::new("tx789", "relayer-3", NetworkType::Stellar).network_type,
            Some(NetworkType::Stellar)
        );
    }

    mod context_tests {
        use super::*;

        /// Builds an EVM context with the limits shared by every test here (25 / 75).
        fn evm_context(consecutive: u32, total: u32, attempt: u32) -> StatusCheckContext {
            StatusCheckContext::new(consecutive, total, attempt, 25, 75, NetworkType::Evm)
        }

        #[test]
        fn test_context_should_force_finalize_below_threshold() {
            assert!(!evm_context(5, 10, 15).should_force_finalize());
        }

        #[test]
        fn test_context_should_force_finalize_consecutive_at_threshold() {
            assert!(evm_context(25, 30, 35).should_force_finalize());
        }

        #[test]
        fn test_context_should_force_finalize_total_at_threshold() {
            assert!(evm_context(10, 75, 80).should_force_finalize());
        }
    }

    mod final_state_tests {
        use super::*;

        #[test]
        fn test_confirmed_is_final() {
            assert!(is_final_state(&TransactionStatus::Confirmed));
        }

        #[test]
        fn test_failed_is_final() {
            assert!(is_final_state(&TransactionStatus::Failed));
        }

        #[test]
        fn test_canceled_is_final() {
            assert!(is_final_state(&TransactionStatus::Canceled));
        }

        #[test]
        fn test_expired_is_final() {
            assert!(is_final_state(&TransactionStatus::Expired));
        }

        #[test]
        fn test_pending_is_not_final() {
            assert!(!is_final_state(&TransactionStatus::Pending));
        }

        #[test]
        fn test_sent_is_not_final() {
            assert!(!is_final_state(&TransactionStatus::Sent));
        }

        #[test]
        fn test_submitted_is_not_final() {
            assert!(!is_final_state(&TransactionStatus::Submitted));
        }

        #[test]
        fn test_mined_is_not_final() {
            assert!(!is_final_state(&TransactionStatus::Mined));
        }
    }

    mod handle_result_tests {
        use super::*;

        /// Incrementing a counter that is already at the maximum must not wrap.
        #[test]
        fn test_counter_increment_saturating() {
            let (consecutive, total) = (u32::MAX, u32::MAX);

            assert_eq!(consecutive.saturating_add(1), u32::MAX);
            assert_eq!(total.saturating_add(1), u32::MAX);
        }

        /// Away from the maximum, an increment is a plain +1.
        #[test]
        fn test_counter_increment_normal() {
            let (consecutive, total): (u32, u32) = (5, 10);

            assert_eq!(consecutive.saturating_add(1), 6);
            assert_eq!(total.saturating_add(1), 11);
        }

        /// A successful check zeroes the consecutive counter and leaves the total alone.
        #[test]
        fn test_consecutive_reset_on_success() {
            let total: u32 = 20;

            let (new_consecutive, new_total) = (0, total);

            assert_eq!(new_consecutive, 0);
            assert_eq!(new_total, 20);
        }

        /// Every status listed here must be reported as final.
        #[test]
        fn test_final_state_triggers_cleanup() {
            let final_states = [
                TransactionStatus::Confirmed,
                TransactionStatus::Failed,
                TransactionStatus::Canceled,
                TransactionStatus::Expired,
            ];

            for status in final_states.iter() {
                assert!(
                    is_final_state(status),
                    "Expected {:?} to be a final state",
                    status
                );
            }
        }

        /// Every status listed here must be reported as not final.
        #[test]
        fn test_non_final_state_triggers_retry() {
            let non_final_states = [
                TransactionStatus::Pending,
                TransactionStatus::Sent,
                TransactionStatus::Submitted,
                TransactionStatus::Mined,
            ];

            for status in non_final_states.iter() {
                assert!(
                    !is_final_state(status),
                    "Expected {:?} to NOT be a final state",
                    status
                );
            }
        }
    }

    mod handle_request_result_tests {
        use super::*;

        /// Builds a failed result; `counters` is `(consecutive, total)` when known.
        fn failed(
            message: &'static str,
            counters: Option<(u32, u32)>,
            should_retry_on_error: bool,
        ) -> HandleRequestResult {
            HandleRequestResult {
                result: Err(eyre::eyre!(message)),
                consecutive_failures: counters.map(|(consecutive, _)| consecutive),
                total_failures: counters.map(|(_, total)| total),
                should_retry_on_error,
            }
        }

        #[test]
        fn test_handle_request_result_with_counters() {
            let HandleRequestResult {
                result,
                consecutive_failures,
                total_failures,
                should_retry_on_error,
            } = HandleRequestResult {
                result: Ok(TransactionRepoModel::default()),
                consecutive_failures: Some(5),
                total_failures: Some(10),
                should_retry_on_error: true,
            };

            assert!(result.is_ok());
            assert_eq!(consecutive_failures, Some(5));
            assert_eq!(total_failures, Some(10));
            assert!(should_retry_on_error);
        }

        #[test]
        fn test_handle_request_result_without_counters() {
            let outcome = failed("Transaction not found", None, false);

            assert!(outcome.result.is_err());
            assert_eq!(outcome.consecutive_failures, None);
            assert_eq!(outcome.total_failures, None);
            assert!(!outcome.should_retry_on_error);
        }

        #[test]
        fn test_permanent_error_should_not_retry() {
            let outcome = failed("Transaction not found", None, false);

            assert!(!outcome.should_retry_on_error);
        }

        #[test]
        fn test_transient_error_should_retry() {
            let outcome = failed("Connection timeout", Some((3, 7)), true);

            assert!(outcome.should_retry_on_error);
        }
    }
}