1use crate::{
7 constants::WORKER_DEFAULT_MAXIMUM_RETRIES,
8 domain::{get_network_relayer, Relayer},
9 jobs::{handle_result, Job, JobProducerTrait, RelayerHealthCheck},
10 models::{
11 produce_relayer_enabled_payload, DefaultAppState, DisabledReason, NetworkRepoModel,
12 NotificationRepoModel, RelayerRepoModel, SignerRepoModel, ThinDataAppState,
13 TransactionRepoModel,
14 },
15 observability::request_id::set_request_id,
16 repositories::{
17 ApiKeyRepositoryTrait, NetworkRepository, PluginRepositoryTrait, RelayerRepository,
18 Repository, TransactionCounterTrait, TransactionRepository,
19 },
20 utils::calculate_scheduled_timestamp,
21};
22use actix_web::web::ThinData;
23use apalis::prelude::{Attempt, Data, TaskId, *};
24use eyre::Result;
25use std::time::Duration;
26use tracing::{debug, info, instrument, warn};
27
28#[instrument(
53 level = "debug",
54 skip(job, app_state),
55 fields(
56 request_id = ?job.request_id,
57 job_id = %job.message_id,
58 job_type = %job.job_type.to_string(),
59 attempt = %attempt.current(),
60 relayer_id = %job.data.relayer_id,
61 task_id = %task_id.to_string(),
62 )
63)]
64pub async fn relayer_health_check_handler(
65 job: Job<RelayerHealthCheck>,
66 app_state: Data<ThinData<DefaultAppState>>,
67 attempt: Attempt,
68 task_id: TaskId,
69) -> Result<(), Error> {
70 if let Some(request_id) = job.request_id.clone() {
71 set_request_id(request_id);
72 }
73
74 relayer_health_check_handler_impl(job, app_state, attempt).await
75}
76
77#[allow(clippy::type_complexity)]
79async fn relayer_health_check_handler_impl<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
80 job: Job<RelayerHealthCheck>,
81 app_state: Data<ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>>,
82 attempt: Attempt,
83) -> Result<(), Error>
84where
85 J: JobProducerTrait + Send + Sync + 'static,
86 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
87 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
88 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
89 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
90 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
91 TCR: TransactionCounterTrait + Send + Sync + 'static,
92 PR: PluginRepositoryTrait + Send + Sync + 'static,
93 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
94{
95 let result = check_and_reenable_relayer(job.data, &app_state).await;
96 handle_result(
97 result,
98 attempt,
99 "relayer_health_check",
100 WORKER_DEFAULT_MAXIMUM_RETRIES,
101 )
102}
103
104async fn check_and_reenable_relayer<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
105 data: RelayerHealthCheck,
106 app_state: &ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
107) -> Result<()>
108where
109 J: JobProducerTrait + Send + Sync + 'static,
110 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
111 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
112 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
113 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
114 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
115 TCR: TransactionCounterTrait + Send + Sync + 'static,
116 PR: PluginRepositoryTrait + Send + Sync + 'static,
117 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
118{
119 let relayer_id = data.relayer_id.clone();
120
121 debug!(
122 relayer_id = %relayer_id,
123 retry_count = data.retry_count,
124 "Running health check on disabled relayer"
125 );
126
127 let relayer = app_state
129 .relayer_repository
130 .get_by_id(relayer_id.clone())
131 .await
132 .map_err(|e| eyre::eyre!("Failed to get relayer: {}", e))?;
133
134 if !relayer.system_disabled {
135 info!(
136 relayer_id = %relayer_id,
137 "Relayer is not disabled, skipping health check"
138 );
139 return Ok(());
140 }
141
142 let relayer_service = get_network_relayer(relayer_id.clone(), app_state)
144 .await
145 .map_err(|e| eyre::eyre!("Failed to get relayer: {}", e))?;
146
147 match relayer_service.check_health().await {
149 Ok(_) => {
150 info!(
152 relayer_id = %relayer_id,
153 retry_count = data.retry_count,
154 "Health checks passed, re-enabling relayer"
155 );
156
157 let enabled_relayer = app_state
159 .relayer_repository
160 .enable_relayer(relayer_id.clone())
161 .await
162 .map_err(|e| eyre::eyre!("Failed to enable relayer: {}", e))?;
163
164 if let Some(notification_id) = &enabled_relayer.notification_id {
166 app_state
167 .job_producer
168 .produce_send_notification_job(
169 produce_relayer_enabled_payload(
170 notification_id,
171 &enabled_relayer,
172 data.retry_count,
173 ),
174 None,
175 )
176 .await
177 .map_err(|e| eyre::eyre!("Failed to send notification: {}", e))?;
178
179 info!(
180 relayer_id = %relayer_id,
181 notification_id = %notification_id,
182 "Sent relayer recovery notification"
183 );
184 }
185
186 Ok(())
187 }
188 Err(failures) => {
189 let reason = DisabledReason::from_health_failures(failures).unwrap_or_else(|| {
191 DisabledReason::RpcValidationFailed("Unknown error".to_string())
192 });
193
194 warn!(
195 relayer_id = %relayer_id,
196 retry_count = data.retry_count,
197 reason = %reason,
198 "Health checks failed, scheduling retry"
199 );
200
201 let should_update = match &relayer.disabled_reason {
204 Some(old_reason) => !old_reason.same_variant(&reason),
205 None => true, };
207
208 if should_update {
209 debug!(
210 relayer_id = %relayer_id,
211 old_reason = ?relayer.disabled_reason,
212 new_reason = %reason,
213 "Disabled reason variant has changed, updating"
214 );
215
216 app_state
217 .relayer_repository
218 .disable_relayer(relayer_id.clone(), reason.clone())
219 .await
220 .map_err(|e| eyre::eyre!("Failed to update disabled reason: {}", e))?;
221 } else {
222 debug!(
223 relayer_id = %relayer_id,
224 reason = %reason,
225 "Disabled reason variant unchanged, skipping update"
226 );
227 }
228
229 let delay = calculate_backoff_delay(data.retry_count);
231
232 debug!(
233 relayer_id = %relayer_id,
234 next_retry = data.retry_count + 1,
235 delay_seconds = delay.as_secs(),
236 "Scheduling next health check attempt"
237 );
238
239 app_state
241 .job_producer
242 .produce_relayer_health_check_job(
243 RelayerHealthCheck::with_retry_count(relayer_id, data.retry_count + 1),
244 Some(calculate_scheduled_timestamp(delay.as_secs() as i64)),
245 )
246 .await
247 .map_err(|e| eyre::eyre!("Failed to schedule retry: {}", e))?;
248
249 Ok(())
250 }
251 }
252}
253
/// Maps a health-check retry count to the delay before the next attempt.
///
/// Retry counts 0, 1, 2 and 3 wait 10, 20, 30 and 45 seconds respectively;
/// every higher retry count waits 60 seconds.
fn calculate_backoff_delay(retry_count: u32) -> Duration {
    /// Delay in seconds per retry count; the final entry doubles as the cap.
    const SCHEDULE_SECS: [u64; 5] = [10, 20, 30, 45, 60];
    /// Index of the final (cap) entry in `SCHEDULE_SECS`.
    const LAST_STEP: u32 = 4;

    // Clamp while the value is still a `u32` so the `usize` cast is lossless
    // and the index is always in bounds.
    let step = retry_count.min(LAST_STEP) as usize;
    Duration::from_secs(SCHEDULE_SECS[step])
}
272
#[cfg(test)]
mod tests {
    use super::*;
    use crate::jobs::MockJobProducerTrait;
    use crate::models::{
        AppState, DisabledReason, HealthCheckFailure, NetworkType, RelayerEvmPolicy,
        RelayerNetworkPolicy, RelayerRepoModel,
    };
    use crate::repositories::{
        ApiKeyRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage,
        PluginRepositoryStorage, RelayerRepositoryStorage, Repository, SignerRepositoryStorage,
        TransactionCounterRepositoryStorage, TransactionRepositoryStorage,
    };
    use std::sync::Arc;

