1use crate::{
6 config::ServerConfig,
7 constants::{
8 DEFAULT_CONCURRENCY_HEALTH_CHECK, DEFAULT_CONCURRENCY_NOTIFICATION,
9 DEFAULT_CONCURRENCY_STATUS_CHECKER, DEFAULT_CONCURRENCY_STATUS_CHECKER_EVM,
10 DEFAULT_CONCURRENCY_STATUS_CHECKER_STELLAR, DEFAULT_CONCURRENCY_TOKEN_SWAP,
11 DEFAULT_CONCURRENCY_TRANSACTION_REQUEST, DEFAULT_CONCURRENCY_TRANSACTION_SENDER,
12 WORKER_NOTIFICATION_SENDER_RETRIES, WORKER_RELAYER_HEALTH_CHECK_RETRIES,
13 WORKER_SYSTEM_CLEANUP_RETRIES, WORKER_TOKEN_SWAP_REQUEST_RETRIES,
14 WORKER_TRANSACTION_CLEANUP_RETRIES, WORKER_TRANSACTION_REQUEST_RETRIES,
15 WORKER_TRANSACTION_STATUS_CHECKER_RETRIES, WORKER_TRANSACTION_SUBMIT_RETRIES,
16 },
17 jobs::{
18 notification_handler, relayer_health_check_handler, system_cleanup_handler,
19 token_swap_cron_handler, token_swap_request_handler, transaction_cleanup_handler,
20 transaction_request_handler, transaction_status_handler, transaction_submission_handler,
21 JobProducerTrait,
22 },
23 models::{
24 NetworkRepoModel, NotificationRepoModel, RelayerNetworkPolicy, RelayerRepoModel,
25 SignerRepoModel, ThinDataAppState, TransactionRepoModel,
26 },
27 repositories::{
28 ApiKeyRepositoryTrait, NetworkRepository, PluginRepositoryTrait, RelayerRepository,
29 Repository, TransactionCounterTrait, TransactionRepository,
30 },
31};
32use apalis::prelude::*;
33
34use apalis::layers::retry::backoff::MakeBackoff;
35use apalis::layers::retry::{backoff::ExponentialBackoffMaker, RetryPolicy};
36use apalis::layers::ErrorHandlingLayer;
37
38pub use tower::util::rng::HasherRng;
40
41use apalis_cron::CronStream;
42use eyre::Result;
43use std::{str::FromStr, time::Duration};
44use tokio::signal::unix::SignalKind;
45use tracing::{debug, error, info};
46
// Worker names. Each constant is the name handed to `WorkerBuilder::new` in
// `initialize_workers`. All of them except `SYSTEM_CLEANUP` are also the key passed
// to `ServerConfig::get_worker_concurrency` when sizing the worker.

