openzeppelin_relayer/bootstrap/
initialize_workers.rs

1//! Worker initialization
2//!
3//! This module contains functions for initializing background workers,
4//! including job processors and other long-running tasks.
5use crate::{
6    config::ServerConfig,
7    constants::{
8        DEFAULT_CONCURRENCY_HEALTH_CHECK, DEFAULT_CONCURRENCY_NOTIFICATION,
9        DEFAULT_CONCURRENCY_STATUS_CHECKER, DEFAULT_CONCURRENCY_STATUS_CHECKER_EVM,
10        DEFAULT_CONCURRENCY_STATUS_CHECKER_STELLAR, DEFAULT_CONCURRENCY_TOKEN_SWAP,
11        DEFAULT_CONCURRENCY_TRANSACTION_REQUEST, DEFAULT_CONCURRENCY_TRANSACTION_SENDER,
12        WORKER_NOTIFICATION_SENDER_RETRIES, WORKER_RELAYER_HEALTH_CHECK_RETRIES,
13        WORKER_SYSTEM_CLEANUP_RETRIES, WORKER_TOKEN_SWAP_REQUEST_RETRIES,
14        WORKER_TRANSACTION_CLEANUP_RETRIES, WORKER_TRANSACTION_REQUEST_RETRIES,
15        WORKER_TRANSACTION_STATUS_CHECKER_RETRIES, WORKER_TRANSACTION_SUBMIT_RETRIES,
16    },
17    jobs::{
18        notification_handler, relayer_health_check_handler, system_cleanup_handler,
19        token_swap_cron_handler, token_swap_request_handler, transaction_cleanup_handler,
20        transaction_request_handler, transaction_status_handler, transaction_submission_handler,
21        JobProducerTrait,
22    },
23    models::{
24        NetworkRepoModel, NotificationRepoModel, RelayerNetworkPolicy, RelayerRepoModel,
25        SignerRepoModel, ThinDataAppState, TransactionRepoModel,
26    },
27    repositories::{
28        ApiKeyRepositoryTrait, NetworkRepository, PluginRepositoryTrait, RelayerRepository,
29        Repository, TransactionCounterTrait, TransactionRepository,
30    },
31};
32use apalis::prelude::*;
33
34use apalis::layers::retry::backoff::MakeBackoff;
35use apalis::layers::retry::{backoff::ExponentialBackoffMaker, RetryPolicy};
36use apalis::layers::ErrorHandlingLayer;
37
38/// Re-exports from [`tower::util`]
39pub use tower::util::rng::HasherRng;
40
41use apalis_cron::CronStream;
42use eyre::Result;
43use std::{str::FromStr, time::Duration};
44use tokio::signal::unix::SignalKind;
45use tracing::{debug, error, info};
46
47const TRANSACTION_REQUEST: &str = "transaction_request";
48const TRANSACTION_SENDER: &str = "transaction_sender";
49// Generic transaction status checker
50const TRANSACTION_STATUS_CHECKER: &str = "transaction_status_checker";
51// Network specific status checkers
52const TRANSACTION_STATUS_CHECKER_EVM: &str = "transaction_status_checker_evm";
53const TRANSACTION_STATUS_CHECKER_STELLAR: &str = "transaction_status_checker_stellar";
54const NOTIFICATION_SENDER: &str = "notification_sender";
55const TOKEN_SWAP_REQUEST: &str = "token_swap_request";
56const TRANSACTION_CLEANUP: &str = "transaction_cleanup";
57const RELAYER_HEALTH_CHECK: &str = "relayer_health_check";
58const SYSTEM_CLEANUP: &str = "system_cleanup";
59
60/// Creates an exponential backoff with configurable parameters
61///
62/// # Arguments
63/// * `initial_ms` - Initial delay in milliseconds (e.g., 200)
64/// * `max_ms` - Maximum delay in milliseconds (e.g., 5000)
65/// * `jitter` - Jitter factor 0.0-1.0 (e.g., 0.99 for high jitter)
66///
67/// # Returns
68/// A configured backoff instance ready for use with RetryPolicy
69fn create_backoff(initial_ms: u64, max_ms: u64, jitter: f64) -> Result<ExponentialBackoffMaker> {
70    let maker = ExponentialBackoffMaker::new(
71        Duration::from_millis(initial_ms),
72        Duration::from_millis(max_ms),
73        jitter,
74        HasherRng::default(),
75    )?;
76
77    Ok(maker)
78}
79
80pub async fn initialize_workers<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
81    app_state: ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
82) -> Result<()>
83where
84    J: JobProducerTrait + Send + Sync + 'static,
85    RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
86    TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
87    NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
88    NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
89    SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
90    TCR: TransactionCounterTrait + Send + Sync + 'static,
91    PR: PluginRepositoryTrait + Send + Sync + 'static,
92    AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
93{
94    let queue = app_state.job_producer.get_queue().await?;
95
96    let transaction_request_queue_worker = WorkerBuilder::new(TRANSACTION_REQUEST)
97        .layer(ErrorHandlingLayer::new())
98        .retry(
99            RetryPolicy::retries(WORKER_TRANSACTION_REQUEST_RETRIES)
100                .with_backoff(create_backoff(500, 5000, 0.99)?.make_backoff()),
101        )
102        .enable_tracing()
103        .catch_panic()
104        .concurrency(ServerConfig::get_worker_concurrency(
105            TRANSACTION_REQUEST,
106            DEFAULT_CONCURRENCY_TRANSACTION_REQUEST,
107        ))
108        .data(app_state.clone())
109        .backend(queue.transaction_request_queue.clone())
110        .build_fn(transaction_request_handler);
111
112    let transaction_submission_queue_worker = WorkerBuilder::new(TRANSACTION_SENDER)
113        .layer(ErrorHandlingLayer::new())
114        .enable_tracing()
115        .catch_panic()
116        .retry(
117            RetryPolicy::retries(WORKER_TRANSACTION_SUBMIT_RETRIES)
118                .with_backoff(create_backoff(500, 2000, 0.99)?.make_backoff()),
119        )
120        .concurrency(ServerConfig::get_worker_concurrency(
121            TRANSACTION_SENDER,
122            DEFAULT_CONCURRENCY_TRANSACTION_SENDER,
123        ))
124        .data(app_state.clone())
125        .backend(queue.transaction_submission_queue.clone())
126        .build_fn(transaction_submission_handler);
127
128    // Generic status checker
129    // Uses medium settings that work reasonably for most chains
130    let transaction_status_queue_worker = WorkerBuilder::new(TRANSACTION_STATUS_CHECKER)
131        .layer(ErrorHandlingLayer::new())
132        .enable_tracing()
133        .catch_panic()
134        .retry(
135            RetryPolicy::retries(WORKER_TRANSACTION_STATUS_CHECKER_RETRIES)
136                .with_backoff(create_backoff(5000, 8000, 0.99)?.make_backoff()),
137        )
138        .concurrency(ServerConfig::get_worker_concurrency(
139            TRANSACTION_STATUS_CHECKER,
140            DEFAULT_CONCURRENCY_STATUS_CHECKER,
141        ))
142        .data(app_state.clone())
143        .backend(queue.transaction_status_queue.clone())
144        .build_fn(transaction_status_handler);
145
146    // EVM status checker - slower retries to avoid premature resubmission
147    // EVM has longer block times (~12s) and needs time for resubmission logic
148    let transaction_status_queue_worker_evm = WorkerBuilder::new(TRANSACTION_STATUS_CHECKER_EVM)
149        .layer(ErrorHandlingLayer::new())
150        .enable_tracing()
151        .catch_panic()
152        .retry(
153            RetryPolicy::retries(WORKER_TRANSACTION_STATUS_CHECKER_RETRIES)
154                .with_backoff(create_backoff(8000, 12000, 0.99)?.make_backoff()),
155        )
156        .concurrency(ServerConfig::get_worker_concurrency(
157            TRANSACTION_STATUS_CHECKER_EVM,
158            DEFAULT_CONCURRENCY_STATUS_CHECKER_EVM,
159        ))
160        .data(app_state.clone())
161        .backend(queue.transaction_status_queue_evm.clone())
162        .build_fn(transaction_status_handler);
163
164    // Stellar status checker - fast retries for fast finality
165    // Stellar has sub-second finality, needs more frequent status checks
166    let transaction_status_queue_worker_stellar =
167        WorkerBuilder::new(TRANSACTION_STATUS_CHECKER_STELLAR)
168            .layer(ErrorHandlingLayer::new())
169            .enable_tracing()
170            .catch_panic()
171            .retry(
172                RetryPolicy::retries(WORKER_TRANSACTION_STATUS_CHECKER_RETRIES)
173                    .with_backoff(create_backoff(2000, 3000, 0.99)?.make_backoff()),
174            )
175            .concurrency(ServerConfig::get_worker_concurrency(
176                TRANSACTION_STATUS_CHECKER_STELLAR,
177                DEFAULT_CONCURRENCY_STATUS_CHECKER_STELLAR,
178            ))
179            .data(app_state.clone())
180            .backend(queue.transaction_status_queue_stellar.clone())
181            .build_fn(transaction_status_handler);
182
183    let notification_queue_worker = WorkerBuilder::new(NOTIFICATION_SENDER)
184        .layer(ErrorHandlingLayer::new())
185        .enable_tracing()
186        .catch_panic()
187        .retry(
188            RetryPolicy::retries(WORKER_NOTIFICATION_SENDER_RETRIES)
189                .with_backoff(create_backoff(2000, 8000, 0.99)?.make_backoff()),
190        )
191        .concurrency(ServerConfig::get_worker_concurrency(
192            NOTIFICATION_SENDER,
193            DEFAULT_CONCURRENCY_NOTIFICATION,
194        ))
195        .data(app_state.clone())
196        .backend(queue.notification_queue.clone())
197        .build_fn(notification_handler);
198
199    let token_swap_request_queue_worker = WorkerBuilder::new(TOKEN_SWAP_REQUEST)
200        .layer(ErrorHandlingLayer::new())
201        .enable_tracing()
202        .catch_panic()
203        .retry(
204            RetryPolicy::retries(WORKER_TOKEN_SWAP_REQUEST_RETRIES)
205                .with_backoff(create_backoff(5000, 20000, 0.99)?.make_backoff()),
206        )
207        .concurrency(ServerConfig::get_worker_concurrency(
208            TOKEN_SWAP_REQUEST,
209            DEFAULT_CONCURRENCY_TOKEN_SWAP,
210        ))
211        .data(app_state.clone())
212        .backend(queue.token_swap_request_queue.clone())
213        .build_fn(token_swap_request_handler);
214
215    let transaction_cleanup_queue_worker = WorkerBuilder::new(TRANSACTION_CLEANUP)
216        .layer(ErrorHandlingLayer::new())
217        .enable_tracing()
218        .catch_panic()
219        .retry(
220            RetryPolicy::retries(WORKER_TRANSACTION_CLEANUP_RETRIES)
221                .with_backoff(create_backoff(5000, 20000, 0.99)?.make_backoff()),
222        )
223        .concurrency(ServerConfig::get_worker_concurrency(TRANSACTION_CLEANUP, 1)) // Default to 1 to avoid DB conflicts
224        .data(app_state.clone())
225        .backend(CronStream::new(
226            // every 10 minutes
227            apalis_cron::Schedule::from_str("0 */10 * * * *")?,
228        ))
229        .build_fn(transaction_cleanup_handler);
230
231    let system_cleanup_queue_worker = WorkerBuilder::new(SYSTEM_CLEANUP)
232        .layer(ErrorHandlingLayer::new())
233        .enable_tracing()
234        .catch_panic()
235        .retry(
236            RetryPolicy::retries(WORKER_SYSTEM_CLEANUP_RETRIES)
237                .with_backoff(create_backoff(5000, 20000, 0.99)?.make_backoff()),
238        )
239        .concurrency(1)
240        .data(app_state.clone())
241        .backend(CronStream::new(
242            // Runs at the start of every 15 minutes
243            apalis_cron::Schedule::from_str("0 */15 * * * *")?,
244        ))
245        .build_fn(system_cleanup_handler);
246
247    let relayer_health_check_worker = WorkerBuilder::new(RELAYER_HEALTH_CHECK)
248        .layer(ErrorHandlingLayer::new())
249        .enable_tracing()
250        .catch_panic()
251        .retry(
252            RetryPolicy::retries(WORKER_RELAYER_HEALTH_CHECK_RETRIES)
253                .with_backoff(create_backoff(2000, 10000, 0.99)?.make_backoff()),
254        )
255        .concurrency(ServerConfig::get_worker_concurrency(
256            RELAYER_HEALTH_CHECK,
257            DEFAULT_CONCURRENCY_HEALTH_CHECK,
258        ))
259        .data(app_state.clone())
260        .backend(queue.relayer_health_check_queue.clone())
261        .build_fn(relayer_health_check_handler);
262
263    let monitor = Monitor::new()
264        .register(transaction_request_queue_worker)
265        .register(transaction_submission_queue_worker)
266        .register(transaction_status_queue_worker)
267        .register(transaction_status_queue_worker_evm)
268        .register(transaction_status_queue_worker_stellar)
269        .register(notification_queue_worker)
270        .register(token_swap_request_queue_worker)
271        .register(transaction_cleanup_queue_worker)
272        .register(system_cleanup_queue_worker)
273        .register(relayer_health_check_worker)
274        .on_event(monitor_handle_event)
275        .shutdown_timeout(Duration::from_millis(5000));
276
277    let monitor_future = monitor.run_with_signal(async {
278        let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())
279            .expect("Failed to create SIGINT signal");
280        let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())
281            .expect("Failed to create SIGTERM signal");
282
283        debug!("Workers monitor started");
284
285        tokio::select! {
286            _ = sigint.recv() => debug!("Received SIGINT."),
287            _ = sigterm.recv() => debug!("Received SIGTERM."),
288        };
289
290        debug!("Workers monitor shutting down");
291
292        Ok(())
293    });
294    tokio::spawn(async move {
295        if let Err(e) = monitor_future.await {
296            error!(error = %e, "monitor error");
297        }
298    });
299    debug!("Workers monitor shutdown complete");
300
301    Ok(())
302}
303
304/// Filters relayers to find those eligible for swap workers (Solana or Stellar)
305/// Returns relayers that have:
306/// 1. Solana or Stellar network type
307/// 2. Swap configuration
308/// 3. Cron schedule defined
309fn filter_relayers_for_swap(relayers: Vec<RelayerRepoModel>) -> Vec<RelayerRepoModel> {
310    relayers
311        .into_iter()
312        .filter(|relayer| {
313            match &relayer.policies {
314                RelayerNetworkPolicy::Solana(policy) => {
315                    let swap_config = match policy.get_swap_config() {
316                        Some(config) => config,
317                        None => {
318                            debug!(relayer_id = %relayer.id, "No Solana swap configuration specified; skipping");
319                            return false;
320                        }
321                    };
322
323                    if swap_config.cron_schedule.is_none() {
324                        debug!(relayer_id = %relayer.id, "No cron schedule specified; skipping");
325                        return false;
326                    }
327                    true
328                }
329                RelayerNetworkPolicy::Stellar(policy) => {
330                    let swap_config = match policy.get_swap_config() {
331                        Some(config) => config,
332                        None => {
333                            debug!(relayer_id = %relayer.id, "No Stellar swap configuration specified; skipping");
334                            return false;
335                        }
336                    };
337
338                    if swap_config.cron_schedule.is_none() {
339                        debug!(relayer_id = %relayer.id, "No cron schedule specified; skipping");
340                        return false;
341                    }
342                    true
343                }
344                _ => {
345                    debug!(relayer_id = %relayer.id, "Network type does not support swap; skipping");
346                    false
347                }
348            }
349        })
350        .collect()
351}
352
353/// Initializes swap workers for Solana and Stellar relayers
354/// This function creates and registers workers for relayers that have swap enabled and cron schedule set.
355pub async fn initialize_token_swap_workers<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
356    app_state: ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
357) -> Result<()>
358where
359    J: JobProducerTrait + Send + Sync + 'static,
360    RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
361    TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
362    NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
363    NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
364    SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
365    TCR: TransactionCounterTrait + Send + Sync + 'static,
366    PR: PluginRepositoryTrait + Send + Sync + 'static,
367    AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
368{
369    let active_relayers = app_state.relayer_repository.list_active().await?;
370    let relayers_with_swap_enabled = filter_relayers_for_swap(active_relayers);
371
372    if relayers_with_swap_enabled.is_empty() {
373        debug!("No relayers with swap enabled");
374        return Ok(());
375    }
376    info!(
377        "Found {} relayers with swap enabled",
378        relayers_with_swap_enabled.len()
379    );
380
381    let mut workers = Vec::new();
382
383    let swap_backoff = create_backoff(2000, 5000, 0.99)?.make_backoff();
384
385    for relayer in relayers_with_swap_enabled {
386        debug!(relayer = ?relayer, "found relayer with swap enabled");
387
388        let (cron_schedule, network_type) = match &relayer.policies {
389            RelayerNetworkPolicy::Solana(policy) => match policy.get_swap_config() {
390                Some(config) => match config.cron_schedule {
391                    Some(schedule) => (schedule, "solana".to_string()),
392                    None => {
393                        debug!(relayer_id = %relayer.id, "No cron schedule specified for Solana relayer; skipping");
394                        continue;
395                    }
396                },
397                None => {
398                    debug!(relayer_id = %relayer.id, "No swap configuration specified for Solana relayer; skipping");
399                    continue;
400                }
401            },
402            RelayerNetworkPolicy::Stellar(policy) => match policy.get_swap_config() {
403                Some(config) => match config.cron_schedule {
404                    Some(schedule) => (schedule, "stellar".to_string()),
405                    None => {
406                        debug!(relayer_id = %relayer.id, "No cron schedule specified for Stellar relayer; skipping");
407                        continue;
408                    }
409                },
410                None => {
411                    debug!(relayer_id = %relayer.id, "No swap configuration specified for Stellar relayer; skipping");
412                    continue;
413                }
414            },
415            RelayerNetworkPolicy::Evm(_) => {
416                debug!(relayer_id = %relayer.id, "EVM relayers do not support swap; skipping");
417                continue;
418            }
419        };
420
421        let calendar_schedule = match apalis_cron::Schedule::from_str(&cron_schedule) {
422            Ok(schedule) => schedule,
423            Err(e) => {
424                error!(relayer_id = %relayer.id, error = %e, "Failed to parse cron schedule; skipping");
425                continue;
426            }
427        };
428
429        // Create worker and add to the workers vector
430        let worker = WorkerBuilder::new(format!(
431            "{}-swap-schedule-{}",
432            network_type,
433            relayer.id.clone()
434        ))
435        .layer(ErrorHandlingLayer::new())
436        .enable_tracing()
437        .catch_panic()
438        .retry(
439            RetryPolicy::retries(WORKER_TOKEN_SWAP_REQUEST_RETRIES)
440                .with_backoff(swap_backoff.clone()),
441        )
442        .concurrency(1)
443        .data(relayer.id.clone())
444        .data(app_state.clone())
445        .backend(CronStream::new(calendar_schedule))
446        .build_fn(token_swap_cron_handler);
447
448        workers.push(worker);
449        debug!(
450            relayer_id = %relayer.id,
451            network_type = %network_type,
452            "Created worker for relayer with swap enabled"
453        );
454    }
455
456    let mut monitor = Monitor::new()
457        .on_event(monitor_handle_event)
458        .shutdown_timeout(Duration::from_millis(5000));
459
460    // Register all workers with the monitor
461    for worker in workers {
462        monitor = monitor.register(worker);
463    }
464
465    let monitor_future = monitor.run_with_signal(async {
466        let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())
467            .expect("Failed to create SIGINT signal");
468        let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())
469            .expect("Failed to create SIGTERM signal");
470
471        debug!("Swap Monitor started");
472
473        tokio::select! {
474            _ = sigint.recv() => debug!("Received SIGINT."),
475            _ = sigterm.recv() => debug!("Received SIGTERM."),
476        };
477
478        debug!("Swap Monitor shutting down");
479
480        Ok(())
481    });
482    tokio::spawn(async move {
483        if let Err(e) = monitor_future.await {
484            error!(error = %e, "monitor error");
485        }
486    });
487    Ok(())
488}
489
490fn monitor_handle_event(e: Worker<Event>) {
491    let worker_id = e.id();
492    match e.inner() {
493        Event::Engage(task_id) => {
494            debug!(worker_id = %worker_id, task_id = %task_id, "worker got a job");
495        }
496        Event::Error(e) => {
497            error!(worker_id = %worker_id, error = %e, "worker encountered an error");
498        }
499        Event::Exit => {
500            debug!(worker_id = %worker_id, "worker exited");
501        }
502        Event::Idle => {
503            debug!(worker_id = %worker_id, "worker is idle");
504        }
505        Event::Start => {
506            debug!(worker_id = %worker_id, "worker started");
507        }
508        Event::Stop => {
509            debug!(worker_id = %worker_id, "worker stopped");
510        }
511        _ => {}
512    }
513}
514
515#[cfg(test)]
516mod tests {
517    use super::*;
518    use crate::models::{
519        NetworkType, RelayerEvmPolicy, RelayerNetworkPolicy, RelayerRepoModel, RelayerSolanaPolicy,
520        RelayerSolanaSwapConfig, RelayerStellarPolicy, RelayerStellarSwapConfig,
521        StellarFeePaymentStrategy, StellarSwapStrategy,
522    };
523
524    fn create_test_evm_relayer(id: &str) -> RelayerRepoModel {
525        RelayerRepoModel {
526            id: id.to_string(),
527            name: format!("EVM Relayer {}", id),
528            network: "sepolia".to_string(),
529            paused: false,
530            network_type: NetworkType::Evm,
531            policies: RelayerNetworkPolicy::Evm(RelayerEvmPolicy::default()),
532            signer_id: "test-signer".to_string(),
533            address: "0x742d35Cc6634C0532925a3b8D8C2e48a73F6ba2E".to_string(),
534            system_disabled: false,
535            ..Default::default()
536        }
537    }
538
539    fn create_test_solana_relayer_with_swap(
540        id: &str,
541        cron_schedule: Option<String>,
542    ) -> RelayerRepoModel {
543        RelayerRepoModel {
544            id: id.to_string(),
545            name: format!("Solana Relayer {}", id),
546            network: "mainnet-beta".to_string(),
547            paused: false,
548            network_type: NetworkType::Solana,
549            policies: RelayerNetworkPolicy::Solana(RelayerSolanaPolicy {
550                min_balance: Some(1000000000),
551                allowed_tokens: None,
552                allowed_programs: None,
553                max_signatures: None,
554                max_tx_data_size: None,
555                fee_payment_strategy: None,
556                fee_margin_percentage: None,
557                allowed_accounts: None,
558                disallowed_accounts: None,
559                max_allowed_fee_lamports: None,
560                swap_config: Some(RelayerSolanaSwapConfig {
561                    strategy: None,
562                    cron_schedule,
563                    min_balance_threshold: Some(5000000000),
564                    jupiter_swap_options: None,
565                }),
566            }),
567            signer_id: "test-signer".to_string(),
568            address: "5zWma6gn4QxRfC6xZk6KfpXWXXgV3Xt6VzPpXMKCMYW5".to_string(),
569            system_disabled: false,
570            ..Default::default()
571        }
572    }
573
574    fn create_test_stellar_relayer_with_swap(
575        id: &str,
576        cron_schedule: Option<String>,
577    ) -> RelayerRepoModel {
578        RelayerRepoModel {
579            id: id.to_string(),
580            name: format!("Stellar Relayer {}", id),
581            network: "testnet".to_string(),
582            paused: false,
583            network_type: NetworkType::Stellar,
584            policies: RelayerNetworkPolicy::Stellar(RelayerStellarPolicy {
585                min_balance: Some(1000000000),
586                max_fee: None,
587                timeout_seconds: None,
588                concurrent_transactions: None,
589                allowed_tokens: None,
590                fee_payment_strategy: Some(StellarFeePaymentStrategy::User),
591                slippage_percentage: None,
592                fee_margin_percentage: None,
593                swap_config: Some(RelayerStellarSwapConfig {
594                    strategies: vec![StellarSwapStrategy::OrderBook],
595                    cron_schedule,
596                    min_balance_threshold: Some(5000000000),
597                }),
598            }),
599            signer_id: "test-signer".to_string(),
600            address: "GABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890ABCDEFGH".to_string(),
601            system_disabled: false,
602            ..Default::default()
603        }
604    }
605
606    #[test]
607    fn test_filter_relayers_for_swap_with_empty_list() {
608        let relayers = vec![];
609        let filtered = filter_relayers_for_swap(relayers);
610
611        assert_eq!(
612            filtered.len(),
613            0,
614            "Should return empty list when no relayers provided"
615        );
616    }
617
618    #[test]
619    fn test_filter_relayers_for_swap_filters_non_solana_stellar() {
620        let relayers = vec![
621            create_test_evm_relayer("evm-1"),
622            create_test_evm_relayer("evm-2"),
623        ];
624
625        let filtered = filter_relayers_for_swap(relayers);
626
627        assert_eq!(
628            filtered.len(),
629            0,
630            "Should filter out all non-Solana/Stellar relayers"
631        );
632    }
633
634    #[test]
635    fn test_filter_relayers_for_swap_filters_no_cron_schedule() {
636        let relayers = vec![
637            create_test_solana_relayer_with_swap("solana-1", None),
638            create_test_solana_relayer_with_swap("solana-2", None),
639            create_test_stellar_relayer_with_swap("stellar-1", None),
640            create_test_stellar_relayer_with_swap("stellar-2", None),
641        ];
642
643        let filtered = filter_relayers_for_swap(relayers);
644
645        assert_eq!(
646            filtered.len(),
647            0,
648            "Should filter out Solana and Stellar relayers without cron schedule"
649        );
650    }
651
652    #[test]
653    fn test_filter_relayers_for_swap_includes_valid_relayers() {
654        let relayers = vec![
655            create_test_solana_relayer_with_swap("solana-1", Some("0 0 * * * *".to_string())),
656            create_test_solana_relayer_with_swap("solana-2", Some("0 */2 * * * *".to_string())),
657            create_test_stellar_relayer_with_swap("stellar-1", Some("0 0 * * * *".to_string())),
658            create_test_stellar_relayer_with_swap("stellar-2", Some("0 */2 * * * *".to_string())),
659        ];
660
661        let filtered = filter_relayers_for_swap(relayers);
662
663        assert_eq!(
664            filtered.len(),
665            4,
666            "Should include all Solana and Stellar relayers with cron schedule"
667        );
668        let ids: Vec<&str> = filtered.iter().map(|r| r.id.as_str()).collect();
669        assert!(ids.contains(&"solana-1"), "Should include solana-1");
670        assert!(ids.contains(&"solana-2"), "Should include solana-2");
671        assert!(ids.contains(&"stellar-1"), "Should include stellar-1");
672        assert!(ids.contains(&"stellar-2"), "Should include stellar-2");
673    }
674
675    #[test]
676    fn test_filter_relayers_for_swap_with_mixed_relayers() {
677        let relayers = vec![
678            create_test_evm_relayer("evm-1"),
679            create_test_solana_relayer_with_swap("solana-no-cron", None),
680            create_test_solana_relayer_with_swap(
681                "solana-with-cron-1",
682                Some("0 0 * * * *".to_string()),
683            ),
684            create_test_evm_relayer("evm-2"),
685            create_test_solana_relayer_with_swap(
686                "solana-with-cron-2",
687                Some("0 */3 * * * *".to_string()),
688            ),
689            create_test_stellar_relayer_with_swap("stellar-no-cron", None),
690            create_test_stellar_relayer_with_swap(
691                "stellar-with-cron-1",
692                Some("0 0 * * * *".to_string()),
693            ),
694            create_test_stellar_relayer_with_swap(
695                "stellar-with-cron-2",
696                Some("0 */3 * * * *".to_string()),
697            ),
698        ];
699
700        let filtered = filter_relayers_for_swap(relayers);
701
702        assert_eq!(
703            filtered.len(),
704            4,
705            "Should only include Solana and Stellar relayers with cron schedule"
706        );
707
708        // Verify the correct relayers were included
709        let ids: Vec<&str> = filtered.iter().map(|r| r.id.as_str()).collect();
710        assert!(
711            ids.contains(&"solana-with-cron-1"),
712            "Should include solana-with-cron-1"
713        );
714        assert!(
715            ids.contains(&"solana-with-cron-2"),
716            "Should include solana-with-cron-2"
717        );
718        assert!(
719            ids.contains(&"stellar-with-cron-1"),
720            "Should include stellar-with-cron-1"
721        );
722        assert!(
723            ids.contains(&"stellar-with-cron-2"),
724            "Should include stellar-with-cron-2"
725        );
726        assert!(!ids.contains(&"evm-1"), "Should not include EVM relayers");
727        assert!(
728            !ids.contains(&"solana-no-cron"),
729            "Should not include Solana without cron"
730        );
731        assert!(
732            !ids.contains(&"stellar-no-cron"),
733            "Should not include Stellar without cron"
734        );
735    }
736
737    #[test]
738    fn test_filter_relayers_for_swap_preserves_solana_relayer_data() {
739        let cron = "0 1 * * * *".to_string();
740        let relayers = vec![create_test_solana_relayer_with_swap(
741            "test-relayer",
742            Some(cron.clone()),
743        )];
744
745        let filtered = filter_relayers_for_swap(relayers);
746
747        assert_eq!(filtered.len(), 1);
748
749        let relayer = &filtered[0];
750        assert_eq!(relayer.id, "test-relayer");
751        assert_eq!(relayer.name, "Solana Relayer test-relayer");
752        assert_eq!(relayer.network_type, NetworkType::Solana);
753
754        // Verify swap config is preserved
755        let policy = relayer.policies.get_solana_policy();
756        let swap_config = policy.get_swap_config().expect("Should have swap config");
757        assert_eq!(swap_config.cron_schedule.as_ref(), Some(&cron));
758    }
759
760    #[test]
761    fn test_filter_relayers_for_swap_preserves_stellar_relayer_data() {
762        let cron = "0 1 * * * *".to_string();
763        let relayers = vec![create_test_stellar_relayer_with_swap(
764            "test-relayer",
765            Some(cron.clone()),
766        )];
767
768        let filtered = filter_relayers_for_swap(relayers);
769
770        assert_eq!(filtered.len(), 1);
771
772        let relayer = &filtered[0];
773        assert_eq!(relayer.id, "test-relayer");
774        assert_eq!(relayer.name, "Stellar Relayer test-relayer");
775        assert_eq!(relayer.network_type, NetworkType::Stellar);
776
777        // Verify swap config is preserved
778        let policy = relayer.policies.get_stellar_policy();
779        let swap_config = policy.get_swap_config().expect("Should have swap config");
780        assert_eq!(swap_config.cron_schedule.as_ref(), Some(&cron));
781    }
782
783    fn create_test_solana_relayer_without_swap_config(id: &str) -> RelayerRepoModel {
784        RelayerRepoModel {
785            id: id.to_string(),
786            name: format!("Solana Relayer {}", id),
787            network: "mainnet-beta".to_string(),
788            paused: false,
789            network_type: NetworkType::Solana,
790            policies: RelayerNetworkPolicy::Solana(RelayerSolanaPolicy {
791                min_balance: Some(1000000000),
792                allowed_tokens: None,
793                allowed_programs: None,
794                max_signatures: None,
795                max_tx_data_size: None,
796                fee_payment_strategy: None,
797                fee_margin_percentage: None,
798                allowed_accounts: None,
799                disallowed_accounts: None,
800                max_allowed_fee_lamports: None,
801                swap_config: None, // No swap config
802            }),
803            signer_id: "test-signer".to_string(),
804            address: "5zWma6gn4QxRfC6xZk6KfpXWXXgV3Xt6VzPpXMKCMYW5".to_string(),
805            system_disabled: false,
806            ..Default::default()
807        }
808    }
809
810    fn create_test_stellar_relayer_without_swap_config(id: &str) -> RelayerRepoModel {
811        RelayerRepoModel {
812            id: id.to_string(),
813            name: format!("Stellar Relayer {}", id),
814            network: "testnet".to_string(),
815            paused: false,
816            network_type: NetworkType::Stellar,
817            policies: RelayerNetworkPolicy::Stellar(RelayerStellarPolicy {
818                min_balance: Some(1000000000),
819                max_fee: None,
820                timeout_seconds: None,
821                concurrent_transactions: None,
822                allowed_tokens: None,
823                fee_payment_strategy: Some(StellarFeePaymentStrategy::User),
824                slippage_percentage: None,
825                fee_margin_percentage: None,
826                swap_config: None, // No swap config
827            }),
828            signer_id: "test-signer".to_string(),
829            address: "GABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890ABCDEFGH".to_string(),
830            system_disabled: false,
831            ..Default::default()
832        }
833    }
834
835    #[test]
836    fn test_filter_relayers_for_swap_filters_solana_without_swap_config() {
837        let relayers = vec![
838            create_test_solana_relayer_without_swap_config("solana-1"),
839            create_test_solana_relayer_without_swap_config("solana-2"),
840        ];
841
842        let filtered = filter_relayers_for_swap(relayers);
843
844        assert_eq!(
845            filtered.len(),
846            0,
847            "Should filter out Solana relayers without swap config"
848        );
849    }
850
851    #[test]
852    fn test_filter_relayers_for_swap_filters_stellar_without_swap_config() {
853        let relayers = vec![
854            create_test_stellar_relayer_without_swap_config("stellar-1"),
855            create_test_stellar_relayer_without_swap_config("stellar-2"),
856        ];
857
858        let filtered = filter_relayers_for_swap(relayers);
859
860        assert_eq!(
861            filtered.len(),
862            0,
863            "Should filter out Stellar relayers without swap config"
864        );
865    }
866
867    #[test]
868    fn test_filter_relayers_for_swap_with_mixed_swap_configs() {
869        let relayers = vec![
870            create_test_solana_relayer_without_swap_config("solana-no-config"),
871            create_test_solana_relayer_with_swap("solana-no-cron", None),
872            create_test_solana_relayer_with_swap(
873                "solana-with-cron",
874                Some("0 0 * * * *".to_string()),
875            ),
876            create_test_stellar_relayer_without_swap_config("stellar-no-config"),
877            create_test_stellar_relayer_with_swap("stellar-no-cron", None),
878            create_test_stellar_relayer_with_swap(
879                "stellar-with-cron",
880                Some("0 0 * * * *".to_string()),
881            ),
882        ];
883
884        let filtered = filter_relayers_for_swap(relayers);
885
886        assert_eq!(
887            filtered.len(),
888            2,
889            "Should only include relayers with swap config and cron schedule"
890        );
891
892        let ids: Vec<&str> = filtered.iter().map(|r| r.id.as_str()).collect();
893        assert!(
894            ids.contains(&"solana-with-cron"),
895            "Should include solana-with-cron"
896        );
897        assert!(
898            ids.contains(&"stellar-with-cron"),
899            "Should include stellar-with-cron"
900        );
901        assert!(
902            !ids.contains(&"solana-no-config"),
903            "Should not include solana without config"
904        );
905        assert!(
906            !ids.contains(&"solana-no-cron"),
907            "Should not include solana without cron"
908        );
909        assert!(
910            !ids.contains(&"stellar-no-config"),
911            "Should not include stellar without config"
912        );
913        assert!(
914            !ids.contains(&"stellar-no-cron"),
915            "Should not include stellar without cron"
916        );
917    }
918
919    #[test]
920    fn test_create_backoff_with_valid_parameters() {
921        let result = create_backoff(200, 5000, 0.99);
922        assert!(
923            result.is_ok(),
924            "Should create backoff with valid parameters"
925        );
926    }
927
928    #[test]
929    fn test_create_backoff_with_zero_initial() {
930        let result = create_backoff(0, 5000, 0.99);
931        assert!(
932            result.is_ok(),
933            "Should handle zero initial delay (edge case)"
934        );
935    }
936
937    #[test]
938    fn test_create_backoff_with_equal_initial_and_max() {
939        let result = create_backoff(1000, 1000, 0.5);
940        assert!(result.is_ok(), "Should handle equal initial and max delays");
941    }
942
943    #[test]
944    fn test_create_backoff_with_zero_jitter() {
945        let result = create_backoff(500, 5000, 0.0);
946        assert!(result.is_ok(), "Should handle zero jitter");
947    }
948
949    #[test]
950    fn test_create_backoff_with_max_jitter() {
951        let result = create_backoff(500, 5000, 1.0);
952        assert!(result.is_ok(), "Should handle maximum jitter (1.0)");
953    }
954
955    #[test]
956    fn test_create_backoff_with_small_values() {
957        let result = create_backoff(1, 10, 0.5);
958        assert!(result.is_ok(), "Should handle very small delay values");
959    }
960
961    #[test]
962    fn test_create_backoff_with_large_values() {
963        let result = create_backoff(10000, 60000, 0.99);
964        assert!(result.is_ok(), "Should handle large delay values");
965    }
966
967    #[test]
968    fn test_filter_relayers_preserves_order() {
969        let relayers = vec![
970            create_test_solana_relayer_with_swap("solana-1", Some("0 0 * * * *".to_string())),
971            create_test_stellar_relayer_with_swap("stellar-1", Some("0 0 * * * *".to_string())),
972            create_test_solana_relayer_with_swap("solana-2", Some("0 0 * * * *".to_string())),
973            create_test_stellar_relayer_with_swap("stellar-2", Some("0 0 * * * *".to_string())),
974        ];
975
976        let filtered = filter_relayers_for_swap(relayers);
977
978        assert_eq!(filtered.len(), 4);
979        assert_eq!(filtered[0].id, "solana-1");
980        assert_eq!(filtered[1].id, "stellar-1");
981        assert_eq!(filtered[2].id, "solana-2");
982        assert_eq!(filtered[3].id, "stellar-2");
983    }
984
985    #[test]
986    fn test_filter_relayers_with_different_cron_formats() {
987        let relayers = vec![
988            create_test_solana_relayer_with_swap("solana-1", Some("0 0 * * * *".to_string())), // Every hour
989            create_test_solana_relayer_with_swap("solana-2", Some("*/5 * * * * *".to_string())), // Every 5 seconds
990            create_test_stellar_relayer_with_swap("stellar-1", Some("0 0 12 * * *".to_string())), // Daily at noon
991            create_test_stellar_relayer_with_swap("stellar-2", Some("0 */15 * * * *".to_string())), // Every 15 minutes
992        ];
993
994        let filtered = filter_relayers_for_swap(relayers);
995
996        assert_eq!(
997            filtered.len(),
998            4,
999            "Should accept various valid cron schedule formats"
1000        );
1001    }
1002
1003    #[test]
1004    fn test_filter_relayers_with_all_network_types() {
1005        let relayers = vec![
1006            create_test_evm_relayer("evm-1"),
1007            create_test_solana_relayer_with_swap("solana-1", Some("0 0 * * * *".to_string())),
1008            create_test_stellar_relayer_with_swap("stellar-1", Some("0 0 * * * *".to_string())),
1009        ];
1010
1011        let filtered = filter_relayers_for_swap(relayers);
1012
1013        assert_eq!(filtered.len(), 2, "Should only include Solana and Stellar");
1014
1015        let network_types: Vec<NetworkType> =
1016            filtered.iter().map(|r| r.network_type.clone()).collect();
1017        assert!(
1018            network_types.contains(&NetworkType::Solana),
1019            "Should include Solana"
1020        );
1021        assert!(
1022            network_types.contains(&NetworkType::Stellar),
1023            "Should include Stellar"
1024        );
1025        assert!(
1026            !network_types.contains(&NetworkType::Evm),
1027            "Should not include EVM"
1028        );
1029    }
1030}