    /// The schedule is 10/20/30/45 seconds for retries 0-3 and 60 seconds afterwards.
    #[test]
    fn test_calculate_backoff_delay() {
        let expectations = [
            (0, 10),
            (1, 20),
            (2, 30),
            (3, 45),
            (4, 60),
            (10, 60),
            (100, 60),
        ];

        for (retry, secs) in expectations {
            assert_eq!(calculate_backoff_delay(retry), Duration::from_secs(secs));
        }
    }

    /// Both constructors store the relayer id; `new` starts at retry count 0.
    #[test]
    fn test_relayer_health_check_creation() {
        let fresh = RelayerHealthCheck::new("test-relayer".to_string());
        assert_eq!(fresh.relayer_id, "test-relayer");
        assert_eq!(fresh.retry_count, 0);

        let retried = RelayerHealthCheck::with_retry_count("test-relayer".to_string(), 3);
        assert_eq!(retried.relayer_id, "test-relayer");
        assert_eq!(retried.retry_count, 3);
    }

    /// Builds an EVM relayer fixture that is system-disabled with an RPC
    /// validation failure as its disabled reason.
    fn create_disabled_relayer(id: &str) -> RelayerRepoModel {
        let evm_policy = RelayerEvmPolicy {
            gas_price_cap: None,
            whitelist_receivers: None,
            eip1559_pricing: Some(false),
            private_transactions: Some(false),
            min_balance: Some(0),
            gas_limit_estimation: Some(false),
        };
        let rpc_failure = DisabledReason::RpcValidationFailed("RPC unavailable".to_string());

        RelayerRepoModel {
            id: id.to_string(),
            name: format!("Relayer {}", id),
            network: "sepolia".to_string(),
            paused: false,
            network_type: NetworkType::Evm,
            policies: RelayerNetworkPolicy::Evm(evm_policy),
            signer_id: "test-signer".to_string(),
            address: "0x742d35Cc6634C0532925a3b8D8C2e48a73F6ba2E".to_string(),
            notification_id: Some("test-notification".to_string()),
            system_disabled: true,
            disabled_reason: Some(rpc_failure),
            custom_rpc_urls: None,
        }
    }

