1use crate::constants::get_stellar_sponsored_transaction_validity_duration;
2use crate::domain::relayer::evm::create_error_response;
3use crate::services::stellar_dex::StellarDexService;
4use crate::utils::{map_provider_error, sanitize_error_description};
5use crate::{
28 constants::{
29 transactions::PENDING_TRANSACTION_STATUSES, STELLAR_SMALLEST_UNIT_NAME,
30 STELLAR_STATUS_CHECK_INITIAL_DELAY_SECONDS,
31 },
32 domain::{
33 create_success_response, transaction::stellar::fetch_next_sequence_from_chain,
34 BalanceResponse, SignDataRequest, SignDataResponse, SignTransactionExternalResponse,
35 SignTransactionExternalResponseStellar, SignTransactionRequest, SignTypedDataRequest,
36 },
37 jobs::{JobProducerTrait, RelayerHealthCheck, TransactionRequest, TransactionStatusCheck},
38 models::{
39 produce_relayer_disabled_payload, DeletePendingTransactionsResponse, DisabledReason,
40 GetStatusOptions, HealthCheckFailure, JsonRpcRequest, JsonRpcResponse, NetworkRepoModel,
41 NetworkRpcRequest, NetworkRpcResult, NetworkTransactionRequest, NetworkType,
42 PaginationQuery, RelayerNetworkPolicy, RelayerRepoModel, RelayerStatus,
43 RelayerStellarPolicy, RepositoryError, RpcErrorCodes, StellarAllowedTokensPolicy,
44 StellarFeePaymentStrategy, StellarNetwork, StellarRpcRequest, TransactionRepoModel,
45 TransactionStatus, TransactionUpdateRequest,
46 },
47 repositories::{NetworkRepository, RelayerRepository, Repository, TransactionRepository},
48 services::{
49 provider::{StellarProvider, StellarProviderTrait},
50 signer::{StellarSignTrait, StellarSigner},
51 stellar_dex::StellarDexServiceTrait,
52 TransactionCounterService, TransactionCounterServiceTrait,
53 },
54 utils::calculate_scheduled_timestamp,
55};
56use async_trait::async_trait;
57use eyre::Result;
58use futures::future::try_join_all;
59use std::sync::Arc;
60use tracing::{debug, error, info, instrument, warn};
61
62use crate::domain::relayer::stellar::xdr_utils::parse_transaction_xdr;
63use crate::domain::relayer::{Relayer, RelayerError, StellarRelayerDexTrait};
64use crate::domain::transaction::stellar::token::get_token_metadata;
65use crate::domain::transaction::stellar::StellarTransactionValidator;
66
67pub struct StellarRelayerDependencies<RR, NR, TR, J, TCS>
69where
70 RR: Repository<RelayerRepoModel, String> + RelayerRepository + Send + Sync + 'static,
71 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
72 TR: Repository<TransactionRepoModel, String> + TransactionRepository + Send + Sync + 'static,
73 J: JobProducerTrait + Send + Sync + 'static,
74 TCS: TransactionCounterServiceTrait + Send + Sync + 'static,
75{
76 pub relayer_repository: Arc<RR>,
77 pub network_repository: Arc<NR>,
78 pub transaction_repository: Arc<TR>,
79 pub transaction_counter_service: Arc<TCS>,
80 pub job_producer: Arc<J>,
81}
82
83impl<RR, NR, TR, J, TCS> StellarRelayerDependencies<RR, NR, TR, J, TCS>
84where
85 RR: Repository<RelayerRepoModel, String> + RelayerRepository + Send + Sync + 'static,
86 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
87 TR: Repository<TransactionRepoModel, String> + TransactionRepository + Send + Sync + 'static,
88 J: JobProducerTrait + Send + Sync,
89 TCS: TransactionCounterServiceTrait + Send + Sync + 'static,
90{
91 pub fn new(
105 relayer_repository: Arc<RR>,
106 network_repository: Arc<NR>,
107 transaction_repository: Arc<TR>,
108 transaction_counter_service: Arc<TCS>,
109 job_producer: Arc<J>,
110 ) -> Self {
111 Self {
112 relayer_repository,
113 network_repository,
114 transaction_repository,
115 transaction_counter_service,
116 job_producer,
117 }
118 }
119}
120
121#[allow(dead_code)]
122pub struct StellarRelayer<P, RR, NR, TR, J, TCS, S, D>
123where
124 P: StellarProviderTrait + Send + Sync + 'static,
125 RR: Repository<RelayerRepoModel, String> + RelayerRepository + Send + Sync + 'static,
126 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
127 TR: Repository<TransactionRepoModel, String> + TransactionRepository + Send + Sync + 'static,
128 J: JobProducerTrait + Send + Sync + 'static,
129 TCS: TransactionCounterServiceTrait + Send + Sync + 'static,
130 S: StellarSignTrait + Send + Sync + 'static,
131 D: StellarDexServiceTrait + Send + Sync + 'static,
132{
133 pub(crate) relayer: RelayerRepoModel,
134 pub(crate) signer: Arc<S>,
135 pub(crate) network: StellarNetwork,
136 pub(crate) provider: P,
137 pub(crate) relayer_repository: Arc<RR>,
138 network_repository: Arc<NR>,
139 transaction_repository: Arc<TR>,
140 transaction_counter_service: Arc<TCS>,
141 pub(crate) job_producer: Arc<J>,
142 pub(crate) dex_service: Arc<D>,
143}
144
145pub type DefaultStellarRelayer<J, TR, NR, RR, TCR> = StellarRelayer<
146 StellarProvider,
147 RR,
148 NR,
149 TR,
150 J,
151 TransactionCounterService<TCR>,
152 StellarSigner,
153 StellarDexService<StellarProvider, StellarSigner>,
154>;
155
156impl<P, RR, NR, TR, J, TCS, S, D> StellarRelayer<P, RR, NR, TR, J, TCS, S, D>
157where
158 P: StellarProviderTrait + Send + Sync,
159 RR: Repository<RelayerRepoModel, String> + RelayerRepository + Send + Sync + 'static,
160 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
161 TR: Repository<TransactionRepoModel, String> + TransactionRepository + Send + Sync + 'static,
162 J: JobProducerTrait + Send + Sync + 'static,
163 TCS: TransactionCounterServiceTrait + Send + Sync + 'static,
164 S: StellarSignTrait + Send + Sync + 'static,
165 D: StellarDexServiceTrait + Send + Sync + 'static,
166{
167 #[allow(clippy::too_many_arguments)]
186 pub async fn new(
187 relayer: RelayerRepoModel,
188 signer: Arc<S>,
189 provider: P,
190 dependencies: StellarRelayerDependencies<RR, NR, TR, J, TCS>,
191 dex_service: Arc<D>,
192 ) -> Result<Self, RelayerError> {
193 let network_repo = dependencies
194 .network_repository
195 .get_by_name(NetworkType::Stellar, &relayer.network)
196 .await
197 .ok()
198 .flatten()
199 .ok_or_else(|| {
200 RelayerError::NetworkConfiguration(format!("Network {} not found", relayer.network))
201 })?;
202
203 let network = StellarNetwork::try_from(network_repo.clone())?;
204
205 Ok(Self {
206 relayer,
207 signer,
208 network,
209 provider,
210 relayer_repository: dependencies.relayer_repository,
211 network_repository: dependencies.network_repository,
212 transaction_repository: dependencies.transaction_repository,
213 transaction_counter_service: dependencies.transaction_counter_service,
214 job_producer: dependencies.job_producer,
215 dex_service,
216 })
217 }
218
219 #[instrument(
220 level = "debug",
221 skip(self),
222 fields(
223 request_id = ?crate::observability::request_id::get_request_id(),
224 relayer_id = %self.relayer.id,
225 )
226 )]
227 async fn sync_sequence(&self) -> Result<(), RelayerError> {
228 info!(
229 address = %self.relayer.address,
230 "syncing sequence from chain"
231 );
232
233 let next = fetch_next_sequence_from_chain(&self.provider, &self.relayer.address)
234 .await
235 .map_err(RelayerError::ProviderError)?;
236
237 info!(
238 next_sequence = %next,
239 "setting next sequence"
240 );
241 self.transaction_counter_service
242 .set(next)
243 .await
244 .map_err(RelayerError::from)?;
245 Ok(())
246 }
247
248 #[instrument(
258 level = "debug",
259 skip(self),
260 fields(
261 request_id = ?crate::observability::request_id::get_request_id(),
262 relayer_id = %self.relayer.id,
263 )
264 )]
265 async fn populate_allowed_tokens_metadata(&self) -> Result<RelayerStellarPolicy, RelayerError> {
266 let mut policy = self.relayer.policies.get_stellar_policy();
267 let allowed_tokens = match policy.allowed_tokens.as_ref() {
269 Some(tokens) if !tokens.is_empty() => tokens,
270 _ => {
271 info!("No allowed tokens specified; skipping token metadata population.");
272 return Ok(policy);
273 }
274 };
275
276 let token_metadata_futures = allowed_tokens.iter().map(|token| {
277 let asset_id = token.asset.clone();
278 let provider = &self.provider;
279 async move {
280 let metadata = get_token_metadata(provider, &asset_id)
281 .await
282 .map_err(RelayerError::from)?;
283
284 Ok::<StellarAllowedTokensPolicy, RelayerError>(StellarAllowedTokensPolicy {
285 asset: asset_id,
286 metadata: Some(metadata),
287 max_allowed_fee: token.max_allowed_fee,
288 swap_config: token.swap_config.clone(),
289 })
290 }
291 });
292
293 let updated_allowed_tokens = try_join_all(token_metadata_futures).await?;
294
295 policy.allowed_tokens = Some(updated_allowed_tokens.clone());
296
297 self.relayer_repository
298 .update_policy(
299 self.relayer.id.clone(),
300 RelayerNetworkPolicy::Stellar(policy.clone()),
301 )
302 .await?;
303
304 Ok(policy)
305 }
306
307 #[instrument(
316 level = "debug",
317 skip(self),
318 fields(
319 request_id = ?crate::observability::request_id::get_request_id(),
320 relayer_id = %self.relayer.id,
321 )
322 )]
323 async fn migrate_fee_payment_strategy_if_needed(&self) -> Result<(), RelayerError> {
324 if !self.relayer_repository.is_persistent_storage() {
327 debug!(
328 relayer_id = %self.relayer.id,
329 "Skipping migration: using in-memory storage"
330 );
331 return Ok(());
332 }
333
334 let policy = self.relayer.policies.get_stellar_policy();
335
336 if policy.fee_payment_strategy.is_some() {
338 return Ok(());
339 }
340
341 info!(
343 relayer_id = %self.relayer.id,
344 "Migrating Stellar relayer: setting fee_payment_strategy to 'Relayer' (old default behavior)"
345 );
346
347 let mut updated_policy = policy;
349 updated_policy.fee_payment_strategy = Some(StellarFeePaymentStrategy::Relayer);
350
351 self.relayer_repository
353 .update_policy(
354 self.relayer.id.clone(),
355 RelayerNetworkPolicy::Stellar(updated_policy),
356 )
357 .await
358 .map_err(|e| {
359 RelayerError::PolicyConfigurationError(format!(
360 "Failed to migrate fee_payment_strategy policy: {e}"
361 ))
362 })?;
363
364 debug!(
365 relayer_id = %self.relayer.id,
366 "Successfully migrated fee_payment_strategy policy"
367 );
368
369 Ok(())
370 }
371
372 #[instrument(
376 level = "debug",
377 skip(self),
378 fields(
379 request_id = ?crate::observability::request_id::get_request_id(),
380 relayer_id = %self.relayer.id,
381 )
382 )]
383 async fn check_balance_and_trigger_token_swap_if_needed(&self) -> Result<(), RelayerError> {
384 let policy = self.relayer.policies.get_stellar_policy();
385
386 let swap_config = match policy.get_swap_config() {
388 Some(config) => config,
389 None => {
390 debug!(
391 relayer_id = %self.relayer.id,
392 "No swap configuration specified; skipping balance check"
393 );
394 return Ok(());
395 }
396 };
397
398 let threshold = match swap_config.min_balance_threshold {
400 Some(threshold) => threshold,
401 None => {
402 debug!(
403 relayer_id = %self.relayer.id,
404 "No swap min balance threshold specified; skipping validation"
405 );
406 return Ok(());
407 }
408 };
409
410 let balance_response = self.get_balance().await?;
412 let current_balance = u64::try_from(balance_response.balance).map_err(|_| {
413 RelayerError::Internal("Account balance exceeds u64 maximum value".to_string())
414 })?;
415
416 if current_balance < threshold {
418 debug!(
419 relayer_id = %self.relayer.id,
420 balance = current_balance,
421 threshold = threshold,
422 "XLM balance is below threshold, triggering token swap"
423 );
424
425 let _swap_results = self
426 .handle_token_swap_request(self.relayer.id.clone())
427 .await?;
428 } else {
429 debug!(
430 relayer_id = %self.relayer.id,
431 balance = current_balance,
432 threshold = threshold,
433 "XLM balance is above threshold, no swap needed"
434 );
435 }
436
437 Ok(())
438 }
439}
440
441#[async_trait]
442impl<P, RR, NR, TR, J, TCS, S, D> Relayer for StellarRelayer<P, RR, NR, TR, J, TCS, S, D>
443where
444 P: StellarProviderTrait + Send + Sync + 'static,
445 D: StellarDexServiceTrait + Send + Sync + 'static,
446 RR: Repository<RelayerRepoModel, String> + RelayerRepository + Send + Sync + 'static,
447 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
448 TR: Repository<TransactionRepoModel, String> + TransactionRepository + Send + Sync + 'static,
449 J: JobProducerTrait + Send + Sync + 'static,
450 TCS: TransactionCounterServiceTrait + Send + Sync + 'static,
451 S: StellarSignTrait + Send + Sync + 'static,
452{
453 #[instrument(
454 level = "debug",
455 skip(self, network_transaction),
456 fields(
457 request_id = ?crate::observability::request_id::get_request_id(),
458 relayer_id = %self.relayer.id,
459 network_type = ?self.relayer.network_type,
460 )
461 )]
462 async fn process_transaction_request(
463 &self,
464 network_transaction: NetworkTransactionRequest,
465 ) -> Result<TransactionRepoModel, RelayerError> {
466 let network_model = self
467 .network_repository
468 .get_by_name(NetworkType::Stellar, &self.relayer.network)
469 .await?
470 .ok_or_else(|| {
471 RelayerError::NetworkConfiguration(format!(
472 "Network {} not found",
473 self.relayer.network
474 ))
475 })?;
476 let transaction =
477 TransactionRepoModel::try_from((&network_transaction, &self.relayer, &network_model))?;
478
479 self.transaction_repository
480 .create(transaction.clone())
481 .await
482 .map_err(|e| RepositoryError::TransactionFailure(e.to_string()))?;
483
484 if let Err(e) = self
488 .job_producer
489 .produce_check_transaction_status_job(
490 TransactionStatusCheck::new(
491 transaction.id.clone(),
492 transaction.relayer_id.clone(),
493 NetworkType::Stellar,
494 ),
495 Some(calculate_scheduled_timestamp(
496 STELLAR_STATUS_CHECK_INITIAL_DELAY_SECONDS,
497 )),
498 )
499 .await
500 {
501 error!(
503 relayer_id = %self.relayer.id,
504 transaction_id = %transaction.id,
505 error = %e,
506 "Status check queue push failed - marking transaction as failed"
507 );
508 if let Err(update_err) = self
509 .transaction_repository
510 .partial_update(
511 transaction.id.clone(),
512 TransactionUpdateRequest {
513 status: Some(TransactionStatus::Failed),
514 status_reason: Some("Queue unavailable".to_string()),
515 ..Default::default()
516 },
517 )
518 .await
519 {
520 warn!(
521 relayer_id = %self.relayer.id,
522 transaction_id = %transaction.id,
523 error = %update_err,
524 "Failed to mark transaction as failed after queue push failure"
525 );
526 }
527 return Err(e.into());
528 }
529
530 self.job_producer
533 .produce_transaction_request_job(
534 TransactionRequest::new(transaction.id.clone(), transaction.relayer_id.clone()),
535 None,
536 )
537 .await?;
538
539 Ok(transaction)
540 }
541
542 #[instrument(
543 level = "debug",
544 skip(self),
545 fields(
546 request_id = ?crate::observability::request_id::get_request_id(),
547 relayer_id = %self.relayer.id,
548 )
549 )]
550 async fn get_balance(&self) -> Result<BalanceResponse, RelayerError> {
551 let account_entry = self
552 .provider
553 .get_account(&self.relayer.address)
554 .await
555 .map_err(|e| {
556 warn!(
557 relayer_id = %self.relayer.id,
558 address = %self.relayer.address,
559 error = %e,
560 "get_account failed in get_balance (called before transaction creation)"
561 );
562 crate::metrics::API_RPC_FAILURES
564 .with_label_values(&[
565 self.relayer.id.as_str(),
566 "stellar",
567 "get_balance",
568 "get_account_failed",
569 ])
570 .inc();
571 RelayerError::ProviderError(format!("Failed to fetch account for balance: {e}"))
572 })?;
573
574 Ok(BalanceResponse {
575 balance: account_entry.balance as u128,
576 unit: STELLAR_SMALLEST_UNIT_NAME.to_string(),
577 })
578 }
579
580 #[instrument(
581 level = "debug",
582 skip(self),
583 fields(
584 request_id = ?crate::observability::request_id::get_request_id(),
585 relayer_id = %self.relayer.id,
586 )
587 )]
588 async fn get_status(&self, options: GetStatusOptions) -> Result<RelayerStatus, RelayerError> {
589 let relayer_model = &self.relayer;
590
591 let sequence_number = self
593 .transaction_counter_service
594 .get()
595 .await
596 .ok()
597 .flatten()
598 .unwrap_or(0);
599 let sequence_number_str = sequence_number.to_string();
600
601 let balance = if options.include_balance {
602 Some(self.get_balance().await?.balance.to_string())
603 } else {
604 None
605 };
606
607 let pending_transactions_count = if options.include_pending_count {
608 Some(
610 self.transaction_repository
611 .count_by_status(&relayer_model.id, PENDING_TRANSACTION_STATUSES)
612 .await
613 .map_err(RelayerError::from)?,
614 )
615 } else {
616 None
617 };
618
619 let last_confirmed_transaction_timestamp = if options.include_last_confirmed_tx {
620 self.transaction_repository
622 .find_by_status_paginated(
623 &relayer_model.id,
624 &[TransactionStatus::Confirmed],
625 PaginationQuery {
626 page: 1,
627 per_page: 1,
628 },
629 false, )
631 .await
632 .map_err(RelayerError::from)?
633 .items
634 .into_iter()
635 .next()
636 .and_then(|tx| tx.confirmed_at)
637 } else {
638 None
639 };
640
641 Ok(RelayerStatus::Stellar {
642 balance,
643 pending_transactions_count,
644 last_confirmed_transaction_timestamp,
645 system_disabled: relayer_model.system_disabled,
646 paused: relayer_model.paused,
647 sequence_number: sequence_number_str,
648 })
649 }
650
651 #[instrument(
652 level = "debug",
653 skip(self),
654 fields(
655 request_id = ?crate::observability::request_id::get_request_id(),
656 relayer_id = %self.relayer.id,
657 )
658 )]
659 async fn delete_pending_transactions(
660 &self,
661 ) -> Result<DeletePendingTransactionsResponse, RelayerError> {
662 println!("Stellar delete_pending_transactions...");
663 Ok(DeletePendingTransactionsResponse {
664 queued_for_cancellation_transaction_ids: vec![],
665 failed_to_queue_transaction_ids: vec![],
666 total_processed: 0,
667 })
668 }
669
670 #[instrument(
671 level = "debug",
672 skip(self, _request),
673 fields(
674 request_id = ?crate::observability::request_id::get_request_id(),
675 relayer_id = %self.relayer.id,
676 )
677 )]
678 async fn sign_data(&self, _request: SignDataRequest) -> Result<SignDataResponse, RelayerError> {
679 Err(RelayerError::NotSupported(
680 "Signing data not supported for Stellar".to_string(),
681 ))
682 }
683
684 #[instrument(
685 level = "debug",
686 skip(self, _request),
687 fields(
688 request_id = ?crate::observability::request_id::get_request_id(),
689 relayer_id = %self.relayer.id,
690 )
691 )]
692 async fn sign_typed_data(
693 &self,
694 _request: SignTypedDataRequest,
695 ) -> Result<SignDataResponse, RelayerError> {
696 Err(RelayerError::NotSupported(
697 "Signing typed data not supported for Stellar".to_string(),
698 ))
699 }
700
701 #[instrument(
702 level = "debug",
703 skip(self, request),
704 fields(
705 request_id = ?crate::observability::request_id::get_request_id(),
706 relayer_id = %self.relayer.id,
707 )
708 )]
709 async fn rpc(
710 &self,
711 request: JsonRpcRequest<NetworkRpcRequest>,
712 ) -> Result<JsonRpcResponse<NetworkRpcResult>, RelayerError> {
713 let JsonRpcRequest { id, params, .. } = request;
714 let stellar_request = match params {
715 NetworkRpcRequest::Stellar(stellar_req) => stellar_req,
716 _ => {
717 return Ok(create_error_response(
718 id.clone(),
719 RpcErrorCodes::INVALID_PARAMS,
720 "Invalid params",
721 "Expected Stellar network request",
722 ))
723 }
724 };
725
726 let (method, params_json) = match stellar_request {
728 StellarRpcRequest::RawRpcRequest { method, params } => (method, params),
729 };
730
731 match self
732 .provider
733 .raw_request_dyn(&method, params_json, id.clone())
734 .await
735 {
736 Ok(result_value) => Ok(create_success_response(id.clone(), result_value)),
737 Err(provider_error) => {
738 tracing::error!(
740 error = %provider_error,
741 "RPC provider error occurred"
742 );
743 let (error_code, error_message) = map_provider_error(&provider_error);
744 let sanitized_description = sanitize_error_description(&provider_error);
745 Ok(create_error_response(
746 id.clone(),
747 error_code,
748 error_message,
749 &sanitized_description,
750 ))
751 }
752 }
753 }
754
755 #[instrument(
756 level = "debug",
757 skip(self),
758 fields(
759 request_id = ?crate::observability::request_id::get_request_id(),
760 relayer_id = %self.relayer.id,
761 )
762 )]
763 async fn validate_min_balance(&self) -> Result<(), RelayerError> {
764 Ok(())
765 }
766
767 #[instrument(
768 level = "debug",
769 skip(self),
770 fields(
771 request_id = ?crate::observability::request_id::get_request_id(),
772 relayer_id = %self.relayer.id,
773 )
774 )]
775 async fn initialize_relayer(&self) -> Result<(), RelayerError> {
776 debug!("initializing Stellar relayer");
777
778 self.migrate_fee_payment_strategy_if_needed().await?;
782
783 self.populate_allowed_tokens_metadata().await.map_err(|e| {
786 RelayerError::PolicyConfigurationError(format!(
787 "Error while processing allowed tokens policy: {e}"
788 ))
789 })?;
790
791 match self.check_health().await {
792 Ok(_) => {
793 if self.relayer.system_disabled {
795 self.relayer_repository
797 .enable_relayer(self.relayer.id.clone())
798 .await?;
799 }
800 }
801 Err(failures) => {
802 let reason = DisabledReason::from_health_failures(failures).unwrap_or_else(|| {
804 DisabledReason::SequenceSyncFailed("Unknown error".to_string())
805 });
806
807 warn!(reason = %reason, "disabling relayer");
808 let updated_relayer = self
809 .relayer_repository
810 .disable_relayer(self.relayer.id.clone(), reason.clone())
811 .await?;
812
813 if let Some(notification_id) = &self.relayer.notification_id {
815 self.job_producer
816 .produce_send_notification_job(
817 produce_relayer_disabled_payload(
818 notification_id,
819 &updated_relayer,
820 &reason.safe_description(),
821 ),
822 None,
823 )
824 .await?;
825 }
826
827 self.job_producer
829 .produce_relayer_health_check_job(
830 RelayerHealthCheck::new(self.relayer.id.clone()),
831 Some(calculate_scheduled_timestamp(10)),
832 )
833 .await?;
834 }
835 }
836 debug!(
837 "Stellar relayer initialized successfully: {}",
838 self.relayer.id
839 );
840 Ok(())
841 }
842
843 #[instrument(
844 level = "debug",
845 skip(self),
846 fields(
847 request_id = ?crate::observability::request_id::get_request_id(),
848 relayer_id = %self.relayer.id,
849 )
850 )]
851 async fn check_health(&self) -> Result<(), Vec<HealthCheckFailure>> {
852 debug!(
853 "running health checks for Stellar relayer {}",
854 self.relayer.id
855 );
856
857 let mut failures = Vec::new();
858
859 match self.sync_sequence().await {
861 Ok(_) => {
862 debug!(
863 "sequence sync passed for Stellar relayer {}",
864 self.relayer.id
865 );
866 }
867 Err(e) => {
868 let reason = HealthCheckFailure::SequenceSyncFailed(e.to_string());
869 warn!("sequence sync failed: {:?}", reason);
870 failures.push(reason);
871 }
872 }
873
874 let policy = self.relayer.policies.get_stellar_policy();
878 if matches!(
879 policy.fee_payment_strategy,
880 Some(StellarFeePaymentStrategy::User)
881 ) {
882 debug!(
883 "checking balance and attempting token swap for user fee payment strategy relayer {}",
884 self.relayer.id
885 );
886 if let Err(e) = self.check_balance_and_trigger_token_swap_if_needed().await {
887 warn!(
888 relayer_id = %self.relayer.id,
889 error = %e,
890 "Balance check or token swap failed, but not treating as health check failure"
891 );
892 } else {
893 debug!(
894 "balance check and token swap completed for Stellar relayer {}",
895 self.relayer.id
896 );
897 }
898 }
899
900 if failures.is_empty() {
901 debug!(
902 "all health checks passed for Stellar relayer {}",
903 self.relayer.id
904 );
905 Ok(())
906 } else {
907 warn!(
908 "health checks failed for Stellar relayer {}: {:?}",
909 self.relayer.id, failures
910 );
911 Err(failures)
912 }
913 }
914
915 #[instrument(
916 level = "debug",
917 skip(self, request),
918 fields(
919 request_id = ?crate::observability::request_id::get_request_id(),
920 relayer_id = %self.relayer.id,
921 )
922 )]
923 async fn sign_transaction(
924 &self,
925 request: &SignTransactionRequest,
926 ) -> Result<SignTransactionExternalResponse, RelayerError> {
927 let stellar_req = match request {
928 SignTransactionRequest::Stellar(req) => req,
929 _ => {
930 return Err(RelayerError::NotSupported(
931 "Invalid request type for Stellar relayer".to_string(),
932 ))
933 }
934 };
935
936 let policy = self.relayer.policies.get_stellar_policy();
937 let user_pays_fee = matches!(
938 policy.fee_payment_strategy,
939 Some(StellarFeePaymentStrategy::User)
940 );
941
942 if user_pays_fee {
944 let envelope = parse_transaction_xdr(&stellar_req.unsigned_xdr, false)
946 .map_err(|e| RelayerError::ValidationError(format!("Failed to parse XDR: {e}")))?;
947
948 StellarTransactionValidator::validate_user_fee_payment_transaction(
951 &envelope,
952 &self.relayer.address,
953 &policy,
954 &self.provider,
955 self.dex_service.as_ref(),
956 Some(get_stellar_sponsored_transaction_validity_duration()), )
958 .await
959 .map_err(|e| {
960 RelayerError::ValidationError(format!("Failed to validate transaction: {e}"))
961 })?;
962 }
963
964 let response = self
966 .signer
967 .sign_xdr_transaction(&stellar_req.unsigned_xdr, &self.network.passphrase)
968 .await
969 .map_err(RelayerError::SignerError)?;
970
971 let signature_bytes = &response.signature.signature.0;
973 let signature_string =
974 base64::Engine::encode(&base64::engine::general_purpose::STANDARD, signature_bytes);
975
976 Ok(SignTransactionExternalResponse::Stellar(
977 SignTransactionExternalResponseStellar {
978 signed_xdr: response.signed_xdr,
979 signature: signature_string,
980 },
981 ))
982 }
983}
984
985#[cfg(test)]
986mod tests {
987 use super::*;
988 use crate::{
989 config::{NetworkConfigCommon, StellarNetworkConfig},
990 constants::STELLAR_SMALLEST_UNIT_NAME,
991 domain::{SignTransactionRequestStellar, SignXdrTransactionResponseStellar},
992 jobs::MockJobProducerTrait,
993 models::{
994 NetworkConfigData, NetworkRepoModel, NetworkType, RelayerNetworkPolicy,
995 RelayerRepoModel, RelayerStellarPolicy, RpcConfig, SignerError,
996 },
997 repositories::{
998 InMemoryNetworkRepository, MockRelayerRepository, MockTransactionRepository,
999 },
1000 services::{
1001 provider::{MockStellarProviderTrait, ProviderError},
1002 signer::MockStellarSignTrait,
1003 stellar_dex::MockStellarDexServiceTrait,
1004 MockTransactionCounterServiceTrait,
1005 },
1006 };
1007 use mockall::predicate::*;
1008 use soroban_rs::xdr::{
1009 AccountEntry, AccountEntryExt, AccountId, DecoratedSignature, PublicKey, SequenceNumber,
1010 Signature, SignatureHint, String32, Thresholds, Uint256, VecM,
1011 };
1012 use std::future::ready;
1013 use std::sync::Arc;
1014
1015 fn create_mock_dex_service() -> Arc<MockStellarDexServiceTrait> {
1017 let mut mock_dex = MockStellarDexServiceTrait::new();
1018 mock_dex.expect_supported_asset_types().returning(|| {
1019 use crate::services::stellar_dex::AssetType;
1020 std::collections::HashSet::from([AssetType::Native, AssetType::Classic])
1021 });
1022 Arc::new(mock_dex)
1023 }
1024
1025 struct TestCtx {
1027 relayer_model: RelayerRepoModel,
1028 network_repository: Arc<InMemoryNetworkRepository>,
1029 }
1030
1031 impl Default for TestCtx {
1032 fn default() -> Self {
1033 let network_repository = Arc::new(InMemoryNetworkRepository::new());
1034
1035 let relayer_model = RelayerRepoModel {
1036 id: "test-relayer-id".to_string(),
1037 name: "Test Relayer".to_string(),
1038 network: "testnet".to_string(),
1039 paused: false,
1040 network_type: NetworkType::Stellar,
1041 signer_id: "signer-id".to_string(),
1042 policies: RelayerNetworkPolicy::Stellar(RelayerStellarPolicy::default()),
1043 address: "GAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAWHF".to_string(),
1044 notification_id: Some("notification-id".to_string()),
1045 system_disabled: false,
1046 custom_rpc_urls: None,
1047 ..Default::default()
1048 };
1049
1050 TestCtx {
1051 relayer_model,
1052 network_repository,
1053 }
1054 }
1055 }
1056
1057 impl TestCtx {
1058 async fn setup_network(&self) {
1059 let test_network = NetworkRepoModel {
1060 id: "stellar:testnet".to_string(),
1061 name: "testnet".to_string(),
1062 network_type: NetworkType::Stellar,
1063 config: NetworkConfigData::Stellar(StellarNetworkConfig {
1064 common: NetworkConfigCommon {
1065 network: "testnet".to_string(),
1066 from: None,
1067 rpc_urls: Some(vec![RpcConfig::new(
1068 "https://horizon-testnet.stellar.org".to_string(),
1069 )]),
1070 explorer_urls: None,
1071 average_blocktime_ms: Some(5000),
1072 is_testnet: Some(true),
1073 tags: None,
1074 },
1075 passphrase: Some("Test SDF Network ; September 2015".to_string()),
1076 horizon_url: Some("https://horizon-testnet.stellar.org".to_string()),
1077 }),
1078 };
1079
1080 self.network_repository.create(test_network).await.unwrap();
1081 }
1082 }
1083
1084 #[tokio::test]
1085 async fn test_sync_sequence_success() {
1086 let ctx = TestCtx::default();
1087 ctx.setup_network().await;
1088 let relayer_model = ctx.relayer_model.clone();
1089 let mut provider = MockStellarProviderTrait::new();
1090 provider
1091 .expect_get_account()
1092 .with(eq(relayer_model.address.clone()))
1093 .returning(|_| {
1094 Box::pin(async {
1095 Ok(AccountEntry {
1096 account_id: AccountId(PublicKey::PublicKeyTypeEd25519(Uint256([0; 32]))),
1097 balance: 0,
1098 ext: AccountEntryExt::V0,
1099 flags: 0,
1100 home_domain: String32::default(),
1101 inflation_dest: None,
1102 seq_num: SequenceNumber(5),
1103 num_sub_entries: 0,
1104 signers: VecM::default(),
1105 thresholds: Thresholds([0, 0, 0, 0]),
1106 })
1107 })
1108 });
1109 let mut counter = MockTransactionCounterServiceTrait::new();
1110 counter
1111 .expect_set()
1112 .with(eq(6u64))
1113 .returning(|_| Box::pin(async { Ok(()) }));
1114 let relayer_repo = MockRelayerRepository::new();
1115 let tx_repo = MockTransactionRepository::new();
1116 let job_producer = MockJobProducerTrait::new();
1117 let signer = Arc::new(MockStellarSignTrait::new());
1118 let dex_service = create_mock_dex_service();
1119
1120 let relayer = StellarRelayer::new(
1121 relayer_model.clone(),
1122 signer,
1123 provider,
1124 StellarRelayerDependencies::new(
1125 Arc::new(relayer_repo),
1126 ctx.network_repository.clone(),
1127 Arc::new(tx_repo),
1128 Arc::new(counter),
1129 Arc::new(job_producer),
1130 ),
1131 dex_service,
1132 )
1133 .await
1134 .unwrap();
1135
1136 let result = relayer.sync_sequence().await;
1137 assert!(result.is_ok());
1138 }
1139
1140 #[tokio::test]
1141 async fn test_sync_sequence_provider_error() {
1142 let ctx = TestCtx::default();
1143 ctx.setup_network().await;
1144 let relayer_model = ctx.relayer_model.clone();
1145 let mut provider = MockStellarProviderTrait::new();
1146 provider
1147 .expect_get_account()
1148 .with(eq(relayer_model.address.clone()))
1149 .returning(|_| Box::pin(async { Err(ProviderError::Other("fail".to_string())) }));
1150 let counter = MockTransactionCounterServiceTrait::new();
1151 let relayer_repo = MockRelayerRepository::new();
1152 let tx_repo = MockTransactionRepository::new();
1153 let job_producer = MockJobProducerTrait::new();
1154 let signer = Arc::new(MockStellarSignTrait::new());
1155 let dex_service = create_mock_dex_service();
1156
1157 let relayer = StellarRelayer::new(
1158 relayer_model.clone(),
1159 signer,
1160 provider,
1161 StellarRelayerDependencies::new(
1162 Arc::new(relayer_repo),
1163 ctx.network_repository.clone(),
1164 Arc::new(tx_repo),
1165 Arc::new(counter),
1166 Arc::new(job_producer),
1167 ),
1168 dex_service,
1169 )
1170 .await
1171 .unwrap();
1172
1173 let result = relayer.sync_sequence().await;
1174 assert!(matches!(result, Err(RelayerError::ProviderError(_))));
1175 }
1176
1177 #[tokio::test]
1178 async fn test_get_status_success_stellar() {
1179 let ctx = TestCtx::default();
1180 ctx.setup_network().await;
1181 let relayer_model = ctx.relayer_model.clone();
1182 let mut provider_mock = MockStellarProviderTrait::new();
1183 let mut tx_repo_mock = MockTransactionRepository::new();
1184 let relayer_repo_mock = MockRelayerRepository::new();
1185 let job_producer_mock = MockJobProducerTrait::new();
1186 let mut counter_mock = MockTransactionCounterServiceTrait::new();
1187
1188 provider_mock.expect_get_account().times(1).returning(|_| {
1190 Box::pin(ready(Ok(AccountEntry {
1191 account_id: AccountId(PublicKey::PublicKeyTypeEd25519(Uint256([0; 32]))),
1192 balance: 10000000,
1193 seq_num: SequenceNumber(12345),
1194 ext: AccountEntryExt::V0,
1195 flags: 0,
1196 home_domain: String32::default(),
1197 inflation_dest: None,
1198 num_sub_entries: 0,
1199 signers: VecM::default(),
1200 thresholds: Thresholds([0, 0, 0, 0]),
1201 })))
1202 });
1203
1204 counter_mock
1206 .expect_get()
1207 .returning(|| Box::pin(ready(Ok(Some(12345)))));
1208
1209 tx_repo_mock
1211 .expect_count_by_status()
1212 .withf(|relayer_id, statuses| {
1213 relayer_id == "test-relayer-id"
1214 && statuses
1215 == [
1216 TransactionStatus::Pending,
1217 TransactionStatus::Sent,
1218 TransactionStatus::Submitted,
1219 ]
1220 })
1221 .returning(|_, _| Ok(0u64))
1222 .once();
1223
1224 let confirmed_tx = TransactionRepoModel {
1226 id: "tx1_stellar".to_string(),
1227 relayer_id: relayer_model.id.clone(),
1228 status: TransactionStatus::Confirmed,
1229 confirmed_at: Some("2023-02-01T12:00:00Z".to_string()),
1230 ..TransactionRepoModel::default()
1231 };
1232 let relayer_id_clone = relayer_model.id.clone();
1233 tx_repo_mock
1234 .expect_find_by_status_paginated()
1235 .withf(move |relayer_id, statuses, query, oldest_first| {
1236 *relayer_id == relayer_id_clone
1237 && statuses == [TransactionStatus::Confirmed]
1238 && query.page == 1
1239 && query.per_page == 1
1240 && *oldest_first == false
1241 })
1242 .returning(move |_, _, _, _| {
1243 Ok(crate::repositories::PaginatedResult {
1244 items: vec![confirmed_tx.clone()],
1245 total: 1,
1246 page: 1,
1247 per_page: 1,
1248 })
1249 })
1250 .once();
1251 let signer = Arc::new(MockStellarSignTrait::new());
1252 let dex_service = create_mock_dex_service();
1253
1254 let stellar_relayer = StellarRelayer::new(
1255 relayer_model.clone(),
1256 signer,
1257 provider_mock,
1258 StellarRelayerDependencies::new(
1259 Arc::new(relayer_repo_mock),
1260 ctx.network_repository.clone(),
1261 Arc::new(tx_repo_mock),
1262 Arc::new(counter_mock),
1263 Arc::new(job_producer_mock),
1264 ),
1265 dex_service,
1266 )
1267 .await
1268 .unwrap();
1269
1270 let status = stellar_relayer
1271 .get_status(GetStatusOptions::default())
1272 .await
1273 .unwrap();
1274
1275 match status {
1276 RelayerStatus::Stellar {
1277 balance,
1278 pending_transactions_count,
1279 last_confirmed_transaction_timestamp,
1280 system_disabled,
1281 paused,
1282 sequence_number,
1283 } => {
1284 assert_eq!(balance, Some("10000000".to_string()));
1285 assert_eq!(pending_transactions_count, Some(0));
1286 assert_eq!(
1287 last_confirmed_transaction_timestamp,
1288 Some("2023-02-01T12:00:00Z".to_string())
1289 );
1290 assert_eq!(system_disabled, relayer_model.system_disabled);
1291 assert_eq!(paused, relayer_model.paused);
1292 assert_eq!(sequence_number, "12345");
1293 }
1294 _ => panic!("Expected Stellar RelayerStatus"),
1295 }
1296 }
1297
1298 #[tokio::test]
1299 async fn test_get_status_skip_all_optional_fields() {
1300 let ctx = TestCtx::default();
1301 ctx.setup_network().await;
1302 let relayer_model = ctx.relayer_model.clone();
1303
1304 let provider_mock = MockStellarProviderTrait::new();
1306 let tx_repo_mock = MockTransactionRepository::new();
1308 let relayer_repo_mock = MockRelayerRepository::new();
1309 let job_producer_mock = MockJobProducerTrait::new();
1310 let mut counter_mock = MockTransactionCounterServiceTrait::new();
1311
1312 counter_mock
1314 .expect_get()
1315 .returning(|| Box::pin(ready(Ok(Some(99u64)))));
1316
1317 let signer = Arc::new(MockStellarSignTrait::new());
1318 let dex_service = create_mock_dex_service();
1319
1320 let stellar_relayer = StellarRelayer::new(
1321 relayer_model.clone(),
1322 signer,
1323 provider_mock,
1324 StellarRelayerDependencies::new(
1325 Arc::new(relayer_repo_mock),
1326 ctx.network_repository.clone(),
1327 Arc::new(tx_repo_mock),
1328 Arc::new(counter_mock),
1329 Arc::new(job_producer_mock),
1330 ),
1331 dex_service,
1332 )
1333 .await
1334 .unwrap();
1335
1336 let options = GetStatusOptions {
1337 include_balance: false,
1338 include_pending_count: false,
1339 include_last_confirmed_tx: false,
1340 };
1341 let status = stellar_relayer.get_status(options).await.unwrap();
1342
1343 match status {
1344 RelayerStatus::Stellar {
1345 balance,
1346 pending_transactions_count,
1347 last_confirmed_transaction_timestamp,
1348 system_disabled,
1349 paused,
1350 sequence_number,
1351 } => {
1352 assert_eq!(balance, None);
1353 assert_eq!(pending_transactions_count, None);
1354 assert_eq!(last_confirmed_transaction_timestamp, None);
1355 assert_eq!(system_disabled, relayer_model.system_disabled);
1356 assert_eq!(paused, relayer_model.paused);
1357 assert_eq!(sequence_number, "99");
1358 }
1359 _ => panic!("Expected Stellar RelayerStatus"),
1360 }
1361 }
1362
1363 #[tokio::test]
1364 async fn test_get_status_partial_options() {
1365 let ctx = TestCtx::default();
1366 ctx.setup_network().await;
1367 let relayer_model = ctx.relayer_model.clone();
1368
1369 let mut provider_mock = MockStellarProviderTrait::new();
1370 let mut tx_repo_mock = MockTransactionRepository::new();
1371 let relayer_repo_mock = MockRelayerRepository::new();
1372 let job_producer_mock = MockJobProducerTrait::new();
1373 let mut counter_mock = MockTransactionCounterServiceTrait::new();
1374
1375 counter_mock
1376 .expect_get()
1377 .returning(|| Box::pin(ready(Ok(Some(50u64)))));
1378
1379 provider_mock.expect_get_account().times(1).returning(|_| {
1381 Box::pin(ready(Ok(AccountEntry {
1382 account_id: AccountId(PublicKey::PublicKeyTypeEd25519(Uint256([0; 32]))),
1383 balance: 5000000,
1384 seq_num: SequenceNumber(50),
1385 ext: AccountEntryExt::V0,
1386 flags: 0,
1387 home_domain: String32::default(),
1388 inflation_dest: None,
1389 num_sub_entries: 0,
1390 signers: VecM::default(),
1391 thresholds: Thresholds([0, 0, 0, 0]),
1392 })))
1393 });
1394
1395 tx_repo_mock
1397 .expect_count_by_status()
1398 .returning(|_, _| Ok(7u64))
1399 .once();
1400
1401 let signer = Arc::new(MockStellarSignTrait::new());
1404 let dex_service = create_mock_dex_service();
1405
1406 let stellar_relayer = StellarRelayer::new(
1407 relayer_model.clone(),
1408 signer,
1409 provider_mock,
1410 StellarRelayerDependencies::new(
1411 Arc::new(relayer_repo_mock),
1412 ctx.network_repository.clone(),
1413 Arc::new(tx_repo_mock),
1414 Arc::new(counter_mock),
1415 Arc::new(job_producer_mock),
1416 ),
1417 dex_service,
1418 )
1419 .await
1420 .unwrap();
1421
1422 let options = GetStatusOptions {
1423 include_balance: true,
1424 include_pending_count: true,
1425 include_last_confirmed_tx: false,
1426 };
1427 let status = stellar_relayer.get_status(options).await.unwrap();
1428
1429 match status {
1430 RelayerStatus::Stellar {
1431 balance,
1432 pending_transactions_count,
1433 last_confirmed_transaction_timestamp,
1434 sequence_number,
1435 ..
1436 } => {
1437 assert_eq!(balance, Some("5000000".to_string()));
1438 assert_eq!(pending_transactions_count, Some(7));
1439 assert_eq!(last_confirmed_transaction_timestamp, None);
1440 assert_eq!(sequence_number, "50");
1441 }
1442 _ => panic!("Expected Stellar RelayerStatus"),
1443 }
1444 }
1445
1446 #[tokio::test]
1447 async fn test_get_status_stellar_provider_error() {
1448 let ctx = TestCtx::default();
1449 ctx.setup_network().await;
1450 let relayer_model = ctx.relayer_model.clone();
1451 let mut provider_mock = MockStellarProviderTrait::new();
1452 let tx_repo_mock = MockTransactionRepository::new();
1453 let relayer_repo_mock = MockRelayerRepository::new();
1454 let job_producer_mock = MockJobProducerTrait::new();
1455 let mut counter_mock = MockTransactionCounterServiceTrait::new();
1456
1457 provider_mock
1459 .expect_get_account()
1460 .with(eq(relayer_model.address.clone()))
1461 .returning(|_| {
1462 Box::pin(async { Err(ProviderError::Other("Stellar provider down".to_string())) })
1463 });
1464
1465 counter_mock
1467 .expect_get()
1468 .returning(|| Box::pin(ready(Ok(Some(100)))));
1469
1470 let signer = Arc::new(MockStellarSignTrait::new());
1471 let dex_service = create_mock_dex_service();
1472
1473 let stellar_relayer = StellarRelayer::new(
1474 relayer_model.clone(),
1475 signer,
1476 provider_mock,
1477 StellarRelayerDependencies::new(
1478 Arc::new(relayer_repo_mock),
1479 ctx.network_repository.clone(),
1480 Arc::new(tx_repo_mock),
1481 Arc::new(counter_mock),
1482 Arc::new(job_producer_mock),
1483 ),
1484 dex_service,
1485 )
1486 .await
1487 .unwrap();
1488
1489 let result = stellar_relayer
1490 .get_status(GetStatusOptions::default())
1491 .await;
1492 assert!(result.is_err());
1493 match result.err().unwrap() {
1494 RelayerError::ProviderError(msg) => {
1495 assert!(msg.contains("Failed to fetch account for balance"))
1496 }
1497 _ => panic!("Expected ProviderError for get_balance failure"),
1498 }
1499 }
1500
1501 #[tokio::test]
1502 async fn test_get_balance_success() {
1503 let ctx = TestCtx::default();
1504 ctx.setup_network().await;
1505 let relayer_model = ctx.relayer_model.clone();
1506 let mut provider = MockStellarProviderTrait::new();
1507 let expected_balance = 100_000_000i64; provider
1510 .expect_get_account()
1511 .with(eq(relayer_model.address.clone()))
1512 .returning(move |_| {
1513 Box::pin(async move {
1514 Ok(AccountEntry {
1515 account_id: AccountId(PublicKey::PublicKeyTypeEd25519(Uint256([0; 32]))),
1516 balance: expected_balance,
1517 ext: AccountEntryExt::V0,
1518 flags: 0,
1519 home_domain: String32::default(),
1520 inflation_dest: None,
1521 seq_num: SequenceNumber(5),
1522 num_sub_entries: 0,
1523 signers: VecM::default(),
1524 thresholds: Thresholds([0, 0, 0, 0]),
1525 })
1526 })
1527 });
1528
1529 let relayer_repo = Arc::new(MockRelayerRepository::new());
1530 let tx_repo = Arc::new(MockTransactionRepository::new());
1531 let job_producer = Arc::new(MockJobProducerTrait::new());
1532 let counter = Arc::new(MockTransactionCounterServiceTrait::new());
1533 let signer = Arc::new(MockStellarSignTrait::new());
1534 let dex_service = create_mock_dex_service();
1535
1536 let relayer = StellarRelayer::new(
1537 relayer_model,
1538 signer,
1539 provider,
1540 StellarRelayerDependencies::new(
1541 relayer_repo,
1542 ctx.network_repository.clone(),
1543 tx_repo,
1544 counter,
1545 job_producer,
1546 ),
1547 dex_service,
1548 )
1549 .await
1550 .unwrap();
1551
1552 let result = relayer.get_balance().await;
1553 assert!(result.is_ok());
1554 let balance_response = result.unwrap();
1555 assert_eq!(balance_response.balance, expected_balance as u128);
1556 assert_eq!(balance_response.unit, STELLAR_SMALLEST_UNIT_NAME);
1557 }
1558
1559 #[tokio::test]
1560 async fn test_get_balance_provider_error() {
1561 let ctx = TestCtx::default();
1562 ctx.setup_network().await;
1563 let relayer_model = ctx.relayer_model.clone();
1564 let mut provider = MockStellarProviderTrait::new();
1565
1566 provider
1567 .expect_get_account()
1568 .with(eq(relayer_model.address.clone()))
1569 .returning(|_| {
1570 Box::pin(async { Err(ProviderError::Other("provider failed".to_string())) })
1571 });
1572
1573 let relayer_repo = Arc::new(MockRelayerRepository::new());
1574 let tx_repo = Arc::new(MockTransactionRepository::new());
1575 let job_producer = Arc::new(MockJobProducerTrait::new());
1576 let counter = Arc::new(MockTransactionCounterServiceTrait::new());
1577 let signer = Arc::new(MockStellarSignTrait::new());
1578 let dex_service = create_mock_dex_service();
1579
1580 let relayer = StellarRelayer::new(
1581 relayer_model,
1582 signer,
1583 provider,
1584 StellarRelayerDependencies::new(
1585 relayer_repo,
1586 ctx.network_repository.clone(),
1587 tx_repo,
1588 counter,
1589 job_producer,
1590 ),
1591 dex_service,
1592 )
1593 .await
1594 .unwrap();
1595
1596 let result = relayer.get_balance().await;
1597 assert!(result.is_err());
1598 match result.err().unwrap() {
1599 RelayerError::ProviderError(msg) => {
1600 assert!(msg.contains("Failed to fetch account for balance"));
1601 }
1602 _ => panic!("Unexpected error type"),
1603 }
1604 }
1605
1606 #[tokio::test]
1607 async fn test_sign_transaction_success() {
1608 let ctx = TestCtx::default();
1609 ctx.setup_network().await;
1610 let relayer_model = ctx.relayer_model.clone();
1611 let provider = MockStellarProviderTrait::new();
1612 let mut signer = MockStellarSignTrait::new();
1613
1614 let unsigned_xdr = "AAAAAgAAAAD///8AAAAAAAAAAQAAAAAAAAACAAAAAQAAAAAAAAAB";
1615 let expected_signed_xdr =
1616 "AAAAAgAAAAD///8AAAAAAAABAAAAAAAAAAIAAAABAAAAAAAAAAEAAAABAAAAA...";
1617 let expected_signature = DecoratedSignature {
1618 hint: SignatureHint([1, 2, 3, 4]),
1619 signature: Signature([5u8; 64].try_into().unwrap()),
1620 };
1621 let expected_signature_for_closure = expected_signature.clone();
1622
1623 signer
1624 .expect_sign_xdr_transaction()
1625 .with(eq(unsigned_xdr), eq("Test SDF Network ; September 2015"))
1626 .returning(move |_, _| {
1627 Ok(SignXdrTransactionResponseStellar {
1628 signed_xdr: expected_signed_xdr.to_string(),
1629 signature: expected_signature_for_closure.clone(),
1630 })
1631 });
1632
1633 let relayer_repo = Arc::new(MockRelayerRepository::new());
1634 let tx_repo = Arc::new(MockTransactionRepository::new());
1635 let job_producer = Arc::new(MockJobProducerTrait::new());
1636 let counter = Arc::new(MockTransactionCounterServiceTrait::new());
1637 let dex_service = create_mock_dex_service();
1638
1639 let relayer = StellarRelayer::new(
1640 relayer_model,
1641 Arc::new(signer),
1642 provider,
1643 StellarRelayerDependencies::new(
1644 relayer_repo,
1645 ctx.network_repository.clone(),
1646 tx_repo,
1647 counter,
1648 job_producer,
1649 ),
1650 dex_service,
1651 )
1652 .await
1653 .unwrap();
1654
1655 let request = SignTransactionRequest::Stellar(SignTransactionRequestStellar {
1656 unsigned_xdr: unsigned_xdr.to_string(),
1657 });
1658 let result = relayer.sign_transaction(&request).await;
1659 assert!(result.is_ok());
1660
1661 match result.unwrap() {
1662 SignTransactionExternalResponse::Stellar(response) => {
1663 assert_eq!(response.signed_xdr, expected_signed_xdr);
1664 let expected_signature_base64 = base64::Engine::encode(
1666 &base64::engine::general_purpose::STANDARD,
1667 &expected_signature.signature.0,
1668 );
1669 assert_eq!(response.signature, expected_signature_base64);
1670 }
1671 _ => panic!("Expected Stellar response"),
1672 }
1673 }
1674
1675 #[tokio::test]
1676 async fn test_sign_transaction_signer_error() {
1677 let ctx = TestCtx::default();
1678 ctx.setup_network().await;
1679 let relayer_model = ctx.relayer_model.clone();
1680 let provider = MockStellarProviderTrait::new();
1681 let mut signer = MockStellarSignTrait::new();
1682
1683 let unsigned_xdr = "INVALID_XDR";
1684
1685 signer
1686 .expect_sign_xdr_transaction()
1687 .with(eq(unsigned_xdr), eq("Test SDF Network ; September 2015"))
1688 .returning(|_, _| Err(SignerError::SigningError("Invalid XDR format".to_string())));
1689
1690 let relayer_repo = Arc::new(MockRelayerRepository::new());
1691 let tx_repo = Arc::new(MockTransactionRepository::new());
1692 let job_producer = Arc::new(MockJobProducerTrait::new());
1693 let counter = Arc::new(MockTransactionCounterServiceTrait::new());
1694 let dex_service = create_mock_dex_service();
1695
1696 let relayer = StellarRelayer::new(
1697 relayer_model,
1698 Arc::new(signer),
1699 provider,
1700 StellarRelayerDependencies::new(
1701 relayer_repo,
1702 ctx.network_repository.clone(),
1703 tx_repo,
1704 counter,
1705 job_producer,
1706 ),
1707 dex_service,
1708 )
1709 .await
1710 .unwrap();
1711
1712 let request = SignTransactionRequest::Stellar(SignTransactionRequestStellar {
1713 unsigned_xdr: unsigned_xdr.to_string(),
1714 });
1715 let result = relayer.sign_transaction(&request).await;
1716 assert!(result.is_err());
1717
1718 match result.err().unwrap() {
1719 RelayerError::SignerError(err) => match err {
1720 SignerError::SigningError(msg) => {
1721 assert_eq!(msg, "Invalid XDR format");
1722 }
1723 _ => panic!("Expected SigningError"),
1724 },
1725 _ => panic!("Expected RelayerError::SignerError"),
1726 }
1727 }
1728
1729 #[tokio::test]
1730 async fn test_sign_transaction_with_different_network_passphrase() {
1731 let ctx = TestCtx::default();
1732 let custom_network = NetworkRepoModel {
1734 id: "stellar:mainnet".to_string(),
1735 name: "mainnet".to_string(),
1736 network_type: NetworkType::Stellar,
1737 config: NetworkConfigData::Stellar(StellarNetworkConfig {
1738 common: NetworkConfigCommon {
1739 network: "mainnet".to_string(),
1740 from: None,
1741 rpc_urls: Some(vec![RpcConfig::new(
1742 "https://horizon.stellar.org".to_string(),
1743 )]),
1744 explorer_urls: None,
1745 average_blocktime_ms: Some(5000),
1746 is_testnet: Some(false),
1747 tags: None,
1748 },
1749 passphrase: Some("Public Global Stellar Network ; September 2015".to_string()),
1750 horizon_url: Some("https://horizon.stellar.org".to_string()),
1751 }),
1752 };
1753 ctx.network_repository.create(custom_network).await.unwrap();
1754
1755 let mut relayer_model = ctx.relayer_model.clone();
1756 relayer_model.network = "mainnet".to_string();
1757
1758 let provider = MockStellarProviderTrait::new();
1759 let mut signer = MockStellarSignTrait::new();
1760
1761 let unsigned_xdr = "AAAAAgAAAAD///8AAAAAAAAAAQAAAAAAAAACAAAAAQAAAAAAAAAB";
1762 let expected_signature = DecoratedSignature {
1763 hint: SignatureHint([10, 20, 30, 40]),
1764 signature: Signature([15u8; 64].try_into().unwrap()),
1765 };
1766 let expected_signature_for_closure = expected_signature.clone();
1767
1768 signer
1769 .expect_sign_xdr_transaction()
1770 .with(
1771 eq(unsigned_xdr),
1772 eq("Public Global Stellar Network ; September 2015"),
1773 )
1774 .returning(move |_, _| {
1775 Ok(SignXdrTransactionResponseStellar {
1776 signed_xdr: "mainnet_signed_xdr".to_string(),
1777 signature: expected_signature_for_closure.clone(),
1778 })
1779 });
1780
1781 let relayer_repo = Arc::new(MockRelayerRepository::new());
1782 let tx_repo = Arc::new(MockTransactionRepository::new());
1783 let job_producer = Arc::new(MockJobProducerTrait::new());
1784 let counter = Arc::new(MockTransactionCounterServiceTrait::new());
1785 let dex_service = create_mock_dex_service();
1786
1787 let relayer = StellarRelayer::new(
1788 relayer_model,
1789 Arc::new(signer),
1790 provider,
1791 StellarRelayerDependencies::new(
1792 relayer_repo,
1793 ctx.network_repository.clone(),
1794 tx_repo,
1795 counter,
1796 job_producer,
1797 ),
1798 dex_service,
1799 )
1800 .await
1801 .unwrap();
1802
1803 let request = SignTransactionRequest::Stellar(SignTransactionRequestStellar {
1804 unsigned_xdr: unsigned_xdr.to_string(),
1805 });
1806 let result = relayer.sign_transaction(&request).await;
1807 assert!(result.is_ok());
1808
1809 match result.unwrap() {
1810 SignTransactionExternalResponse::Stellar(response) => {
1811 assert_eq!(response.signed_xdr, "mainnet_signed_xdr");
1812 let expected_signature_string = base64::Engine::encode(
1814 &base64::engine::general_purpose::STANDARD,
1815 &expected_signature.signature.0,
1816 );
1817 assert_eq!(response.signature, expected_signature_string);
1818 }
1819 _ => panic!("Expected Stellar response"),
1820 }
1821 }
1822
1823 #[tokio::test]
1824 async fn test_initialize_relayer_disables_when_validation_fails() {
1825 let ctx = TestCtx::default();
1826 ctx.setup_network().await;
1827 let mut relayer_model = ctx.relayer_model.clone();
1828 relayer_model.system_disabled = false; relayer_model.notification_id = Some("test-notification-id".to_string());
1830
1831 let mut provider = MockStellarProviderTrait::new();
1832 let mut relayer_repo = MockRelayerRepository::new();
1833 let mut job_producer = MockJobProducerTrait::new();
1834
1835 relayer_repo
1836 .expect_is_persistent_storage()
1837 .returning(|| false);
1838
1839 provider
1841 .expect_get_account()
1842 .returning(|_| Box::pin(ready(Err(ProviderError::Other("RPC error".to_string())))));
1843
1844 let mut disabled_relayer = relayer_model.clone();
1846 disabled_relayer.system_disabled = true;
1847 relayer_repo
1848 .expect_disable_relayer()
1849 .withf(|id, reason| {
1850 id == "test-relayer-id"
1851 && matches!(reason, crate::models::DisabledReason::SequenceSyncFailed(_))
1852 })
1853 .returning(move |_, _| Ok(disabled_relayer.clone()));
1854
1855 job_producer
1857 .expect_produce_send_notification_job()
1858 .returning(|_, _| Box::pin(async { Ok(()) }));
1859
1860 job_producer
1862 .expect_produce_relayer_health_check_job()
1863 .returning(|_, _| Box::pin(async { Ok(()) }));
1864
1865 let tx_repo = MockTransactionRepository::new();
1866 let counter = MockTransactionCounterServiceTrait::new();
1867 let signer = Arc::new(MockStellarSignTrait::new());
1868 let dex_service = create_mock_dex_service();
1869
1870 let relayer = StellarRelayer::new(
1871 relayer_model.clone(),
1872 signer,
1873 provider,
1874 StellarRelayerDependencies::new(
1875 Arc::new(relayer_repo),
1876 ctx.network_repository.clone(),
1877 Arc::new(tx_repo),
1878 Arc::new(counter),
1879 Arc::new(job_producer),
1880 ),
1881 dex_service,
1882 )
1883 .await
1884 .unwrap();
1885
1886 let result = relayer.initialize_relayer().await;
1887 assert!(result.is_ok());
1888 }
1889
1890 #[tokio::test]
1891 async fn test_initialize_relayer_enables_when_validation_passes_and_was_disabled() {
1892 let ctx = TestCtx::default();
1893 ctx.setup_network().await;
1894 let mut relayer_model = ctx.relayer_model.clone();
1895 relayer_model.system_disabled = true; let mut provider = MockStellarProviderTrait::new();
1898 let mut relayer_repo = MockRelayerRepository::new();
1899
1900 relayer_repo
1901 .expect_is_persistent_storage()
1902 .returning(|| false);
1903
1904 provider.expect_get_account().returning(|_| {
1906 Box::pin(ready(Ok(AccountEntry {
1907 account_id: AccountId(PublicKey::PublicKeyTypeEd25519(Uint256([0; 32]))),
1908 balance: 1000000000, seq_num: SequenceNumber(1),
1910 num_sub_entries: 0,
1911 inflation_dest: None,
1912 flags: 0,
1913 home_domain: String32::default(),
1914 thresholds: Thresholds([0; 4]),
1915 signers: VecM::default(),
1916 ext: AccountEntryExt::V0,
1917 })))
1918 });
1919
1920 let mut enabled_relayer = relayer_model.clone();
1922 enabled_relayer.system_disabled = false;
1923 relayer_repo
1924 .expect_enable_relayer()
1925 .with(eq("test-relayer-id".to_string()))
1926 .returning(move |_| Ok(enabled_relayer.clone()));
1927
1928 let tx_repo = MockTransactionRepository::new();
1929 let mut counter = MockTransactionCounterServiceTrait::new();
1930 counter
1931 .expect_set()
1932 .returning(|_| Box::pin(async { Ok(()) }));
1933 let signer = Arc::new(MockStellarSignTrait::new());
1934 let dex_service = create_mock_dex_service();
1935 let job_producer = MockJobProducerTrait::new();
1936
1937 let relayer = StellarRelayer::new(
1938 relayer_model.clone(),
1939 signer,
1940 provider,
1941 StellarRelayerDependencies::new(
1942 Arc::new(relayer_repo),
1943 ctx.network_repository.clone(),
1944 Arc::new(tx_repo),
1945 Arc::new(counter),
1946 Arc::new(job_producer),
1947 ),
1948 dex_service,
1949 )
1950 .await
1951 .unwrap();
1952
1953 let result = relayer.initialize_relayer().await;
1954 assert!(result.is_ok());
1955 }
1956
1957 #[tokio::test]
1958 async fn test_initialize_relayer_no_action_when_enabled_and_validation_passes() {
1959 let ctx = TestCtx::default();
1960 ctx.setup_network().await;
1961 let mut relayer_model = ctx.relayer_model.clone();
1962 relayer_model.system_disabled = false; let mut provider = MockStellarProviderTrait::new();
1965
1966 provider.expect_get_account().returning(|_| {
1968 Box::pin(ready(Ok(AccountEntry {
1969 account_id: AccountId(PublicKey::PublicKeyTypeEd25519(Uint256([0; 32]))),
1970 balance: 1000000000, seq_num: SequenceNumber(1),
1972 num_sub_entries: 0,
1973 inflation_dest: None,
1974 flags: 0,
1975 home_domain: String32::default(),
1976 thresholds: Thresholds([0; 4]),
1977 signers: VecM::default(),
1978 ext: AccountEntryExt::V0,
1979 })))
1980 });
1981
1982 let tx_repo = MockTransactionRepository::new();
1985 let mut counter = MockTransactionCounterServiceTrait::new();
1986 counter
1987 .expect_set()
1988 .returning(|_| Box::pin(async { Ok(()) }));
1989 let signer = Arc::new(MockStellarSignTrait::new());
1990 let dex_service = create_mock_dex_service();
1991 let job_producer = MockJobProducerTrait::new();
1992 let mut relayer_repo = MockRelayerRepository::new();
1993
1994 relayer_repo
1995 .expect_is_persistent_storage()
1996 .returning(|| false);
1997
1998 let relayer = StellarRelayer::new(
1999 relayer_model.clone(),
2000 signer,
2001 provider,
2002 StellarRelayerDependencies::new(
2003 Arc::new(relayer_repo),
2004 ctx.network_repository.clone(),
2005 Arc::new(tx_repo),
2006 Arc::new(counter),
2007 Arc::new(job_producer),
2008 ),
2009 dex_service,
2010 )
2011 .await
2012 .unwrap();
2013
2014 let result = relayer.initialize_relayer().await;
2015 assert!(result.is_ok());
2016 }
2017
2018 #[tokio::test]
2019 async fn test_initialize_relayer_sends_notification_when_disabled() {
2020 let ctx = TestCtx::default();
2021 ctx.setup_network().await;
2022 let mut relayer_model = ctx.relayer_model.clone();
2023 relayer_model.system_disabled = false; relayer_model.notification_id = Some("test-notification-id".to_string());
2025
2026 let mut provider = MockStellarProviderTrait::new();
2027 let mut relayer_repo = MockRelayerRepository::new();
2028 let mut job_producer = MockJobProducerTrait::new();
2029
2030 relayer_repo
2031 .expect_is_persistent_storage()
2032 .returning(|| false);
2033
2034 provider.expect_get_account().returning(|_| {
2036 Box::pin(ready(Err(ProviderError::Other(
2037 "Sequence sync failed".to_string(),
2038 ))))
2039 });
2040
2041 let mut disabled_relayer = relayer_model.clone();
2043 disabled_relayer.system_disabled = true;
2044 relayer_repo
2045 .expect_disable_relayer()
2046 .withf(|id, reason| {
2047 id == "test-relayer-id"
2048 && matches!(reason, crate::models::DisabledReason::SequenceSyncFailed(_))
2049 })
2050 .returning(move |_, _| Ok(disabled_relayer.clone()));
2051
2052 job_producer
2054 .expect_produce_send_notification_job()
2055 .returning(|_, _| Box::pin(async { Ok(()) }));
2056
2057 job_producer
2059 .expect_produce_relayer_health_check_job()
2060 .returning(|_, _| Box::pin(async { Ok(()) }));
2061
2062 let tx_repo = MockTransactionRepository::new();
2063 let counter = MockTransactionCounterServiceTrait::new();
2064 let signer = Arc::new(MockStellarSignTrait::new());
2065 let dex_service = create_mock_dex_service();
2066
2067 let relayer = StellarRelayer::new(
2068 relayer_model.clone(),
2069 signer,
2070 provider,
2071 StellarRelayerDependencies::new(
2072 Arc::new(relayer_repo),
2073 ctx.network_repository.clone(),
2074 Arc::new(tx_repo),
2075 Arc::new(counter),
2076 Arc::new(job_producer),
2077 ),
2078 dex_service,
2079 )
2080 .await
2081 .unwrap();
2082
2083 let result = relayer.initialize_relayer().await;
2084 assert!(result.is_ok());
2085 }
2086
2087 #[tokio::test]
2088 async fn test_initialize_relayer_no_notification_when_no_notification_id() {
2089 let ctx = TestCtx::default();
2090 ctx.setup_network().await;
2091 let mut relayer_model = ctx.relayer_model.clone();
2092 relayer_model.system_disabled = false; relayer_model.notification_id = None; let mut provider = MockStellarProviderTrait::new();
2096 let mut relayer_repo = MockRelayerRepository::new();
2097 relayer_repo
2098 .expect_is_persistent_storage()
2099 .returning(|| false);
2100
2101 provider.expect_get_account().returning(|_| {
2103 Box::pin(ready(Err(ProviderError::Other(
2104 "Sequence sync failed".to_string(),
2105 ))))
2106 });
2107
2108 let mut disabled_relayer = relayer_model.clone();
2110 disabled_relayer.system_disabled = true;
2111 relayer_repo
2112 .expect_disable_relayer()
2113 .withf(|id, reason| {
2114 id == "test-relayer-id"
2115 && matches!(reason, crate::models::DisabledReason::SequenceSyncFailed(_))
2116 })
2117 .returning(move |_, _| Ok(disabled_relayer.clone()));
2118
2119 let mut job_producer = MockJobProducerTrait::new();
2122 job_producer
2123 .expect_produce_relayer_health_check_job()
2124 .returning(|_, _| Box::pin(async { Ok(()) }));
2125
2126 let tx_repo = MockTransactionRepository::new();
2127 let counter = MockTransactionCounterServiceTrait::new();
2128 let signer = Arc::new(MockStellarSignTrait::new());
2129 let dex_service = create_mock_dex_service();
2130
2131 let relayer = StellarRelayer::new(
2132 relayer_model.clone(),
2133 signer,
2134 provider,
2135 StellarRelayerDependencies::new(
2136 Arc::new(relayer_repo),
2137 ctx.network_repository.clone(),
2138 Arc::new(tx_repo),
2139 Arc::new(counter),
2140 Arc::new(job_producer),
2141 ),
2142 dex_service,
2143 )
2144 .await
2145 .unwrap();
2146
2147 let result = relayer.initialize_relayer().await;
2148 assert!(result.is_ok());
2149 }
2150
2151 mod process_transaction_request_tests {
2152 use super::*;
2153 use crate::constants::STELLAR_STATUS_CHECK_INITIAL_DELAY_SECONDS;
2154 use crate::models::{
2155 NetworkTransactionRequest, NetworkType, StellarTransactionRequest, TransactionStatus,
2156 };
2157 use chrono::Utc;
2158
2159 fn create_test_transaction_request() -> NetworkTransactionRequest {
2161 NetworkTransactionRequest::Stellar(StellarTransactionRequest {
2162 source_account: None,
2163 network: "testnet".to_string(),
2164 operations: None,
2165 memo: None,
2166 valid_until: None,
2167 transaction_xdr: Some("AAAAAgAAAACige4lTdwSB/sto4SniEdJ2kOa2X65s5bqkd40J4DjSwAAAAEAAHAkAAAADwAAAAAAAAAAAAAAAQAAAAAAAAABAAAAAKKB7iVN3BIH+y2jhKeIR0naQ5rZfrmzluqR3jQngONLAAAAAAAAAAAAD0JAAAAAAAAAAAA=".to_string()),
2168 fee_bump: None,
2169 max_fee: None,
2170 signed_auth_entry: None,
2171 })
2172 }
2173
2174 #[tokio::test]
2175 async fn test_process_transaction_request_calls_job_producer_methods() {
2176 let ctx = TestCtx::default();
2177 ctx.setup_network().await;
2178 let relayer_model = ctx.relayer_model.clone();
2179
2180 let provider = MockStellarProviderTrait::new();
2181 let signer = Arc::new(MockStellarSignTrait::new());
2182 let dex_service = create_mock_dex_service();
2183
2184 let tx_request = create_test_transaction_request();
2186
2187 let mut tx_repo = MockTransactionRepository::new();
2189 tx_repo.expect_create().returning(|t| Ok(t.clone()));
2190
2191 let mut job_producer = MockJobProducerTrait::new();
2193
2194 job_producer
2196 .expect_produce_transaction_request_job()
2197 .withf(|req, delay| {
2198 !req.transaction_id.is_empty() && !req.relayer_id.is_empty() && delay.is_none()
2199 })
2200 .times(1)
2201 .returning(|_, _| Box::pin(async { Ok(()) }));
2202
2203 job_producer
2205 .expect_produce_check_transaction_status_job()
2206 .withf(|check, delay| {
2207 !check.transaction_id.is_empty()
2208 && !check.relayer_id.is_empty()
2209 && check.network_type == Some(NetworkType::Stellar)
2210 && delay.is_some()
2211 })
2212 .times(1)
2213 .returning(|_, _| Box::pin(async { Ok(()) }));
2214
2215 let relayer_repo = Arc::new(MockRelayerRepository::new());
2216 let counter = MockTransactionCounterServiceTrait::new();
2217
2218 let relayer = StellarRelayer::new(
2219 relayer_model,
2220 signer,
2221 provider,
2222 StellarRelayerDependencies::new(
2223 relayer_repo,
2224 ctx.network_repository.clone(),
2225 Arc::new(tx_repo),
2226 Arc::new(counter),
2227 Arc::new(job_producer),
2228 ),
2229 dex_service,
2230 )
2231 .await
2232 .unwrap();
2233
2234 let result = relayer.process_transaction_request(tx_request).await;
2235 if let Err(e) = &result {
2236 panic!("process_transaction_request failed: {}", e);
2237 }
2238 assert!(result.is_ok());
2239 }
2240
2241 #[tokio::test]
2242 async fn test_process_transaction_request_with_scheduled_delay() {
2243 let ctx = TestCtx::default();
2244 ctx.setup_network().await;
2245 let relayer_model = ctx.relayer_model.clone();
2246
2247 let provider = MockStellarProviderTrait::new();
2248 let signer = Arc::new(MockStellarSignTrait::new());
2249 let dex_service = create_mock_dex_service();
2250
2251 let tx_request = create_test_transaction_request();
2252
2253 let mut tx_repo = MockTransactionRepository::new();
2254 tx_repo.expect_create().returning(|t| Ok(t.clone()));
2255
2256 let mut job_producer = MockJobProducerTrait::new();
2257
2258 job_producer
2259 .expect_produce_transaction_request_job()
2260 .returning(|_, _| Box::pin(async { Ok(()) }));
2261
2262 job_producer
2264 .expect_produce_check_transaction_status_job()
2265 .withf(|_, delay| {
2266 if let Some(scheduled_at) = delay {
2268 let now = Utc::now().timestamp();
2270 let diff = scheduled_at - now;
2271 diff >= (STELLAR_STATUS_CHECK_INITIAL_DELAY_SECONDS - 2)
2273 && diff <= (STELLAR_STATUS_CHECK_INITIAL_DELAY_SECONDS + 2)
2274 } else {
2275 false
2276 }
2277 })
2278 .times(1)
2279 .returning(|_, _| Box::pin(async { Ok(()) }));
2280
2281 let relayer_repo = Arc::new(MockRelayerRepository::new());
2282 let counter = MockTransactionCounterServiceTrait::new();
2283
2284 let relayer = StellarRelayer::new(
2285 relayer_model,
2286 signer,
2287 provider,
2288 StellarRelayerDependencies::new(
2289 relayer_repo,
2290 ctx.network_repository.clone(),
2291 Arc::new(tx_repo),
2292 Arc::new(counter),
2293 Arc::new(job_producer),
2294 ),
2295 dex_service,
2296 )
2297 .await
2298 .unwrap();
2299
2300 let result = relayer.process_transaction_request(tx_request).await;
2301 assert!(result.is_ok());
2302 }
2303
2304 #[tokio::test]
2305 async fn test_process_transaction_request_repository_failure() {
2306 let ctx = TestCtx::default();
2307 ctx.setup_network().await;
2308 let relayer_model = ctx.relayer_model.clone();
2309
2310 let provider = MockStellarProviderTrait::new();
2311 let signer = Arc::new(MockStellarSignTrait::new());
2312 let dex_service = create_mock_dex_service();
2313
2314 let tx_request = create_test_transaction_request();
2315
2316 let mut tx_repo = MockTransactionRepository::new();
2318 tx_repo.expect_create().returning(|_| {
2319 Err(RepositoryError::TransactionFailure(
2320 "Database connection failed".to_string(),
2321 ))
2322 });
2323
2324 let job_producer = MockJobProducerTrait::new();
2326
2327 let relayer_repo = Arc::new(MockRelayerRepository::new());
2328 let counter = MockTransactionCounterServiceTrait::new();
2329
2330 let relayer = StellarRelayer::new(
2331 relayer_model,
2332 signer,
2333 provider,
2334 StellarRelayerDependencies::new(
2335 relayer_repo,
2336 ctx.network_repository.clone(),
2337 Arc::new(tx_repo),
2338 Arc::new(counter),
2339 Arc::new(job_producer),
2340 ),
2341 dex_service,
2342 )
2343 .await
2344 .unwrap();
2345
2346 let result = relayer.process_transaction_request(tx_request).await;
2347 assert!(result.is_err());
2348 let err_msg = result.err().unwrap().to_string();
2350 assert!(
2351 err_msg.contains("Database connection failed"),
2352 "Error was: {}",
2353 err_msg
2354 );
2355 }
2356
2357 #[tokio::test]
2358 async fn test_process_transaction_request_job_producer_request_failure() {
2359 let ctx = TestCtx::default();
2360 ctx.setup_network().await;
2361 let relayer_model = ctx.relayer_model.clone();
2362
2363 let provider = MockStellarProviderTrait::new();
2364 let signer = Arc::new(MockStellarSignTrait::new());
2365 let dex_service = create_mock_dex_service();
2366
2367 let tx_request = create_test_transaction_request();
2368
2369 let mut tx_repo = MockTransactionRepository::new();
2370 tx_repo.expect_create().returning(|t| Ok(t.clone()));
2371
2372 let mut job_producer = MockJobProducerTrait::new();
2373
2374 job_producer
2376 .expect_produce_check_transaction_status_job()
2377 .returning(|_, _| Box::pin(async { Ok(()) }));
2378
2379 job_producer
2382 .expect_produce_transaction_request_job()
2383 .returning(|_, _| {
2384 Box::pin(async {
2385 Err(crate::jobs::JobProducerError::QueueError(
2386 "Queue is full".to_string(),
2387 ))
2388 })
2389 });
2390
2391 let relayer_repo = Arc::new(MockRelayerRepository::new());
2392 let counter = MockTransactionCounterServiceTrait::new();
2393
2394 let relayer = StellarRelayer::new(
2395 relayer_model,
2396 signer,
2397 provider,
2398 StellarRelayerDependencies::new(
2399 relayer_repo,
2400 ctx.network_repository.clone(),
2401 Arc::new(tx_repo),
2402 Arc::new(counter),
2403 Arc::new(job_producer),
2404 ),
2405 dex_service,
2406 )
2407 .await
2408 .unwrap();
2409
2410 let result = relayer.process_transaction_request(tx_request).await;
2411 assert!(result.is_err());
2412 }
2413
2414 #[tokio::test]
2415 async fn test_process_transaction_request_job_producer_status_check_failure() {
2416 let ctx = TestCtx::default();
2417 ctx.setup_network().await;
2418 let relayer_model = ctx.relayer_model.clone();
2419
2420 let provider = MockStellarProviderTrait::new();
2421 let signer = Arc::new(MockStellarSignTrait::new());
2422 let dex_service = create_mock_dex_service();
2423
2424 let tx_request = create_test_transaction_request();
2425
2426 let mut tx_repo = MockTransactionRepository::new();
2427 tx_repo.expect_create().returning(|t| Ok(t.clone()));
2428 tx_repo
2430 .expect_partial_update()
2431 .returning(|_, _| Ok(TransactionRepoModel::default()));
2432
2433 let mut job_producer = MockJobProducerTrait::new();
2434
2435 job_producer
2438 .expect_produce_check_transaction_status_job()
2439 .returning(|_, _| {
2440 Box::pin(async {
2441 Err(crate::jobs::JobProducerError::QueueError(
2442 "Failed to queue job".to_string(),
2443 ))
2444 })
2445 });
2446
2447 let relayer_repo = Arc::new(MockRelayerRepository::new());
2451 let counter = MockTransactionCounterServiceTrait::new();
2452
2453 let relayer = StellarRelayer::new(
2454 relayer_model,
2455 signer,
2456 provider,
2457 StellarRelayerDependencies::new(
2458 relayer_repo,
2459 ctx.network_repository.clone(),
2460 Arc::new(tx_repo),
2461 Arc::new(counter),
2462 Arc::new(job_producer),
2463 ),
2464 dex_service,
2465 )
2466 .await
2467 .unwrap();
2468
2469 let result = relayer.process_transaction_request(tx_request).await;
2470 assert!(result.is_err());
2471 }
2472
2473 #[tokio::test]
2474 async fn test_process_transaction_request_status_check_failure_marks_tx_failed() {
2475 let ctx = TestCtx::default();
2478 ctx.setup_network().await;
2479 let relayer_model = ctx.relayer_model.clone();
2480
2481 let provider = MockStellarProviderTrait::new();
2482 let signer = Arc::new(MockStellarSignTrait::new());
2483 let dex_service = create_mock_dex_service();
2484
2485 let tx_request = create_test_transaction_request();
2486
2487 let mut tx_repo = MockTransactionRepository::new();
2488 tx_repo.expect_create().returning(|t| Ok(t.clone()));
2489
2490 tx_repo
2492 .expect_partial_update()
2493 .withf(|_tx_id, update| {
2494 update.status == Some(TransactionStatus::Failed)
2495 && update.status_reason == Some("Queue unavailable".to_string())
2496 })
2497 .returning(|_, _| Ok(TransactionRepoModel::default()));
2498
2499 let mut job_producer = MockJobProducerTrait::new();
2500 job_producer
2501 .expect_produce_check_transaction_status_job()
2502 .returning(|_, _| {
2503 Box::pin(async {
2504 Err(crate::jobs::JobProducerError::QueueError(
2505 "Redis timeout".to_string(),
2506 ))
2507 })
2508 });
2509
2510 let relayer_repo = Arc::new(MockRelayerRepository::new());
2511 let counter = MockTransactionCounterServiceTrait::new();
2512
2513 let relayer = StellarRelayer::new(
2514 relayer_model,
2515 signer,
2516 provider,
2517 StellarRelayerDependencies::new(
2518 relayer_repo,
2519 ctx.network_repository.clone(),
2520 Arc::new(tx_repo),
2521 Arc::new(counter),
2522 Arc::new(job_producer),
2523 ),
2524 dex_service,
2525 )
2526 .await
2527 .unwrap();
2528
2529 let result = relayer.process_transaction_request(tx_request).await;
2530 assert!(result.is_err());
2531 }
2533
2534 #[tokio::test]
2535 async fn test_process_transaction_request_preserves_transaction_data() {
2536 let ctx = TestCtx::default();
2537 ctx.setup_network().await;
2538 let relayer_model = ctx.relayer_model.clone();
2539
2540 let provider = MockStellarProviderTrait::new();
2541 let signer = Arc::new(MockStellarSignTrait::new());
2542 let dex_service = create_mock_dex_service();
2543
2544 let tx_request = create_test_transaction_request();
2545
2546 let mut tx_repo = MockTransactionRepository::new();
2547 tx_repo.expect_create().returning(|t| Ok(t.clone()));
2548
2549 let mut job_producer = MockJobProducerTrait::new();
2550 job_producer
2551 .expect_produce_transaction_request_job()
2552 .returning(|_, _| Box::pin(async { Ok(()) }));
2553 job_producer
2554 .expect_produce_check_transaction_status_job()
2555 .returning(|_, _| Box::pin(async { Ok(()) }));
2556
2557 let relayer_repo = Arc::new(MockRelayerRepository::new());
2558 let counter = MockTransactionCounterServiceTrait::new();
2559
2560 let relayer = StellarRelayer::new(
2561 relayer_model.clone(),
2562 signer,
2563 provider,
2564 StellarRelayerDependencies::new(
2565 relayer_repo,
2566 ctx.network_repository.clone(),
2567 Arc::new(tx_repo),
2568 Arc::new(counter),
2569 Arc::new(job_producer),
2570 ),
2571 dex_service,
2572 )
2573 .await
2574 .unwrap();
2575
2576 let result = relayer.process_transaction_request(tx_request).await;
2577 assert!(result.is_ok());
2578
2579 let returned_tx = result.unwrap();
2580 assert_eq!(returned_tx.relayer_id, relayer_model.id);
2581 assert_eq!(returned_tx.network_type, NetworkType::Stellar);
2582 assert_eq!(returned_tx.status, TransactionStatus::Pending);
2583 }
2584 }
2585
2586 mod populate_allowed_tokens_metadata_tests {
2588 use super::*;
2589 use crate::models::StellarTokenKind;
2590
2591 #[tokio::test]
2592 async fn test_populate_allowed_tokens_metadata_no_tokens() {
2593 let ctx = TestCtx::default();
2594 ctx.setup_network().await;
2595 let relayer_model = ctx.relayer_model.clone();
2596
2597 let provider = MockStellarProviderTrait::new();
2598 let signer = Arc::new(MockStellarSignTrait::new());
2599 let dex_service = create_mock_dex_service();
2600
2601 let mut relayer_repo = MockRelayerRepository::new();
2602 relayer_repo.expect_update_policy().times(0);
2604
2605 let tx_repo = MockTransactionRepository::new();
2606 let job_producer = MockJobProducerTrait::new();
2607 let counter = MockTransactionCounterServiceTrait::new();
2608
2609 let relayer = StellarRelayer::new(
2610 relayer_model.clone(),
2611 signer,
2612 provider,
2613 StellarRelayerDependencies::new(
2614 Arc::new(relayer_repo),
2615 ctx.network_repository.clone(),
2616 Arc::new(tx_repo),
2617 Arc::new(counter),
2618 Arc::new(job_producer),
2619 ),
2620 dex_service,
2621 )
2622 .await
2623 .unwrap();
2624
2625 let result = relayer.populate_allowed_tokens_metadata().await;
2626 assert!(result.is_ok());
2627 }
2628
2629 #[tokio::test]
2630 async fn test_populate_allowed_tokens_metadata_empty_tokens() {
2631 let ctx = TestCtx::default();
2632 ctx.setup_network().await;
2633 let mut relayer_model = ctx.relayer_model.clone();
2634
2635 let mut policy = RelayerStellarPolicy::default();
2637 policy.allowed_tokens = Some(vec![]);
2638 relayer_model.policies = RelayerNetworkPolicy::Stellar(policy);
2639
2640 let provider = MockStellarProviderTrait::new();
2641 let signer = Arc::new(MockStellarSignTrait::new());
2642 let dex_service = create_mock_dex_service();
2643
2644 let mut relayer_repo = MockRelayerRepository::new();
2645 relayer_repo.expect_update_policy().times(0);
2647
2648 let tx_repo = MockTransactionRepository::new();
2649 let job_producer = MockJobProducerTrait::new();
2650 let counter = MockTransactionCounterServiceTrait::new();
2651
2652 let relayer = StellarRelayer::new(
2653 relayer_model.clone(),
2654 signer,
2655 provider,
2656 StellarRelayerDependencies::new(
2657 Arc::new(relayer_repo),
2658 ctx.network_repository.clone(),
2659 Arc::new(tx_repo),
2660 Arc::new(counter),
2661 Arc::new(job_producer),
2662 ),
2663 dex_service,
2664 )
2665 .await
2666 .unwrap();
2667
2668 let result = relayer.populate_allowed_tokens_metadata().await;
2669 assert!(result.is_ok());
2670 }
2671
2672 #[tokio::test]
2673 async fn test_populate_allowed_tokens_metadata_classic_asset_success() {
2674 let ctx = TestCtx::default();
2675 ctx.setup_network().await;
2676 let mut relayer_model = ctx.relayer_model.clone();
2677
2678 let mut policy = RelayerStellarPolicy::default();
2680 policy.allowed_tokens = Some(vec![crate::models::StellarAllowedTokensPolicy {
2681 asset: "USDC:GBBD47IF6LWK7P7MDEVSCWR7DPUWV3NY3DTQEVFL4NAT4AQH3ZLLFLA5".to_string(),
2682 metadata: None,
2683 max_allowed_fee: None,
2684 swap_config: None,
2685 }]);
2686 relayer_model.policies = RelayerNetworkPolicy::Stellar(policy);
2687
2688 let provider = MockStellarProviderTrait::new();
2689 let signer = Arc::new(MockStellarSignTrait::new());
2690 let dex_service = create_mock_dex_service();
2691
2692 let mut relayer_repo = MockRelayerRepository::new();
2693 relayer_repo
2694 .expect_update_policy()
2695 .times(1)
2696 .returning(|_, _| Ok(RelayerRepoModel::default()));
2697
2698 let tx_repo = MockTransactionRepository::new();
2699 let job_producer = MockJobProducerTrait::new();
2700 let counter = MockTransactionCounterServiceTrait::new();
2701
2702 let relayer = StellarRelayer::new(
2703 relayer_model.clone(),
2704 signer,
2705 provider,
2706 StellarRelayerDependencies::new(
2707 Arc::new(relayer_repo),
2708 ctx.network_repository.clone(),
2709 Arc::new(tx_repo),
2710 Arc::new(counter),
2711 Arc::new(job_producer),
2712 ),
2713 dex_service,
2714 )
2715 .await
2716 .unwrap();
2717
2718 let result = relayer.populate_allowed_tokens_metadata().await;
2719 assert!(result.is_ok());
2720
2721 let updated_policy = result.unwrap();
2722 assert!(updated_policy.allowed_tokens.is_some());
2723
2724 let tokens = updated_policy.allowed_tokens.unwrap();
2725 assert_eq!(tokens.len(), 1);
2726
2727 let token = &tokens[0];
2729 assert!(token.metadata.is_some());
2730
2731 let metadata = token.metadata.as_ref().unwrap();
2732 assert_eq!(metadata.decimals, 7); assert_eq!(
2734 metadata.canonical_asset_id,
2735 "USDC:GBBD47IF6LWK7P7MDEVSCWR7DPUWV3NY3DTQEVFL4NAT4AQH3ZLLFLA5"
2736 );
2737
2738 match &metadata.kind {
2740 StellarTokenKind::Classic { code, issuer } => {
2741 assert_eq!(code, "USDC");
2742 assert_eq!(
2743 issuer,
2744 "GBBD47IF6LWK7P7MDEVSCWR7DPUWV3NY3DTQEVFL4NAT4AQH3ZLLFLA5"
2745 );
2746 }
2747 _ => panic!("Expected Classic token kind"),
2748 }
2749 }
2750
2751 #[tokio::test]
2752 async fn test_populate_allowed_tokens_metadata_multiple_tokens() {
2753 let ctx = TestCtx::default();
2754 ctx.setup_network().await;
2755 let mut relayer_model = ctx.relayer_model.clone();
2756
2757 let mut policy = RelayerStellarPolicy::default();
2759 policy.allowed_tokens = Some(vec![
2760 crate::models::StellarAllowedTokensPolicy {
2761 asset: "USDC:GBBD47IF6LWK7P7MDEVSCWR7DPUWV3NY3DTQEVFL4NAT4AQH3ZLLFLA5"
2762 .to_string(),
2763 metadata: None,
2764 max_allowed_fee: None,
2765 swap_config: None,
2766 },
2767 crate::models::StellarAllowedTokensPolicy {
2768 asset: "AQUA:GAHPYWLK6YRN7CVYZOO4H3VDRZ7PVF5UJGLZCSPAEIKJE2XSWF5LAGER"
2769 .to_string(),
2770 metadata: None,
2771 max_allowed_fee: Some(1000000),
2772 swap_config: None,
2773 },
2774 ]);
2775 relayer_model.policies = RelayerNetworkPolicy::Stellar(policy);
2776
2777 let provider = MockStellarProviderTrait::new();
2778 let signer = Arc::new(MockStellarSignTrait::new());
2779 let dex_service = create_mock_dex_service();
2780
2781 let mut relayer_repo = MockRelayerRepository::new();
2782 relayer_repo
2783 .expect_update_policy()
2784 .times(1)
2785 .returning(|_, _| Ok(RelayerRepoModel::default()));
2786
2787 let tx_repo = MockTransactionRepository::new();
2788 let job_producer = MockJobProducerTrait::new();
2789 let counter = MockTransactionCounterServiceTrait::new();
2790
2791 let relayer = StellarRelayer::new(
2792 relayer_model.clone(),
2793 signer,
2794 provider,
2795 StellarRelayerDependencies::new(
2796 Arc::new(relayer_repo),
2797 ctx.network_repository.clone(),
2798 Arc::new(tx_repo),
2799 Arc::new(counter),
2800 Arc::new(job_producer),
2801 ),
2802 dex_service,
2803 )
2804 .await
2805 .unwrap();
2806
2807 let result = relayer.populate_allowed_tokens_metadata().await;
2808 assert!(result.is_ok());
2809
2810 let updated_policy = result.unwrap();
2811 let tokens = updated_policy.allowed_tokens.unwrap();
2812 assert_eq!(tokens.len(), 2);
2813
2814 assert!(tokens[0].metadata.is_some());
2816 assert!(tokens[1].metadata.is_some());
2817
2818 let usdc_metadata = tokens[0].metadata.as_ref().unwrap();
2820 match &usdc_metadata.kind {
2821 StellarTokenKind::Classic { code, .. } => {
2822 assert_eq!(code, "USDC");
2823 }
2824 _ => panic!("Expected Classic token kind for USDC"),
2825 }
2826
2827 let aqua_metadata = tokens[1].metadata.as_ref().unwrap();
2829 match &aqua_metadata.kind {
2830 StellarTokenKind::Classic { code, .. } => {
2831 assert_eq!(code, "AQUA");
2832 }
2833 _ => panic!("Expected Classic token kind for AQUA"),
2834 }
2835
2836 assert_eq!(tokens[1].max_allowed_fee, Some(1000000));
2838 }
2839
2840 #[tokio::test]
2841 async fn test_populate_allowed_tokens_metadata_invalid_asset() {
2842 let ctx = TestCtx::default();
2843 ctx.setup_network().await;
2844 let mut relayer_model = ctx.relayer_model.clone();
2845
2846 let mut policy = RelayerStellarPolicy::default();
2848 policy.allowed_tokens = Some(vec![crate::models::StellarAllowedTokensPolicy {
2849 asset: "INVALID_FORMAT".to_string(), metadata: None,
2851 max_allowed_fee: None,
2852 swap_config: None,
2853 }]);
2854 relayer_model.policies = RelayerNetworkPolicy::Stellar(policy);
2855
2856 let provider = MockStellarProviderTrait::new();
2857 let signer = Arc::new(MockStellarSignTrait::new());
2858 let dex_service = create_mock_dex_service();
2859
2860 let relayer_repo = MockRelayerRepository::new();
2861 let tx_repo = MockTransactionRepository::new();
2862 let job_producer = MockJobProducerTrait::new();
2863 let counter = MockTransactionCounterServiceTrait::new();
2864
2865 let relayer = StellarRelayer::new(
2866 relayer_model.clone(),
2867 signer,
2868 provider,
2869 StellarRelayerDependencies::new(
2870 Arc::new(relayer_repo),
2871 ctx.network_repository.clone(),
2872 Arc::new(tx_repo),
2873 Arc::new(counter),
2874 Arc::new(job_producer),
2875 ),
2876 dex_service,
2877 )
2878 .await
2879 .unwrap();
2880
2881 let result = relayer.populate_allowed_tokens_metadata().await;
2882 assert!(result.is_err());
2883 }
2884 }
2885
2886 mod migrate_fee_payment_strategy_tests {
2888 use super::*;
2889
2890 #[tokio::test]
2891 async fn test_migrate_fee_payment_strategy_in_memory_storage() {
2892 let ctx = TestCtx::default();
2893 ctx.setup_network().await;
2894 let relayer_model = ctx.relayer_model.clone();
2895
2896 let provider = MockStellarProviderTrait::new();
2897 let signer = Arc::new(MockStellarSignTrait::new());
2898 let dex_service = create_mock_dex_service();
2899
2900 let mut relayer_repo = MockRelayerRepository::new();
2901 relayer_repo
2903 .expect_is_persistent_storage()
2904 .returning(|| false);
2905 relayer_repo.expect_update_policy().times(0);
2907
2908 let tx_repo = MockTransactionRepository::new();
2909 let job_producer = MockJobProducerTrait::new();
2910 let counter = MockTransactionCounterServiceTrait::new();
2911
2912 let relayer = StellarRelayer::new(
2913 relayer_model.clone(),
2914 signer,
2915 provider,
2916 StellarRelayerDependencies::new(
2917 Arc::new(relayer_repo),
2918 ctx.network_repository.clone(),
2919 Arc::new(tx_repo),
2920 Arc::new(counter),
2921 Arc::new(job_producer),
2922 ),
2923 dex_service,
2924 )
2925 .await
2926 .unwrap();
2927
2928 let result = relayer.migrate_fee_payment_strategy_if_needed().await;
2929 assert!(result.is_ok());
2930 }
2931
2932 #[tokio::test]
2933 async fn test_migrate_fee_payment_strategy_already_set() {
2934 let ctx = TestCtx::default();
2935 ctx.setup_network().await;
2936 let mut relayer_model = ctx.relayer_model.clone();
2937
2938 let mut policy = RelayerStellarPolicy::default();
2940 policy.fee_payment_strategy = Some(StellarFeePaymentStrategy::User);
2941 relayer_model.policies = RelayerNetworkPolicy::Stellar(policy);
2942
2943 let provider = MockStellarProviderTrait::new();
2944 let signer = Arc::new(MockStellarSignTrait::new());
2945 let dex_service = create_mock_dex_service();
2946
2947 let mut relayer_repo = MockRelayerRepository::new();
2948 relayer_repo
2949 .expect_is_persistent_storage()
2950 .returning(|| true);
2951 relayer_repo.expect_update_policy().times(0);
2953
2954 let tx_repo = MockTransactionRepository::new();
2955 let job_producer = MockJobProducerTrait::new();
2956 let counter = MockTransactionCounterServiceTrait::new();
2957
2958 let relayer = StellarRelayer::new(
2959 relayer_model.clone(),
2960 signer,
2961 provider,
2962 StellarRelayerDependencies::new(
2963 Arc::new(relayer_repo),
2964 ctx.network_repository.clone(),
2965 Arc::new(tx_repo),
2966 Arc::new(counter),
2967 Arc::new(job_producer),
2968 ),
2969 dex_service,
2970 )
2971 .await
2972 .unwrap();
2973
2974 let result = relayer.migrate_fee_payment_strategy_if_needed().await;
2975 assert!(result.is_ok());
2976 }
2977
2978 #[tokio::test]
2979 async fn test_migrate_fee_payment_strategy_migration_needed() {
2980 let ctx = TestCtx::default();
2981 ctx.setup_network().await;
2982 let relayer_model = ctx.relayer_model.clone();
2983
2984 let provider = MockStellarProviderTrait::new();
2985 let signer = Arc::new(MockStellarSignTrait::new());
2986 let dex_service = create_mock_dex_service();
2987
2988 let mut relayer_repo = MockRelayerRepository::new();
2989 relayer_repo
2990 .expect_is_persistent_storage()
2991 .returning(|| true);
2992 relayer_repo
2993 .expect_update_policy()
2994 .times(1)
2995 .returning(|_, policy| {
2996 if let RelayerNetworkPolicy::Stellar(stellar_policy) = &policy {
2998 assert_eq!(
2999 stellar_policy.fee_payment_strategy,
3000 Some(StellarFeePaymentStrategy::Relayer)
3001 );
3002 }
3003 Ok(RelayerRepoModel::default())
3004 });
3005
3006 let tx_repo = MockTransactionRepository::new();
3007 let job_producer = MockJobProducerTrait::new();
3008 let counter = MockTransactionCounterServiceTrait::new();
3009
3010 let relayer = StellarRelayer::new(
3011 relayer_model.clone(),
3012 signer,
3013 provider,
3014 StellarRelayerDependencies::new(
3015 Arc::new(relayer_repo),
3016 ctx.network_repository.clone(),
3017 Arc::new(tx_repo),
3018 Arc::new(counter),
3019 Arc::new(job_producer),
3020 ),
3021 dex_service,
3022 )
3023 .await
3024 .unwrap();
3025
3026 let result = relayer.migrate_fee_payment_strategy_if_needed().await;
3027 assert!(result.is_ok());
3028 }
3029
3030 #[tokio::test]
3031 async fn test_migrate_fee_payment_strategy_update_fails() {
3032 let ctx = TestCtx::default();
3033 ctx.setup_network().await;
3034 let relayer_model = ctx.relayer_model.clone();
3035
3036 let provider = MockStellarProviderTrait::new();
3037 let signer = Arc::new(MockStellarSignTrait::new());
3038 let dex_service = create_mock_dex_service();
3039
3040 let mut relayer_repo = MockRelayerRepository::new();
3041 relayer_repo
3042 .expect_is_persistent_storage()
3043 .returning(|| true);
3044 relayer_repo
3045 .expect_update_policy()
3046 .times(1)
3047 .returning(|_, _| {
3048 Err(RepositoryError::TransactionFailure(
3049 "Database error".to_string(),
3050 ))
3051 });
3052
3053 let tx_repo = MockTransactionRepository::new();
3054 let job_producer = MockJobProducerTrait::new();
3055 let counter = MockTransactionCounterServiceTrait::new();
3056
3057 let relayer = StellarRelayer::new(
3058 relayer_model.clone(),
3059 signer,
3060 provider,
3061 StellarRelayerDependencies::new(
3062 Arc::new(relayer_repo),
3063 ctx.network_repository.clone(),
3064 Arc::new(tx_repo),
3065 Arc::new(counter),
3066 Arc::new(job_producer),
3067 ),
3068 dex_service,
3069 )
3070 .await
3071 .unwrap();
3072
3073 let result = relayer.migrate_fee_payment_strategy_if_needed().await;
3074 assert!(result.is_err());
3075 assert!(matches!(
3076 result.unwrap_err(),
3077 RelayerError::PolicyConfigurationError(_)
3078 ));
3079 }
3080 }
3081
3082 mod check_balance_and_trigger_token_swap_tests {
3084 use super::*;
3085 use crate::models::RelayerStellarSwapConfig;
3086
3087 #[tokio::test]
3088 async fn test_check_balance_no_swap_config() {
3089 let ctx = TestCtx::default();
3090 ctx.setup_network().await;
3091 let relayer_model = ctx.relayer_model.clone();
3092
3093 let provider = MockStellarProviderTrait::new();
3094 let signer = Arc::new(MockStellarSignTrait::new());
3095 let dex_service = create_mock_dex_service();
3096
3097 let relayer_repo = MockRelayerRepository::new();
3098 let tx_repo = MockTransactionRepository::new();
3099 let job_producer = MockJobProducerTrait::new();
3100 let counter = MockTransactionCounterServiceTrait::new();
3101
3102 let relayer = StellarRelayer::new(
3103 relayer_model.clone(),
3104 signer,
3105 provider,
3106 StellarRelayerDependencies::new(
3107 Arc::new(relayer_repo),
3108 ctx.network_repository.clone(),
3109 Arc::new(tx_repo),
3110 Arc::new(counter),
3111 Arc::new(job_producer),
3112 ),
3113 dex_service,
3114 )
3115 .await
3116 .unwrap();
3117
3118 let result = relayer
3119 .check_balance_and_trigger_token_swap_if_needed()
3120 .await;
3121 assert!(result.is_ok());
3122 }
3123
3124 #[tokio::test]
3125 async fn test_check_balance_no_threshold() {
3126 let ctx = TestCtx::default();
3127 ctx.setup_network().await;
3128 let mut relayer_model = ctx.relayer_model.clone();
3129
3130 let mut policy = RelayerStellarPolicy::default();
3132 policy.swap_config = Some(RelayerStellarSwapConfig {
3133 strategies: vec![],
3134 min_balance_threshold: None,
3135 cron_schedule: None,
3136 });
3137 relayer_model.policies = RelayerNetworkPolicy::Stellar(policy);
3138
3139 let provider = MockStellarProviderTrait::new();
3140 let signer = Arc::new(MockStellarSignTrait::new());
3141 let dex_service = create_mock_dex_service();
3142
3143 let relayer_repo = MockRelayerRepository::new();
3144 let tx_repo = MockTransactionRepository::new();
3145 let job_producer = MockJobProducerTrait::new();
3146 let counter = MockTransactionCounterServiceTrait::new();
3147
3148 let relayer = StellarRelayer::new(
3149 relayer_model.clone(),
3150 signer,
3151 provider,
3152 StellarRelayerDependencies::new(
3153 Arc::new(relayer_repo),
3154 ctx.network_repository.clone(),
3155 Arc::new(tx_repo),
3156 Arc::new(counter),
3157 Arc::new(job_producer),
3158 ),
3159 dex_service,
3160 )
3161 .await
3162 .unwrap();
3163
3164 let result = relayer
3165 .check_balance_and_trigger_token_swap_if_needed()
3166 .await;
3167 assert!(result.is_ok());
3168 }
3169
3170 #[tokio::test]
3171 async fn test_check_balance_above_threshold() {
3172 let ctx = TestCtx::default();
3173 ctx.setup_network().await;
3174 let mut relayer_model = ctx.relayer_model.clone();
3175
3176 let mut policy = RelayerStellarPolicy::default();
3178 policy.swap_config = Some(RelayerStellarSwapConfig {
3179 strategies: vec![],
3180 min_balance_threshold: Some(1000000), cron_schedule: None,
3182 });
3183 relayer_model.policies = RelayerNetworkPolicy::Stellar(policy);
3184
3185 let mut provider = MockStellarProviderTrait::new();
3186 provider.expect_get_account().returning(|_| {
3188 Box::pin(async {
3189 Ok(AccountEntry {
3190 account_id: AccountId(PublicKey::PublicKeyTypeEd25519(Uint256([0; 32]))),
3191 balance: 10000000, ext: AccountEntryExt::V0,
3193 flags: 0,
3194 home_domain: String32::default(),
3195 inflation_dest: None,
3196 seq_num: SequenceNumber(5),
3197 num_sub_entries: 0,
3198 signers: VecM::default(),
3199 thresholds: Thresholds([0, 0, 0, 0]),
3200 })
3201 })
3202 });
3203
3204 let signer = Arc::new(MockStellarSignTrait::new());
3205 let dex_service = create_mock_dex_service();
3206
3207 let relayer_repo = MockRelayerRepository::new();
3208 let tx_repo = MockTransactionRepository::new();
3209 let job_producer = MockJobProducerTrait::new();
3210 let counter = MockTransactionCounterServiceTrait::new();
3211
3212 let relayer = StellarRelayer::new(
3213 relayer_model.clone(),
3214 signer,
3215 provider,
3216 StellarRelayerDependencies::new(
3217 Arc::new(relayer_repo),
3218 ctx.network_repository.clone(),
3219 Arc::new(tx_repo),
3220 Arc::new(counter),
3221 Arc::new(job_producer),
3222 ),
3223 dex_service,
3224 )
3225 .await
3226 .unwrap();
3227
3228 let result = relayer
3229 .check_balance_and_trigger_token_swap_if_needed()
3230 .await;
3231 assert!(result.is_ok());
3232 }
3233
3234 #[tokio::test]
3235 async fn test_check_balance_provider_error() {
3236 let ctx = TestCtx::default();
3237 ctx.setup_network().await;
3238 let mut relayer_model = ctx.relayer_model.clone();
3239
3240 let mut policy = RelayerStellarPolicy::default();
3242 policy.swap_config = Some(RelayerStellarSwapConfig {
3243 strategies: vec![],
3244 min_balance_threshold: Some(1000000),
3245 cron_schedule: None,
3246 });
3247 relayer_model.policies = RelayerNetworkPolicy::Stellar(policy);
3248
3249 let mut provider = MockStellarProviderTrait::new();
3250 provider.expect_get_account().returning(|_| {
3251 Box::pin(async { Err(ProviderError::Other("Network error".to_string())) })
3252 });
3253
3254 let signer = Arc::new(MockStellarSignTrait::new());
3255 let dex_service = create_mock_dex_service();
3256
3257 let relayer_repo = MockRelayerRepository::new();
3258 let tx_repo = MockTransactionRepository::new();
3259 let job_producer = MockJobProducerTrait::new();
3260 let counter = MockTransactionCounterServiceTrait::new();
3261
3262 let relayer = StellarRelayer::new(
3263 relayer_model.clone(),
3264 signer,
3265 provider,
3266 StellarRelayerDependencies::new(
3267 Arc::new(relayer_repo),
3268 ctx.network_repository.clone(),
3269 Arc::new(tx_repo),
3270 Arc::new(counter),
3271 Arc::new(job_producer),
3272 ),
3273 dex_service,
3274 )
3275 .await
3276 .unwrap();
3277
3278 let result = relayer
3279 .check_balance_and_trigger_token_swap_if_needed()
3280 .await;
3281 assert!(result.is_err());
3282 }
3283 }
3284
3285 mod check_health_tests {
3287 use super::*;
3288 use crate::models::RelayerStellarSwapConfig;
3289
3290 #[tokio::test]
3291 async fn test_check_health_success() {
3292 let ctx = TestCtx::default();
3293 ctx.setup_network().await;
3294 let relayer_model = ctx.relayer_model.clone();
3295
3296 let mut provider = MockStellarProviderTrait::new();
3297 provider.expect_get_account().returning(|_| {
3298 Box::pin(async {
3299 Ok(AccountEntry {
3300 account_id: AccountId(PublicKey::PublicKeyTypeEd25519(Uint256([0; 32]))),
3301 balance: 10000000,
3302 ext: AccountEntryExt::V0,
3303 flags: 0,
3304 home_domain: String32::default(),
3305 inflation_dest: None,
3306 seq_num: SequenceNumber(5),
3307 num_sub_entries: 0,
3308 signers: VecM::default(),
3309 thresholds: Thresholds([0, 0, 0, 0]),
3310 })
3311 })
3312 });
3313
3314 let signer = Arc::new(MockStellarSignTrait::new());
3315 let dex_service = create_mock_dex_service();
3316
3317 let relayer_repo = MockRelayerRepository::new();
3318 let tx_repo = MockTransactionRepository::new();
3319 let job_producer = MockJobProducerTrait::new();
3320
3321 let mut counter = MockTransactionCounterServiceTrait::new();
3322 counter
3323 .expect_set()
3324 .returning(|_| Box::pin(async { Ok(()) }));
3325
3326 let relayer = StellarRelayer::new(
3327 relayer_model.clone(),
3328 signer,
3329 provider,
3330 StellarRelayerDependencies::new(
3331 Arc::new(relayer_repo),
3332 ctx.network_repository.clone(),
3333 Arc::new(tx_repo),
3334 Arc::new(counter),
3335 Arc::new(job_producer),
3336 ),
3337 dex_service,
3338 )
3339 .await
3340 .unwrap();
3341
3342 let result = relayer.check_health().await;
3343 assert!(result.is_ok());
3344 }
3345
3346 #[tokio::test]
3347 async fn test_check_health_sequence_sync_fails() {
3348 let ctx = TestCtx::default();
3349 ctx.setup_network().await;
3350 let relayer_model = ctx.relayer_model.clone();
3351
3352 let mut provider = MockStellarProviderTrait::new();
3353 provider.expect_get_account().returning(|_| {
3354 Box::pin(async { Err(ProviderError::Other("Network error".to_string())) })
3355 });
3356
3357 let signer = Arc::new(MockStellarSignTrait::new());
3358 let dex_service = create_mock_dex_service();
3359
3360 let relayer_repo = MockRelayerRepository::new();
3361 let tx_repo = MockTransactionRepository::new();
3362 let job_producer = MockJobProducerTrait::new();
3363 let counter = MockTransactionCounterServiceTrait::new();
3364
3365 let relayer = StellarRelayer::new(
3366 relayer_model.clone(),
3367 signer,
3368 provider,
3369 StellarRelayerDependencies::new(
3370 Arc::new(relayer_repo),
3371 ctx.network_repository.clone(),
3372 Arc::new(tx_repo),
3373 Arc::new(counter),
3374 Arc::new(job_producer),
3375 ),
3376 dex_service,
3377 )
3378 .await
3379 .unwrap();
3380
3381 let result = relayer.check_health().await;
3382 assert!(result.is_err());
3383 let failures = result.unwrap_err();
3384 assert_eq!(failures.len(), 1);
3385 assert!(matches!(
3386 failures[0],
3387 HealthCheckFailure::SequenceSyncFailed(_)
3388 ));
3389 }
3390
3391 #[tokio::test]
3392 async fn test_check_health_with_user_fee_strategy() {
3393 let ctx = TestCtx::default();
3394 ctx.setup_network().await;
3395 let mut relayer_model = ctx.relayer_model.clone();
3396
3397 let mut policy = RelayerStellarPolicy::default();
3399 policy.fee_payment_strategy = Some(StellarFeePaymentStrategy::User);
3400 policy.swap_config = Some(RelayerStellarSwapConfig {
3401 strategies: vec![],
3402 min_balance_threshold: Some(1000000),
3403 cron_schedule: None,
3404 });
3405 relayer_model.policies = RelayerNetworkPolicy::Stellar(policy);
3406
3407 let mut provider = MockStellarProviderTrait::new();
3408 provider.expect_get_account().returning(|_| {
3409 Box::pin(async {
3410 Ok(AccountEntry {
3411 account_id: AccountId(PublicKey::PublicKeyTypeEd25519(Uint256([0; 32]))),
3412 balance: 10000000, ext: AccountEntryExt::V0,
3414 flags: 0,
3415 home_domain: String32::default(),
3416 inflation_dest: None,
3417 seq_num: SequenceNumber(5),
3418 num_sub_entries: 0,
3419 signers: VecM::default(),
3420 thresholds: Thresholds([0, 0, 0, 0]),
3421 })
3422 })
3423 });
3424
3425 let signer = Arc::new(MockStellarSignTrait::new());
3426 let dex_service = create_mock_dex_service();
3427
3428 let relayer_repo = MockRelayerRepository::new();
3429 let tx_repo = MockTransactionRepository::new();
3430 let job_producer = MockJobProducerTrait::new();
3431
3432 let mut counter = MockTransactionCounterServiceTrait::new();
3433 counter
3434 .expect_set()
3435 .returning(|_| Box::pin(async { Ok(()) }));
3436
3437 let relayer = StellarRelayer::new(
3438 relayer_model.clone(),
3439 signer,
3440 provider,
3441 StellarRelayerDependencies::new(
3442 Arc::new(relayer_repo),
3443 ctx.network_repository.clone(),
3444 Arc::new(tx_repo),
3445 Arc::new(counter),
3446 Arc::new(job_producer),
3447 ),
3448 dex_service,
3449 )
3450 .await
3451 .unwrap();
3452
3453 let result = relayer.check_health().await;
3454 assert!(result.is_ok());
3456 }
3457 }
3458
3459 mod rpc_tests {
3461 use super::*;
3462 use crate::models::{JsonRpcId, StellarRpcRequest};
3463
3464 #[tokio::test]
3465 async fn test_rpc_invalid_network_request() {
3466 let ctx = TestCtx::default();
3467 ctx.setup_network().await;
3468 let relayer_model = ctx.relayer_model.clone();
3469
3470 let provider = MockStellarProviderTrait::new();
3471 let signer = Arc::new(MockStellarSignTrait::new());
3472 let dex_service = create_mock_dex_service();
3473
3474 let relayer_repo = MockRelayerRepository::new();
3475 let tx_repo = MockTransactionRepository::new();
3476 let job_producer = MockJobProducerTrait::new();
3477 let counter = MockTransactionCounterServiceTrait::new();
3478
3479 let relayer = StellarRelayer::new(
3480 relayer_model.clone(),
3481 signer,
3482 provider,
3483 StellarRelayerDependencies::new(
3484 Arc::new(relayer_repo),
3485 ctx.network_repository.clone(),
3486 Arc::new(tx_repo),
3487 Arc::new(counter),
3488 Arc::new(job_producer),
3489 ),
3490 dex_service,
3491 )
3492 .await
3493 .unwrap();
3494
3495 let request = JsonRpcRequest {
3497 jsonrpc: "2.0".to_string(),
3498 id: Some(JsonRpcId::Number(1)),
3499 params: NetworkRpcRequest::Evm(crate::models::EvmRpcRequest::RawRpcRequest {
3500 method: "eth_blockNumber".to_string(),
3501 params: serde_json::Value::Null,
3502 }),
3503 };
3504
3505 let result = relayer.rpc(request).await;
3506 assert!(result.is_ok());
3507 let response = result.unwrap();
3508 assert!(response.error.is_some());
3510 }
3511
3512 #[tokio::test]
3513 async fn test_rpc_provider_error() {
3514 let ctx = TestCtx::default();
3515 ctx.setup_network().await;
3516 let relayer_model = ctx.relayer_model.clone();
3517
3518 let mut provider = MockStellarProviderTrait::new();
3519 provider.expect_raw_request_dyn().returning(|_, _, _| {
3520 Box::pin(async { Err(ProviderError::Other("RPC error".to_string())) })
3521 });
3522
3523 let signer = Arc::new(MockStellarSignTrait::new());
3524 let dex_service = create_mock_dex_service();
3525
3526 let relayer_repo = MockRelayerRepository::new();
3527 let tx_repo = MockTransactionRepository::new();
3528 let job_producer = MockJobProducerTrait::new();
3529 let counter = MockTransactionCounterServiceTrait::new();
3530
3531 let relayer = StellarRelayer::new(
3532 relayer_model.clone(),
3533 signer,
3534 provider,
3535 StellarRelayerDependencies::new(
3536 Arc::new(relayer_repo),
3537 ctx.network_repository.clone(),
3538 Arc::new(tx_repo),
3539 Arc::new(counter),
3540 Arc::new(job_producer),
3541 ),
3542 dex_service,
3543 )
3544 .await
3545 .unwrap();
3546
3547 let request = JsonRpcRequest {
3548 jsonrpc: "2.0".to_string(),
3549 id: Some(JsonRpcId::Number(1)),
3550 params: NetworkRpcRequest::Stellar(StellarRpcRequest::RawRpcRequest {
3551 method: "getHealth".to_string(),
3552 params: serde_json::Value::Null,
3553 }),
3554 };
3555
3556 let result = relayer.rpc(request).await;
3557 assert!(result.is_ok());
3558 let response = result.unwrap();
3559 assert!(response.error.is_some());
3561 }
3562
3563 #[tokio::test]
3564 async fn test_rpc_success() {
3565 let ctx = TestCtx::default();
3566 ctx.setup_network().await;
3567 let relayer_model = ctx.relayer_model.clone();
3568
3569 let mut provider = MockStellarProviderTrait::new();
3570 provider.expect_raw_request_dyn().returning(|_, _, _| {
3571 Box::pin(async { Ok(serde_json::json!({"status": "healthy"})) })
3572 });
3573
3574 let signer = Arc::new(MockStellarSignTrait::new());
3575 let dex_service = create_mock_dex_service();
3576
3577 let relayer_repo = MockRelayerRepository::new();
3578 let tx_repo = MockTransactionRepository::new();
3579 let job_producer = MockJobProducerTrait::new();
3580 let counter = MockTransactionCounterServiceTrait::new();
3581
3582 let relayer = StellarRelayer::new(
3583 relayer_model.clone(),
3584 signer,
3585 provider,
3586 StellarRelayerDependencies::new(
3587 Arc::new(relayer_repo),
3588 ctx.network_repository.clone(),
3589 Arc::new(tx_repo),
3590 Arc::new(counter),
3591 Arc::new(job_producer),
3592 ),
3593 dex_service,
3594 )
3595 .await
3596 .unwrap();
3597
3598 let request = JsonRpcRequest {
3599 jsonrpc: "2.0".to_string(),
3600 id: Some(JsonRpcId::Number(1)),
3601 params: NetworkRpcRequest::Stellar(StellarRpcRequest::RawRpcRequest {
3602 method: "getHealth".to_string(),
3603 params: serde_json::Value::Null,
3604 }),
3605 };
3606
3607 let result = relayer.rpc(request).await;
3608 assert!(result.is_ok());
3609 let response = result.unwrap();
3610 assert!(response.error.is_none());
3611 assert!(response.result.is_some());
3612 }
3613 }
3614}