// Worker that runs `transaction_request_handler`.
const TRANSACTION_REQUEST: &str = "transaction_request";
// Worker that runs `transaction_submission_handler`.
const TRANSACTION_SENDER: &str = "transaction_sender";
// Worker that runs `transaction_status_handler` on `transaction_status_queue`.
const TRANSACTION_STATUS_CHECKER: &str = "transaction_status_checker";
// Worker that runs `transaction_status_handler` on `transaction_status_queue_evm`.
const TRANSACTION_STATUS_CHECKER_EVM: &str = "transaction_status_checker_evm";
// Worker that runs `transaction_status_handler` on `transaction_status_queue_stellar`.
const TRANSACTION_STATUS_CHECKER_STELLAR: &str = "transaction_status_checker_stellar";
// Worker that runs `notification_handler`.
const NOTIFICATION_SENDER: &str = "notification_sender";
// Worker that runs `token_swap_request_handler`.
const TOKEN_SWAP_REQUEST: &str = "token_swap_request";
// Cron-driven worker that runs `transaction_cleanup_handler`.
const TRANSACTION_CLEANUP: &str = "transaction_cleanup";
// Worker that runs `relayer_health_check_handler`.
const RELAYER_HEALTH_CHECK: &str = "relayer_health_check";
// Cron-driven worker that runs `system_cleanup_handler`.
const SYSTEM_CLEANUP: &str = "system_cleanup";
59
60fn create_backoff(initial_ms: u64, max_ms: u64, jitter: f64) -> Result<ExponentialBackoffMaker> {
70 let maker = ExponentialBackoffMaker::new(
71 Duration::from_millis(initial_ms),
72 Duration::from_millis(max_ms),
73 jitter,
74 HasherRng::default(),
75 )?;
76
77 Ok(maker)
78}
79
80pub async fn initialize_workers<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
81 app_state: ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
82) -> Result<()>
83where
84 J: JobProducerTrait + Send + Sync + 'static,
85 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
86 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
87 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
88 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
89 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
90 TCR: TransactionCounterTrait + Send + Sync + 'static,
91 PR: PluginRepositoryTrait + Send + Sync + 'static,
92 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
93{
94 let queue = app_state.job_producer.get_queue().await?;
95
96 let transaction_request_queue_worker = WorkerBuilder::new(TRANSACTION_REQUEST)
97 .layer(ErrorHandlingLayer::new())
98 .retry(
99 RetryPolicy::retries(WORKER_TRANSACTION_REQUEST_RETRIES)
100 .with_backoff(create_backoff(500, 5000, 0.99)?.make_backoff()),
101 )
102 .enable_tracing()
103 .catch_panic()
104 .concurrency(ServerConfig::get_worker_concurrency(
105 TRANSACTION_REQUEST,
106 DEFAULT_CONCURRENCY_TRANSACTION_REQUEST,
107 ))
108 .data(app_state.clone())
109 .backend(queue.transaction_request_queue.clone())
110 .build_fn(transaction_request_handler);
111
112 let transaction_submission_queue_worker = WorkerBuilder::new(TRANSACTION_SENDER)
113 .layer(ErrorHandlingLayer::new())
114 .enable_tracing()
115 .catch_panic()
116 .retry(
117 RetryPolicy::retries(WORKER_TRANSACTION_SUBMIT_RETRIES)
118 .with_backoff(create_backoff(500, 2000, 0.99)?.make_backoff()),
119 )
120 .concurrency(ServerConfig::get_worker_concurrency(
121 TRANSACTION_SENDER,
122 DEFAULT_CONCURRENCY_TRANSACTION_SENDER,
123 ))
124 .data(app_state.clone())
125 .backend(queue.transaction_submission_queue.clone())
126 .build_fn(transaction_submission_handler);
127
128 let transaction_status_queue_worker = WorkerBuilder::new(TRANSACTION_STATUS_CHECKER)
131 .layer(ErrorHandlingLayer::new())
132 .enable_tracing()
133 .catch_panic()
134 .retry(
135 RetryPolicy::retries(WORKER_TRANSACTION_STATUS_CHECKER_RETRIES)
136 .with_backoff(create_backoff(5000, 8000, 0.99)?.make_backoff()),
137 )
138 .concurrency(ServerConfig::get_worker_concurrency(
139 TRANSACTION_STATUS_CHECKER,
140 DEFAULT_CONCURRENCY_STATUS_CHECKER,
141 ))
142 .data(app_state.clone())
143 .backend(queue.transaction_status_queue.clone())
144 .build_fn(transaction_status_handler);
145
146 let transaction_status_queue_worker_evm = WorkerBuilder::new(TRANSACTION_STATUS_CHECKER_EVM)
149 .layer(ErrorHandlingLayer::new())
150 .enable_tracing()
151 .catch_panic()
152 .retry(
153 RetryPolicy::retries(WORKER_TRANSACTION_STATUS_CHECKER_RETRIES)
154 .with_backoff(create_backoff(8000, 12000, 0.99)?.make_backoff()),
155 )
156 .concurrency(ServerConfig::get_worker_concurrency(
157 TRANSACTION_STATUS_CHECKER_EVM,
158 DEFAULT_CONCURRENCY_STATUS_CHECKER_EVM,
159 ))
160 .data(app_state.clone())
161 .backend(queue.transaction_status_queue_evm.clone())
162 .build_fn(transaction_status_handler);
163
164 let transaction_status_queue_worker_stellar =
167 WorkerBuilder::new(TRANSACTION_STATUS_CHECKER_STELLAR)
168 .layer(ErrorHandlingLayer::new())
169 .enable_tracing()
170 .catch_panic()
171 .retry(
172 RetryPolicy::retries(WORKER_TRANSACTION_STATUS_CHECKER_RETRIES)
173 .with_backoff(create_backoff(2000, 3000, 0.99)?.make_backoff()),
174 )
175 .concurrency(ServerConfig::get_worker_concurrency(
176 TRANSACTION_STATUS_CHECKER_STELLAR,
177 DEFAULT_CONCURRENCY_STATUS_CHECKER_STELLAR,
178 ))
179 .data(app_state.clone())
180 .backend(queue.transaction_status_queue_stellar.clone())
181 .build_fn(transaction_status_handler);
182
183 let notification_queue_worker = WorkerBuilder::new(NOTIFICATION_SENDER)
184 .layer(ErrorHandlingLayer::new())
185 .enable_tracing()
186 .catch_panic()
187 .retry(
188 RetryPolicy::retries(WORKER_NOTIFICATION_SENDER_RETRIES)
189 .with_backoff(create_backoff(2000, 8000, 0.99)?.make_backoff()),
190 )
191 .concurrency(ServerConfig::get_worker_concurrency(
192 NOTIFICATION_SENDER,
193 DEFAULT_CONCURRENCY_NOTIFICATION,
194 ))
195 .data(app_state.clone())
196 .backend(queue.notification_queue.clone())
197 .build_fn(notification_handler);
198
199 let token_swap_request_queue_worker = WorkerBuilder::new(TOKEN_SWAP_REQUEST)
200 .layer(ErrorHandlingLayer::new())
201 .enable_tracing()
202 .catch_panic()
203 .retry(
204 RetryPolicy::retries(WORKER_TOKEN_SWAP_REQUEST_RETRIES)
205 .with_backoff(create_backoff(5000, 20000, 0.99)?.make_backoff()),
206 )
207 .concurrency(ServerConfig::get_worker_concurrency(
208 TOKEN_SWAP_REQUEST,
209 DEFAULT_CONCURRENCY_TOKEN_SWAP,
210 ))
211 .data(app_state.clone())
212 .backend(queue.token_swap_request_queue.clone())
213 .build_fn(token_swap_request_handler);
214
215 let transaction_cleanup_queue_worker = WorkerBuilder::new(TRANSACTION_CLEANUP)
216 .layer(ErrorHandlingLayer::new())
217 .enable_tracing()
218 .catch_panic()
219 .retry(
220 RetryPolicy::retries(WORKER_TRANSACTION_CLEANUP_RETRIES)
221 .with_backoff(create_backoff(5000, 20000, 0.99)?.make_backoff()),
222 )
223 .concurrency(ServerConfig::get_worker_concurrency(TRANSACTION_CLEANUP, 1)) .data(app_state.clone())
225 .backend(CronStream::new(
226 apalis_cron::Schedule::from_str("0 */10 * * * *")?,
228 ))
229 .build_fn(transaction_cleanup_handler);
230
231 let system_cleanup_queue_worker = WorkerBuilder::new(SYSTEM_CLEANUP)
232 .layer(ErrorHandlingLayer::new())
233 .enable_tracing()
234 .catch_panic()
235 .retry(
236 RetryPolicy::retries(WORKER_SYSTEM_CLEANUP_RETRIES)
237 .with_backoff(create_backoff(5000, 20000, 0.99)?.make_backoff()),
238 )
239 .concurrency(1)
240 .data(app_state.clone())
241 .backend(CronStream::new(
242 apalis_cron::Schedule::from_str("0 */15 * * * *")?,
244 ))
245 .build_fn(system_cleanup_handler);
246
247 let relayer_health_check_worker = WorkerBuilder::new(RELAYER_HEALTH_CHECK)
248 .layer(ErrorHandlingLayer::new())
249 .enable_tracing()
250 .catch_panic()
251 .retry(
252 RetryPolicy::retries(WORKER_RELAYER_HEALTH_CHECK_RETRIES)
253 .with_backoff(create_backoff(2000, 10000, 0.99)?.make_backoff()),
254 )
255 .concurrency(ServerConfig::get_worker_concurrency(
256 RELAYER_HEALTH_CHECK,
257 DEFAULT_CONCURRENCY_HEALTH_CHECK,
258 ))
259 .data(app_state.clone())
260 .backend(queue.relayer_health_check_queue.clone())
261 .build_fn(relayer_health_check_handler);
262
263 let monitor = Monitor::new()
264 .register(transaction_request_queue_worker)
265 .register(transaction_submission_queue_worker)
266 .register(transaction_status_queue_worker)
267 .register(transaction_status_queue_worker_evm)
268 .register(transaction_status_queue_worker_stellar)
269 .register(notification_queue_worker)
270 .register(token_swap_request_queue_worker)
271 .register(transaction_cleanup_queue_worker)
272 .register(system_cleanup_queue_worker)
273 .register(relayer_health_check_worker)
274 .on_event(monitor_handle_event)
275 .shutdown_timeout(Duration::from_millis(5000));
276
277 let monitor_future = monitor.run_with_signal(async {
278 let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())
279 .expect("Failed to create SIGINT signal");
280 let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())
281 .expect("Failed to create SIGTERM signal");
282
283 debug!("Workers monitor started");
284
285 tokio::select! {
286 _ = sigint.recv() => debug!("Received SIGINT."),
287 _ = sigterm.recv() => debug!("Received SIGTERM."),
288 };
289
290 debug!("Workers monitor shutting down");
291
292 Ok(())
293 });
294 tokio::spawn(async move {
295 if let Err(e) = monitor_future.await {
296 error!(error = %e, "monitor error");
297 }
298 });
299 debug!("Workers monitor shutdown complete");
300
301 Ok(())
302}
303
304fn filter_relayers_for_swap(relayers: Vec<RelayerRepoModel>) -> Vec<RelayerRepoModel> {
310 relayers
311 .into_iter()
312 .filter(|relayer| {
313 match &relayer.policies {
314 RelayerNetworkPolicy::Solana(policy) => {
315 let swap_config = match policy.get_swap_config() {
316 Some(config) => config,
317 None => {
318 debug!(relayer_id = %relayer.id, "No Solana swap configuration specified; skipping");
319 return false;
320 }
321 };
322
323 if swap_config.cron_schedule.is_none() {
324 debug!(relayer_id = %relayer.id, "No cron schedule specified; skipping");
325 return false;
326 }
327 true
328 }
329 RelayerNetworkPolicy::Stellar(policy) => {
330 let swap_config = match policy.get_swap_config() {
331 Some(config) => config,
332 None => {
333 debug!(relayer_id = %relayer.id, "No Stellar swap configuration specified; skipping");
334 return false;
335 }
336 };
337
338 if swap_config.cron_schedule.is_none() {
339 debug!(relayer_id = %relayer.id, "No cron schedule specified; skipping");
340 return false;
341 }
342 true
343 }
344 _ => {
345 debug!(relayer_id = %relayer.id, "Network type does not support swap; skipping");
346 false
347 }
348 }
349 })
350 .collect()
351}
352
353pub async fn initialize_token_swap_workers<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
356 app_state: ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
357) -> Result<()>
358where
359 J: JobProducerTrait + Send + Sync + 'static,
360 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
361 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
362 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
363 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
364 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
365 TCR: TransactionCounterTrait + Send + Sync + 'static,
366 PR: PluginRepositoryTrait + Send + Sync + 'static,
367 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
368{
369 let active_relayers = app_state.relayer_repository.list_active().await?;
370 let relayers_with_swap_enabled = filter_relayers_for_swap(active_relayers);
371
372 if relayers_with_swap_enabled.is_empty() {
373 debug!("No relayers with swap enabled");
374 return Ok(());
375 }
376 info!(
377 "Found {} relayers with swap enabled",
378 relayers_with_swap_enabled.len()
379 );
380
381 let mut workers = Vec::new();
382
383 let swap_backoff = create_backoff(2000, 5000, 0.99)?.make_backoff();
384
385 for relayer in relayers_with_swap_enabled {
386 debug!(relayer = ?relayer, "found relayer with swap enabled");
387
388 let (cron_schedule, network_type) = match &relayer.policies {
389 RelayerNetworkPolicy::Solana(policy) => match policy.get_swap_config() {
390 Some(config) => match config.cron_schedule {
391 Some(schedule) => (schedule, "solana".to_string()),
392 None => {
393 debug!(relayer_id = %relayer.id, "No cron schedule specified for Solana relayer; skipping");
394 continue;
395 }
396 },
397 None => {
398 debug!(relayer_id = %relayer.id, "No swap configuration specified for Solana relayer; skipping");
399 continue;
400 }
401 },
402 RelayerNetworkPolicy::Stellar(policy) => match policy.get_swap_config() {
403 Some(config) => match config.cron_schedule {
404 Some(schedule) => (schedule, "stellar".to_string()),
405 None => {
406 debug!(relayer_id = %relayer.id, "No cron schedule specified for Stellar relayer; skipping");
407 continue;
408 }
409 },
410 None => {
411 debug!(relayer_id = %relayer.id, "No swap configuration specified for Stellar relayer; skipping");
412 continue;
413 }
414 },
415 RelayerNetworkPolicy::Evm(_) => {
416 debug!(relayer_id = %relayer.id, "EVM relayers do not support swap; skipping");
417 continue;
418 }
419 };
420
421 let calendar_schedule = match apalis_cron::Schedule::from_str(&cron_schedule) {
422 Ok(schedule) => schedule,
423 Err(e) => {
424 error!(relayer_id = %relayer.id, error = %e, "Failed to parse cron schedule; skipping");
425 continue;
426 }
427 };
428
429 let worker = WorkerBuilder::new(format!(
431 "{}-swap-schedule-{}",
432 network_type,
433 relayer.id.clone()
434 ))
435 .layer(ErrorHandlingLayer::new())
436 .enable_tracing()
437 .catch_panic()
438 .retry(
439 RetryPolicy::retries(WORKER_TOKEN_SWAP_REQUEST_RETRIES)
440 .with_backoff(swap_backoff.clone()),
441 )
442 .concurrency(1)
443 .data(relayer.id.clone())
444 .data(app_state.clone())
445 .backend(CronStream::new(calendar_schedule))
446 .build_fn(token_swap_cron_handler);
447
448 workers.push(worker);
449 debug!(
450 relayer_id = %relayer.id,
451 network_type = %network_type,
452 "Created worker for relayer with swap enabled"
453 );
454 }
455
456 let mut monitor = Monitor::new()
457 .on_event(monitor_handle_event)
458 .shutdown_timeout(Duration::from_millis(5000));
459
460 for worker in workers {
462 monitor = monitor.register(worker);
463 }
464
465 let monitor_future = monitor.run_with_signal(async {
466 let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())
467 .expect("Failed to create SIGINT signal");
468 let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())
469 .expect("Failed to create SIGTERM signal");
470
471 debug!("Swap Monitor started");
472
473 tokio::select! {
474 _ = sigint.recv() => debug!("Received SIGINT."),
475 _ = sigterm.recv() => debug!("Received SIGTERM."),
476 };
477
478 debug!("Swap Monitor shutting down");
479
480 Ok(())
481 });
482 tokio::spawn(async move {
483 if let Err(e) = monitor_future.await {
484 error!(error = %e, "monitor error");
485 }
486 });
487 Ok(())
488}
489
490fn monitor_handle_event(e: Worker<Event>) {
491 let worker_id = e.id();
492 match e.inner() {
493 Event::Engage(task_id) => {
494 debug!(worker_id = %worker_id, task_id = %task_id, "worker got a job");
495 }
496 Event::Error(e) => {
497 error!(worker_id = %worker_id, error = %e, "worker encountered an error");
498 }
499 Event::Exit => {
500 debug!(worker_id = %worker_id, "worker exited");
501 }
502 Event::Idle => {
503 debug!(worker_id = %worker_id, "worker is idle");
504 }
505 Event::Start => {
506 debug!(worker_id = %worker_id, "worker started");
507 }
508 Event::Stop => {
509 debug!(worker_id = %worker_id, "worker stopped");
510 }
511 _ => {}
512 }
513}
514
515#[cfg(test)]
516mod tests {
517 use super::*;
518 use crate::models::{
519 NetworkType, RelayerEvmPolicy, RelayerNetworkPolicy, RelayerRepoModel, RelayerSolanaPolicy,
520 RelayerSolanaSwapConfig, RelayerStellarPolicy, RelayerStellarSwapConfig,
521 StellarFeePaymentStrategy, StellarSwapStrategy,
522 };
523
524 fn create_test_evm_relayer(id: &str) -> RelayerRepoModel {
525 RelayerRepoModel {
526 id: id.to_string(),
527 name: format!("EVM Relayer {}", id),
528 network: "sepolia".to_string(),
529 paused: false,
530 network_type: NetworkType::Evm,
531 policies: RelayerNetworkPolicy::Evm(RelayerEvmPolicy::default()),
532 signer_id: "test-signer".to_string(),
533 address: "0x742d35Cc6634C0532925a3b8D8C2e48a73F6ba2E".to_string(),
534 system_disabled: false,
535 ..Default::default()
536 }
537 }
538
539 fn create_test_solana_relayer_with_swap(
540 id: &str,
541 cron_schedule: Option<String>,
542 ) -> RelayerRepoModel {
543 RelayerRepoModel {
544 id: id.to_string(),
545 name: format!("Solana Relayer {}", id),
546 network: "mainnet-beta".to_string(),
547 paused: false,
548 network_type: NetworkType::Solana,
549 policies: RelayerNetworkPolicy::Solana(RelayerSolanaPolicy {
550 min_balance: Some(1000000000),
551 allowed_tokens: None,
552 allowed_programs: None,
553 max_signatures: None,
554 max_tx_data_size: None,
555 fee_payment_strategy: None,
556 fee_margin_percentage: None,
557 allowed_accounts: None,
558 disallowed_accounts: None,
559 max_allowed_fee_lamports: None,
560 swap_config: Some(RelayerSolanaSwapConfig {
561 strategy: None,
562 cron_schedule,
563 min_balance_threshold: Some(5000000000),
564 jupiter_swap_options: None,
565 }),
566 }),
567 signer_id: "test-signer".to_string(),
568 address: "5zWma6gn4QxRfC6xZk6KfpXWXXgV3Xt6VzPpXMKCMYW5".to_string(),
569 system_disabled: false,
570 ..Default::default()
571 }
572 }
573
574 fn create_test_stellar_relayer_with_swap(
575 id: &str,
576 cron_schedule: Option<String>,
577 ) -> RelayerRepoModel {
578 RelayerRepoModel {
579 id: id.to_string(),
580 name: format!("Stellar Relayer {}", id),
581 network: "testnet".to_string(),
582 paused: false,
583 network_type: NetworkType::Stellar,
584 policies: RelayerNetworkPolicy::Stellar(RelayerStellarPolicy {
585 min_balance: Some(1000000000),
586 max_fee: None,
587 timeout_seconds: None,
588 concurrent_transactions: None,
589 allowed_tokens: None,
590 fee_payment_strategy: Some(StellarFeePaymentStrategy::User),
591 slippage_percentage: None,
592 fee_margin_percentage: None,
593 swap_config: Some(RelayerStellarSwapConfig {
594 strategies: vec![StellarSwapStrategy::OrderBook],
595 cron_schedule,
596 min_balance_threshold: Some(5000000000),
597 }),
598 }),
599 signer_id: "test-signer".to_string(),
600 address: "GABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890ABCDEFGH".to_string(),
601 system_disabled: false,
602 ..Default::default()
603 }
604 }
605
606 #[test]
607 fn test_filter_relayers_for_swap_with_empty_list() {
608 let relayers = vec![];
609 let filtered = filter_relayers_for_swap(relayers);
610
611 assert_eq!(
612 filtered.len(),
613 0,
614 "Should return empty list when no relayers provided"
615 );
616 }
617
618 #[test]
619 fn test_filter_relayers_for_swap_filters_non_solana_stellar() {
620 let relayers = vec![
621 create_test_evm_relayer("evm-1"),
622 create_test_evm_relayer("evm-2"),
623 ];
624
625 let filtered = filter_relayers_for_swap(relayers);
626
627 assert_eq!(
628 filtered.len(),
629 0,
630 "Should filter out all non-Solana/Stellar relayers"
631 );
632 }
633
634 #[test]
635 fn test_filter_relayers_for_swap_filters_no_cron_schedule() {
636 let relayers = vec![
637 create_test_solana_relayer_with_swap("solana-1", None),
638 create_test_solana_relayer_with_swap("solana-2", None),
639 create_test_stellar_relayer_with_swap("stellar-1", None),
640 create_test_stellar_relayer_with_swap("stellar-2", None),
641 ];
642
643 let filtered = filter_relayers_for_swap(relayers);
644
645 assert_eq!(
646 filtered.len(),
647 0,
648 "Should filter out Solana and Stellar relayers without cron schedule"
649 );
650 }
651
652 #[test]
653 fn test_filter_relayers_for_swap_includes_valid_relayers() {
654 let relayers = vec![
655 create_test_solana_relayer_with_swap("solana-1", Some("0 0 * * * *".to_string())),
656 create_test_solana_relayer_with_swap("solana-2", Some("0 */2 * * * *".to_string())),
657 create_test_stellar_relayer_with_swap("stellar-1", Some("0 0 * * * *".to_string())),
658 create_test_stellar_relayer_with_swap("stellar-2", Some("0 */2 * * * *".to_string())),
659 ];
660
661 let filtered = filter_relayers_for_swap(relayers);
662
663 assert_eq!(
664 filtered.len(),
665 4,
666 "Should include all Solana and Stellar relayers with cron schedule"
667 );
668 let ids: Vec<&str> = filtered.iter().map(|r| r.id.as_str()).collect();
669 assert!(ids.contains(&"solana-1"), "Should include solana-1");
670 assert!(ids.contains(&"solana-2"), "Should include solana-2");
671 assert!(ids.contains(&"stellar-1"), "Should include stellar-1");
672 assert!(ids.contains(&"stellar-2"), "Should include stellar-2");
673 }
674
675 #[test]
676 fn test_filter_relayers_for_swap_with_mixed_relayers() {
677 let relayers = vec![
678 create_test_evm_relayer("evm-1"),
679 create_test_solana_relayer_with_swap("solana-no-cron", None),
680 create_test_solana_relayer_with_swap(
681 "solana-with-cron-1",
682 Some("0 0 * * * *".to_string()),
683 ),
684 create_test_evm_relayer("evm-2"),
685 create_test_solana_relayer_with_swap(
686 "solana-with-cron-2",
687 Some("0 */3 * * * *".to_string()),
688 ),
689 create_test_stellar_relayer_with_swap("stellar-no-cron", None),
690 create_test_stellar_relayer_with_swap(
691 "stellar-with-cron-1",
692 Some("0 0 * * * *".to_string()),
693 ),
694 create_test_stellar_relayer_with_swap(
695 "stellar-with-cron-2",
696 Some("0 */3 * * * *".to_string()),
697 ),
698 ];
699
700 let filtered = filter_relayers_for_swap(relayers);
701
702 assert_eq!(
703 filtered.len(),
704 4,
705 "Should only include Solana and Stellar relayers with cron schedule"
706 );
707
708 let ids: Vec<&str> = filtered.iter().map(|r| r.id.as_str()).collect();
710 assert!(
711 ids.contains(&"solana-with-cron-1"),
712 "Should include solana-with-cron-1"
713 );
714 assert!(
715 ids.contains(&"solana-with-cron-2"),
716 "Should include solana-with-cron-2"
717 );
718 assert!(
719 ids.contains(&"stellar-with-cron-1"),
720 "Should include stellar-with-cron-1"
721 );
722 assert!(
723 ids.contains(&"stellar-with-cron-2"),
724 "Should include stellar-with-cron-2"
725 );
726 assert!(!ids.contains(&"evm-1"), "Should not include EVM relayers");
727 assert!(
728 !ids.contains(&"solana-no-cron"),
729 "Should not include Solana without cron"
730 );
731 assert!(
732 !ids.contains(&"stellar-no-cron"),
733 "Should not include Stellar without cron"
734 );
735 }
736
737 #[test]
738 fn test_filter_relayers_for_swap_preserves_solana_relayer_data() {
739 let cron = "0 1 * * * *".to_string();
740 let relayers = vec![create_test_solana_relayer_with_swap(
741 "test-relayer",
742 Some(cron.clone()),
743 )];
744
745 let filtered = filter_relayers_for_swap(relayers);
746
747 assert_eq!(filtered.len(), 1);
748
749 let relayer = &filtered[0];
750 assert_eq!(relayer.id, "test-relayer");
751 assert_eq!(relayer.name, "Solana Relayer test-relayer");
752 assert_eq!(relayer.network_type, NetworkType::Solana);
753
754 let policy = relayer.policies.get_solana_policy();
756 let swap_config = policy.get_swap_config().expect("Should have swap config");
757 assert_eq!(swap_config.cron_schedule.as_ref(), Some(&cron));
758 }
759
760 #[test]
761 fn test_filter_relayers_for_swap_preserves_stellar_relayer_data() {
762 let cron = "0 1 * * * *".to_string();
763 let relayers = vec![create_test_stellar_relayer_with_swap(
764 "test-relayer",
765 Some(cron.clone()),
766 )];
767
768 let filtered = filter_relayers_for_swap(relayers);
769
770 assert_eq!(filtered.len(), 1);
771
772 let relayer = &filtered[0];
773 assert_eq!(relayer.id, "test-relayer");
774 assert_eq!(relayer.name, "Stellar Relayer test-relayer");
775 assert_eq!(relayer.network_type, NetworkType::Stellar);
776
777 let policy = relayer.policies.get_stellar_policy();
779 let swap_config = policy.get_swap_config().expect("Should have swap config");
780 assert_eq!(swap_config.cron_schedule.as_ref(), Some(&cron));
781 }
782
783 fn create_test_solana_relayer_without_swap_config(id: &str) -> RelayerRepoModel {
784 RelayerRepoModel {
785 id: id.to_string(),
786 name: format!("Solana Relayer {}", id),
787 network: "mainnet-beta".to_string(),
788 paused: false,
789 network_type: NetworkType::Solana,
790 policies: RelayerNetworkPolicy::Solana(RelayerSolanaPolicy {
791 min_balance: Some(1000000000),
792 allowed_tokens: None,
793 allowed_programs: None,
794 max_signatures: None,
795 max_tx_data_size: None,
796 fee_payment_strategy: None,
797 fee_margin_percentage: None,
798 allowed_accounts: None,
799 disallowed_accounts: None,
800 max_allowed_fee_lamports: None,
801 swap_config: None, }),
803 signer_id: "test-signer".to_string(),
804 address: "5zWma6gn4QxRfC6xZk6KfpXWXXgV3Xt6VzPpXMKCMYW5".to_string(),
805 system_disabled: false,
806 ..Default::default()
807 }
808 }
809
810 fn create_test_stellar_relayer_without_swap_config(id: &str) -> RelayerRepoModel {
811 RelayerRepoModel {
812 id: id.to_string(),
813 name: format!("Stellar Relayer {}", id),
814 network: "testnet".to_string(),
815 paused: false,
816 network_type: NetworkType::Stellar,
817 policies: RelayerNetworkPolicy::Stellar(RelayerStellarPolicy {
818 min_balance: Some(1000000000),
819 max_fee: None,
820 timeout_seconds: None,
821 concurrent_transactions: None,
822 allowed_tokens: None,
823 fee_payment_strategy: Some(StellarFeePaymentStrategy::User),
824 slippage_percentage: None,
825 fee_margin_percentage: None,
826 swap_config: None, }),
828 signer_id: "test-signer".to_string(),
829 address: "GABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890ABCDEFGH".to_string(),
830 system_disabled: false,
831 ..Default::default()
832 }
833 }
834
835 #[test]
836 fn test_filter_relayers_for_swap_filters_solana_without_swap_config() {
837 let relayers = vec![
838 create_test_solana_relayer_without_swap_config("solana-1"),
839 create_test_solana_relayer_without_swap_config("solana-2"),
840 ];
841
842 let filtered = filter_relayers_for_swap(relayers);
843
844 assert_eq!(
845 filtered.len(),
846 0,
847 "Should filter out Solana relayers without swap config"
848 );
849 }
850
851 #[test]
852 fn test_filter_relayers_for_swap_filters_stellar_without_swap_config() {
853 let relayers = vec![
854 create_test_stellar_relayer_without_swap_config("stellar-1"),
855 create_test_stellar_relayer_without_swap_config("stellar-2"),
856 ];
857
858 let filtered = filter_relayers_for_swap(relayers);
859
860 assert_eq!(
861 filtered.len(),
862 0,
863 "Should filter out Stellar relayers without swap config"
864 );
865 }
866
867 #[test]
868 fn test_filter_relayers_for_swap_with_mixed_swap_configs() {
869 let relayers = vec![
870 create_test_solana_relayer_without_swap_config("solana-no-config"),
871 create_test_solana_relayer_with_swap("solana-no-cron", None),
872 create_test_solana_relayer_with_swap(
873 "solana-with-cron",
874 Some("0 0 * * * *".to_string()),
875 ),
876 create_test_stellar_relayer_without_swap_config("stellar-no-config"),
877 create_test_stellar_relayer_with_swap("stellar-no-cron", None),
878 create_test_stellar_relayer_with_swap(
879 "stellar-with-cron",
880 Some("0 0 * * * *".to_string()),
881 ),
882 ];
883
884 let filtered = filter_relayers_for_swap(relayers);
885
886 assert_eq!(
887 filtered.len(),
888 2,
889 "Should only include relayers with swap config and cron schedule"
890 );
891
892 let ids: Vec<&str> = filtered.iter().map(|r| r.id.as_str()).collect();
893 assert!(
894 ids.contains(&"solana-with-cron"),
895 "Should include solana-with-cron"
896 );
897 assert!(
898 ids.contains(&"stellar-with-cron"),
899 "Should include stellar-with-cron"
900 );
901 assert!(
902 !ids.contains(&"solana-no-config"),
903 "Should not include solana without config"
904 );
905 assert!(
906 !ids.contains(&"solana-no-cron"),
907 "Should not include solana without cron"
908 );
909 assert!(
910 !ids.contains(&"stellar-no-config"),
911 "Should not include stellar without config"
912 );
913 assert!(
914 !ids.contains(&"stellar-no-cron"),
915 "Should not include stellar without cron"
916 );
917 }
918
919 #[test]
920 fn test_create_backoff_with_valid_parameters() {
921 let result = create_backoff(200, 5000, 0.99);
922 assert!(
923 result.is_ok(),
924 "Should create backoff with valid parameters"
925 );
926 }
927
928 #[test]
929 fn test_create_backoff_with_zero_initial() {
930 let result = create_backoff(0, 5000, 0.99);
931 assert!(
932 result.is_ok(),
933 "Should handle zero initial delay (edge case)"
934 );
935 }
936
937 #[test]
938 fn test_create_backoff_with_equal_initial_and_max() {
939 let result = create_backoff(1000, 1000, 0.5);
940 assert!(result.is_ok(), "Should handle equal initial and max delays");
941 }
942
943 #[test]
944 fn test_create_backoff_with_zero_jitter() {
945 let result = create_backoff(500, 5000, 0.0);
946 assert!(result.is_ok(), "Should handle zero jitter");
947 }
948
949 #[test]
950 fn test_create_backoff_with_max_jitter() {
951 let result = create_backoff(500, 5000, 1.0);
952 assert!(result.is_ok(), "Should handle maximum jitter (1.0)");
953 }
954
955 #[test]
956 fn test_create_backoff_with_small_values() {
957 let result = create_backoff(1, 10, 0.5);
958 assert!(result.is_ok(), "Should handle very small delay values");
959 }
960
961 #[test]
962 fn test_create_backoff_with_large_values() {
963 let result = create_backoff(10000, 60000, 0.99);
964 assert!(result.is_ok(), "Should handle large delay values");
965 }
966
967 #[test]
968 fn test_filter_relayers_preserves_order() {
969 let relayers = vec![
970 create_test_solana_relayer_with_swap("solana-1", Some("0 0 * * * *".to_string())),
971 create_test_stellar_relayer_with_swap("stellar-1", Some("0 0 * * * *".to_string())),
972 create_test_solana_relayer_with_swap("solana-2", Some("0 0 * * * *".to_string())),
973 create_test_stellar_relayer_with_swap("stellar-2", Some("0 0 * * * *".to_string())),
974 ];
975
976 let filtered = filter_relayers_for_swap(relayers);
977
978 assert_eq!(filtered.len(), 4);
979 assert_eq!(filtered[0].id, "solana-1");
980 assert_eq!(filtered[1].id, "stellar-1");
981 assert_eq!(filtered[2].id, "solana-2");
982 assert_eq!(filtered[3].id, "stellar-2");
983 }
984
985 #[test]
986 fn test_filter_relayers_with_different_cron_formats() {
987 let relayers = vec![
988 create_test_solana_relayer_with_swap("solana-1", Some("0 0 * * * *".to_string())), create_test_solana_relayer_with_swap("solana-2", Some("*/5 * * * * *".to_string())), create_test_stellar_relayer_with_swap("stellar-1", Some("0 0 12 * * *".to_string())), create_test_stellar_relayer_with_swap("stellar-2", Some("0 */15 * * * *".to_string())), ];
993
994 let filtered = filter_relayers_for_swap(relayers);
995
996 assert_eq!(
997 filtered.len(),
998 4,
999 "Should accept various valid cron schedule formats"
1000 );
1001 }
1002
1003 #[test]
1004 fn test_filter_relayers_with_all_network_types() {
1005 let relayers = vec![
1006 create_test_evm_relayer("evm-1"),
1007 create_test_solana_relayer_with_swap("solana-1", Some("0 0 * * * *".to_string())),
1008 create_test_stellar_relayer_with_swap("stellar-1", Some("0 0 * * * *".to_string())),
1009 ];
1010
1011 let filtered = filter_relayers_for_swap(relayers);
1012
1013 assert_eq!(filtered.len(), 2, "Should only include Solana and Stellar");
1014
1015 let network_types: Vec<NetworkType> =
1016 filtered.iter().map(|r| r.network_type.clone()).collect();
1017 assert!(
1018 network_types.contains(&NetworkType::Solana),
1019 "Should include Solana"
1020 );
1021 assert!(
1022 network_types.contains(&NetworkType::Stellar),
1023 "Should include Stellar"
1024 );
1025 assert!(
1026 !network_types.contains(&NetworkType::Evm),
1027 "Should not include EVM"
1028 );
1029 }
1030}