    /// Checks the payload fields and the delay computed for a high retry count.
    #[tokio::test]
    async fn test_health_check_data_structure() {
        let initial = RelayerHealthCheck::new("test-relayer".to_string());
        assert_eq!(initial.relayer_id, "test-relayer");
        assert_eq!(initial.retry_count, 0);

        let fifth_retry = RelayerHealthCheck::with_retry_count("test-relayer".to_string(), 5);
        assert_eq!(fifth_retry.retry_count, 5);

        assert_eq!(calculate_backoff_delay(5), Duration::from_secs(60));
    }

    /// The handler implementation returns `Ok` for a relayer that is not
    /// system-disabled; the mock job producer has no expectations configured.
    #[tokio::test]
    async fn test_relayer_health_check_handler_impl_exits_on_enabled() {
        let relayer_repository = Arc::new(RelayerRepositoryStorage::new_in_memory());

        // Start from the disabled fixture and flip it to the enabled state.
        let enabled_relayer = RelayerRepoModel {
            system_disabled: false,
            disabled_reason: None,
            ..create_disabled_relayer("test-handler-enabled")
        };
        relayer_repository.create(enabled_relayer).await.unwrap();

        let state = Data::new(ThinData(AppState {
            relayer_repository,
            transaction_repository: Arc::new(TransactionRepositoryStorage::new_in_memory()),
            signer_repository: Arc::new(SignerRepositoryStorage::new_in_memory()),
            notification_repository: Arc::new(NotificationRepositoryStorage::new_in_memory()),
            network_repository: Arc::new(NetworkRepositoryStorage::new_in_memory()),
            transaction_counter_store: Arc::new(
                TransactionCounterRepositoryStorage::new_in_memory(),
            ),
            job_producer: Arc::new(MockJobProducerTrait::new()),
            plugin_repository: Arc::new(PluginRepositoryStorage::new_in_memory()),
            api_key_repository: Arc::new(ApiKeyRepositoryStorage::new_in_memory()),
        }));

        let job = Job::new(
            crate::jobs::JobType::RelayerHealthCheck,
            RelayerHealthCheck::new("test-handler-enabled".to_string()),
        );

        let outcome =
            relayer_health_check_handler_impl(job, state, Attempt::new_with_value(1)).await;

        assert!(outcome.is_ok());
    }

    /// Delays grow strictly for retries 0 through 4 and stay at 60 seconds afterwards.
    #[tokio::test]
    async fn test_relayer_health_check_backoff_progression() {
        let delays: Vec<Duration> = (0..6).map(calculate_backoff_delay).collect();

        let expected_secs = [10, 20, 30, 45, 60, 60];
        for (delay, secs) in delays.iter().zip(expected_secs) {
            assert_eq!(*delay, Duration::from_secs(secs));
        }

        // Adjacent pairs among the first five entries: (0,1), (1,2), (2,3), (3,4).
        for pair in delays[..5].windows(2) {
            assert!(pair[0] < pair[1], "Delay should increase with retry count");
        }

        assert_eq!(delays[4], delays[5], "Delay should cap at 60 seconds");
    }

    /// A stored relayer comes back with the same disabled flag and reason.
    #[tokio::test]
    async fn test_disabled_reason_is_preserved() {
        let repo = RelayerRepositoryStorage::new_in_memory();

        let relayer = create_disabled_relayer("test-relayer-2");
        let expected_reason = relayer.disabled_reason.clone();
        repo.create(relayer).await.unwrap();

        let stored = repo.get_by_id("test-relayer-2".to_string()).await.unwrap();

        assert!(stored.system_disabled);
        assert_eq!(stored.disabled_reason, expected_reason);

        if let Some(reason) = stored.disabled_reason.as_ref() {
            assert!(reason.description().contains("RPC"));
        }
    }

    /// `check_and_reenable_relayer` returns `Ok` and leaves an enabled relayer untouched.
    #[tokio::test]
    async fn test_check_and_reenable_relayer_exits_early_if_not_disabled() {
        let relayer_repository = Arc::new(RelayerRepositoryStorage::new_in_memory());

        let enabled_relayer = RelayerRepoModel {
            system_disabled: false,
            disabled_reason: None,
            ..create_disabled_relayer("test-check-enabled")
        };
        relayer_repository.create(enabled_relayer).await.unwrap();

        let state = ThinData(AppState {
            // Keep a handle so the stored relayer can be inspected afterwards.
            relayer_repository: Arc::clone(&relayer_repository),
            transaction_repository: Arc::new(TransactionRepositoryStorage::new_in_memory()),
            signer_repository: Arc::new(SignerRepositoryStorage::new_in_memory()),
            notification_repository: Arc::new(NotificationRepositoryStorage::new_in_memory()),
            network_repository: Arc::new(NetworkRepositoryStorage::new_in_memory()),
            transaction_counter_store: Arc::new(
                TransactionCounterRepositoryStorage::new_in_memory(),
            ),
            job_producer: Arc::new(MockJobProducerTrait::new()),
            plugin_repository: Arc::new(PluginRepositoryStorage::new_in_memory()),
            api_key_repository: Arc::new(ApiKeyRepositoryStorage::new_in_memory()),
        });

        let payload = RelayerHealthCheck::new("test-check-enabled".to_string());
        let outcome = check_and_reenable_relayer(payload, &state).await;

        assert!(outcome.is_ok());

        let stored = relayer_repository
            .get_by_id("test-check-enabled".to_string())
            .await
            .unwrap();
        assert!(!stored.system_disabled);
        assert!(stored.disabled_reason.is_none());
    }

    /// `same_variant` ignores the message text but distinguishes variants,
    /// including the members of `Multiple`.
    #[tokio::test]
    async fn test_check_and_reenable_variant_comparison() {
        let rpc_a = DisabledReason::RpcValidationFailed("Error A".to_string());
        let rpc_b = DisabledReason::RpcValidationFailed("Error B".to_string());
        assert!(rpc_a.same_variant(&rpc_b));

        let nonce = DisabledReason::NonceSyncFailed("Error".to_string());
        assert!(!rpc_a.same_variant(&nonce));

        let rpc_and_nonce = DisabledReason::Multiple(vec![
            DisabledReason::RpcValidationFailed("A".to_string()),
            DisabledReason::NonceSyncFailed("B".to_string()),
        ]);
        let rpc_and_nonce_other_text = DisabledReason::Multiple(vec![
            DisabledReason::RpcValidationFailed("C".to_string()),
            DisabledReason::NonceSyncFailed("D".to_string()),
        ]);
        assert!(rpc_and_nonce.same_variant(&rpc_and_nonce_other_text));

        let rpc_and_balance = DisabledReason::Multiple(vec![
            DisabledReason::RpcValidationFailed("A".to_string()),
            DisabledReason::BalanceCheckFailed("B".to_string()),
        ]);
        assert!(!rpc_and_nonce.same_variant(&rpc_and_balance));
    }

    /// Boundary values of the backoff schedule, plus monotonic growth below the cap.
    #[tokio::test]
    async fn test_backoff_delay_calculation_edge_cases() {
        assert_eq!(calculate_backoff_delay(0), Duration::from_secs(10));
        assert_eq!(calculate_backoff_delay(100), Duration::from_secs(60));

        let cap = Duration::from_secs(60);
        let mut previous = Duration::from_secs(0);
        for retry in 0..10 {
            let current = calculate_backoff_delay(retry);
            if current < cap {
                assert!(current > previous, "Retry {}: delay should increase", retry);
            } else {
                assert_eq!(current, cap, "Retry {}: should cap at 60s", retry);
            }
            previous = current;
        }
    }

    /// No failures give `None`, one failure maps to its own variant, and
    /// several failures are wrapped in `Multiple` in the same order.
    #[tokio::test]
    async fn test_disabled_reason_from_health_failures() {
        let from_nothing = DisabledReason::from_health_failures(vec![]);
        assert!(from_nothing.is_none());

        let from_one = DisabledReason::from_health_failures(vec![
            HealthCheckFailure::RpcValidationFailed("RPC down".to_string()),
        ]);
        assert!(from_one.is_some());
        if let DisabledReason::RpcValidationFailed(msg) = from_one.unwrap() {
            assert_eq!(msg, "RPC down");
        } else {
            panic!("Expected RpcValidationFailed variant");
        }

        let from_two = DisabledReason::from_health_failures(vec![
            HealthCheckFailure::RpcValidationFailed("RPC error".to_string()),
            HealthCheckFailure::NonceSyncFailed("Nonce error".to_string()),
        ]);
        assert!(from_two.is_some());
        if let DisabledReason::Multiple(reasons) = from_two.unwrap() {
            assert_eq!(reasons.len(), 2);
            assert!(matches!(reasons[0], DisabledReason::RpcValidationFailed(_)));
            assert!(matches!(reasons[1], DisabledReason::NonceSyncFailed(_)));
        } else {
            panic!("Expected Multiple variant");
        }
    }

    /// Incrementing the retry count never shortens the delay and ends at the 60 s cap.
    #[tokio::test]
    async fn test_relayer_health_check_retry_count_increments() {
        for retry_count in [0, 1, 2, 5, 10] {
            let current =
                RelayerHealthCheck::with_retry_count("test-relayer".to_string(), retry_count);
            assert_eq!(current.retry_count, retry_count);

            let following =
                RelayerHealthCheck::with_retry_count("test-relayer".to_string(), retry_count + 1);
            assert_eq!(following.retry_count, retry_count + 1);

            let current_delay = calculate_backoff_delay(retry_count);
            let next_delay = calculate_backoff_delay(retry_count + 1);

            if current_delay < Duration::from_secs(60) {
                assert!(next_delay >= current_delay);
            } else {
                assert_eq!(next_delay, Duration::from_secs(60));
            }
        }
    }

    /// Round-trips a relayer through `disable_relayer` and `enable_relayer`.
    #[tokio::test]
    async fn test_repository_enable_disable_operations() {
        let repo = RelayerRepositoryStorage::new_in_memory();

        let enabled_relayer = RelayerRepoModel {
            system_disabled: false,
            disabled_reason: None,
            ..create_disabled_relayer("test-enable-disable")
        };
        repo.create(enabled_relayer).await.unwrap();

        let reason = DisabledReason::RpcValidationFailed("Test error".to_string());
        let after_disable = repo
            .disable_relayer("test-enable-disable".to_string(), reason.clone())
            .await
            .unwrap();

        assert!(after_disable.system_disabled);
        assert_eq!(after_disable.disabled_reason, Some(reason));

        let after_enable = repo
            .enable_relayer("test-enable-disable".to_string())
            .await
            .unwrap();

        assert!(!after_enable.system_disabled);
        assert!(after_enable.disabled_reason.is_none());

        // The change must also be visible through a fresh read.
        let stored = repo
            .get_by_id("test-enable-disable".to_string())
            .await
            .unwrap();
        assert!(!stored.system_disabled);
        assert!(stored.disabled_reason.is_none());
    }

    /// `safe_description` must not echo the detail text carried by a reason.
    #[tokio::test]
    async fn test_disabled_reason_safe_description() {
        let detailed_reasons = vec![
            DisabledReason::NonceSyncFailed("Error with API key abc123".to_string()),
            DisabledReason::RpcValidationFailed(
                "RPC error: http://secret-rpc.com:8545".to_string(),
            ),
            DisabledReason::BalanceCheckFailed("Balance: 1.5 ETH at address 0x123...".to_string()),
        ];

        for reason in detailed_reasons {
            let safe_desc = reason.safe_description();

            for fragment in ["abc123", "http://", "0x123", "1.5 ETH"] {
                assert!(!safe_desc.contains(fragment));
            }

            assert!(!safe_desc.is_empty());
        }

        let combined = DisabledReason::Multiple(vec![
            DisabledReason::RpcValidationFailed("Secret RPC info".to_string()),
            DisabledReason::NonceSyncFailed("Secret nonce info".to_string()),
        ]);

        let safe_desc = combined.safe_description();
        assert!(!safe_desc.contains("Secret"));
        assert!(safe_desc.contains("RPC endpoint validation failed"));
        assert!(safe_desc.contains("Nonce synchronization failed"));
    }
}