1use std::sync::Arc;
28
29use crate::{
30 constants::{
31 transactions::PENDING_TRANSACTION_STATUSES, EVM_SMALLEST_UNIT_NAME,
32 EVM_STATUS_CHECK_INITIAL_DELAY_SECONDS,
33 },
34 domain::{
35 relayer::{Relayer, RelayerError},
36 BalanceResponse, SignDataRequest, SignDataResponse, SignTransactionExternalResponse,
37 SignTransactionRequest, SignTypedDataRequest,
38 },
39 jobs::{
40 JobProducerTrait, RelayerHealthCheck, TransactionRequest, TransactionSend,
41 TransactionStatusCheck,
42 },
43 models::{
44 produce_relayer_disabled_payload, DeletePendingTransactionsResponse, DisabledReason,
45 EvmNetwork, GetStatusOptions, HealthCheckFailure, JsonRpcRequest, JsonRpcResponse,
46 NetworkRepoModel, NetworkRpcRequest, NetworkRpcResult, NetworkTransactionRequest,
47 NetworkType, PaginationQuery, RelayerRepoModel, RelayerStatus, RepositoryError,
48 RpcErrorCodes, TransactionRepoModel, TransactionStatus,
49 },
50 repositories::{NetworkRepository, RelayerRepository, Repository, TransactionRepository},
51 services::{
52 provider::{EvmProvider, EvmProviderTrait},
53 signer::{DataSignerTrait, EvmSigner},
54 TransactionCounterService, TransactionCounterServiceTrait,
55 },
56 utils::calculate_scheduled_timestamp,
57};
58use async_trait::async_trait;
59use eyre::Result;
60use tracing::{debug, info, instrument, warn};
61
62use super::{create_error_response, create_success_response, EvmTransactionValidator};
63use crate::utils::{map_provider_error, sanitize_error_description};
64
65#[allow(dead_code)]
66pub struct EvmRelayer<P, RR, NR, TR, J, S, TCS>
67where
68 P: EvmProviderTrait + Send + Sync,
69 RR: Repository<RelayerRepoModel, String> + RelayerRepository + Send + Sync + 'static,
70 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
71 TR: Repository<TransactionRepoModel, String> + TransactionRepository + Send + Sync + 'static,
72 J: JobProducerTrait + Send + Sync + 'static,
73 S: DataSignerTrait + Send + Sync + 'static,
74{
75 relayer: RelayerRepoModel,
76 signer: S,
77 network: EvmNetwork,
78 provider: P,
79 relayer_repository: Arc<RR>,
80 network_repository: Arc<NR>,
81 transaction_repository: Arc<TR>,
82 job_producer: Arc<J>,
83 transaction_counter_service: Arc<TCS>,
84}
85
86#[allow(clippy::too_many_arguments)]
87impl<P, RR, NR, TR, J, S, TCS> EvmRelayer<P, RR, NR, TR, J, S, TCS>
88where
89 P: EvmProviderTrait + Send + Sync,
90 RR: Repository<RelayerRepoModel, String> + RelayerRepository + Send + Sync + 'static,
91 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
92 TR: Repository<TransactionRepoModel, String> + TransactionRepository + Send + Sync + 'static,
93 J: JobProducerTrait + Send + Sync + 'static,
94 S: DataSignerTrait + Send + Sync + 'static,
95 TCS: TransactionCounterServiceTrait + Send + Sync + 'static,
96{
97 pub fn new(
114 relayer: RelayerRepoModel,
115 signer: S,
116 provider: P,
117 network: EvmNetwork,
118 relayer_repository: Arc<RR>,
119 network_repository: Arc<NR>,
120 transaction_repository: Arc<TR>,
121 transaction_counter_service: Arc<TCS>,
122 job_producer: Arc<J>,
123 ) -> Result<Self, RelayerError> {
124 Ok(Self {
125 relayer,
126 signer,
127 network,
128 provider,
129 relayer_repository,
130 network_repository,
131 transaction_repository,
132 transaction_counter_service,
133 job_producer,
134 })
135 }
136
137 #[instrument(
143 level = "debug",
144 skip(self),
145 fields(
146 request_id = ?crate::observability::request_id::get_request_id(),
147 relayer_id = %self.relayer.id,
148 )
149 )]
150 async fn sync_nonce(&self) -> Result<(), RelayerError> {
151 let on_chain_nonce = self
152 .provider
153 .get_transaction_count(&self.relayer.address)
154 .await
155 .map_err(|e| RelayerError::ProviderError(e.to_string()))?;
156
157 let transaction_counter_nonce = self
158 .transaction_counter_service
159 .get()
160 .await
161 .ok()
162 .flatten()
163 .unwrap_or(0);
164
165 let nonce = std::cmp::max(on_chain_nonce, transaction_counter_nonce);
166
167 debug!(
168 relayer_id = %self.relayer.id,
169 on_chain_nonce = %on_chain_nonce,
170 transaction_counter_nonce = %transaction_counter_nonce,
171 "syncing nonce"
172 );
173
174 debug!(nonce = %nonce, "setting nonce for relayer");
175
176 self.transaction_counter_service.set(nonce).await?;
177
178 Ok(())
179 }
180
181 #[instrument(
187 level = "debug",
188 skip(self),
189 fields(
190 request_id = ?crate::observability::request_id::get_request_id(),
191 relayer_id = %self.relayer.id,
192 )
193 )]
194 async fn validate_rpc(&self) -> Result<(), RelayerError> {
195 self.provider
196 .health_check()
197 .await
198 .map_err(|e| RelayerError::ProviderError(e.to_string()))?;
199
200 Ok(())
201 }
202
203 #[instrument(
213 level = "debug",
214 skip(self, transaction),
215 fields(
216 request_id = ?crate::observability::request_id::get_request_id(),
217 relayer_id = %self.relayer.id,
218 tx_id = %transaction.id,
219 )
220 )]
221 async fn cancel_transaction_via_job(
222 &self,
223 transaction: TransactionRepoModel,
224 ) -> Result<(), RelayerError> {
225 let cancel_job = TransactionSend::cancel(
226 transaction.id.clone(),
227 transaction.relayer_id.clone(),
228 "Cancelled via delete_pending_transactions".to_string(),
229 );
230
231 self.job_producer
232 .produce_submit_transaction_job(cancel_job, None)
233 .await
234 .map_err(RelayerError::from)?;
235
236 Ok(())
237 }
238}
239
240pub type DefaultEvmRelayer<J, T, RR, NR, TCR> =
242 EvmRelayer<EvmProvider, RR, NR, T, J, EvmSigner, TransactionCounterService<TCR>>;
243
244#[async_trait]
245impl<P, RR, NR, TR, J, S, TCS> Relayer for EvmRelayer<P, RR, NR, TR, J, S, TCS>
246where
247 P: EvmProviderTrait + Send + Sync,
248 RR: Repository<RelayerRepoModel, String> + RelayerRepository + Send + Sync + 'static,
249 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
250 TR: Repository<TransactionRepoModel, String> + TransactionRepository + Send + Sync + 'static,
251 J: JobProducerTrait + Send + Sync + 'static,
252 S: DataSignerTrait + Send + Sync + 'static,
253 TCS: TransactionCounterServiceTrait + Send + Sync + 'static,
254{
255 #[instrument(
265 level = "debug",
266 skip(self, network_transaction),
267 fields(
268 request_id = ?crate::observability::request_id::get_request_id(),
269 relayer_id = %self.relayer.id,
270 network_type = ?self.relayer.network_type,
271 )
272 )]
273 async fn process_transaction_request(
274 &self,
275 network_transaction: NetworkTransactionRequest,
276 ) -> Result<TransactionRepoModel, RelayerError> {
277 let network_model = self
278 .network_repository
279 .get_by_name(NetworkType::Evm, &self.relayer.network)
280 .await?
281 .ok_or_else(|| {
282 RelayerError::NetworkConfiguration(format!(
283 "Network {} not found",
284 self.relayer.network
285 ))
286 })?;
287 let transaction =
288 TransactionRepoModel::try_from((&network_transaction, &self.relayer, &network_model))?;
289
290 self.transaction_repository
291 .create(transaction.clone())
292 .await
293 .map_err(|e| RepositoryError::TransactionFailure(e.to_string()))?;
294
295 self.job_producer
297 .produce_transaction_request_job(
298 TransactionRequest::new(transaction.id.clone(), transaction.relayer_id.clone()),
299 None,
300 )
301 .await?;
302
303 self.job_producer
305 .produce_check_transaction_status_job(
306 TransactionStatusCheck::new(
307 transaction.id.clone(),
308 transaction.relayer_id.clone(),
309 crate::models::NetworkType::Evm,
310 ),
311 Some(calculate_scheduled_timestamp(
312 EVM_STATUS_CHECK_INITIAL_DELAY_SECONDS,
313 )),
314 )
315 .await?;
316
317 Ok(transaction)
318 }
319
320 #[instrument(
326 level = "debug",
327 skip(self),
328 fields(
329 request_id = ?crate::observability::request_id::get_request_id(),
330 relayer_id = %self.relayer.id,
331 )
332 )]
333 async fn get_balance(&self) -> Result<BalanceResponse, RelayerError> {
334 let balance: u128 = self
335 .provider
336 .get_balance(&self.relayer.address)
337 .await
338 .map_err(|e| RelayerError::ProviderError(e.to_string()))?
339 .try_into()
340 .map_err(|_| {
341 RelayerError::ProviderError("Failed to convert balance to u128".to_string())
342 })?;
343
344 Ok(BalanceResponse {
345 balance,
346 unit: EVM_SMALLEST_UNIT_NAME.to_string(),
347 })
348 }
349
350 #[instrument(
356 level = "debug",
357 skip(self),
358 fields(
359 request_id = ?crate::observability::request_id::get_request_id(),
360 relayer_id = %self.relayer.id,
361 )
362 )]
363 async fn get_status(&self, options: GetStatusOptions) -> Result<RelayerStatus, RelayerError> {
364 let relayer_model = &self.relayer;
365
366 let nonce = self
368 .transaction_counter_service
369 .get()
370 .await
371 .ok()
372 .flatten()
373 .unwrap_or(0);
374 let nonce_str = nonce.to_string();
375
376 let balance = if options.include_balance {
377 Some(self.get_balance().await?.balance.to_string())
378 } else {
379 None
380 };
381
382 let pending_transactions_count = if options.include_pending_count {
383 Some(
385 self.transaction_repository
386 .count_by_status(&relayer_model.id, PENDING_TRANSACTION_STATUSES)
387 .await
388 .map_err(RelayerError::from)?,
389 )
390 } else {
391 None
392 };
393
394 let last_confirmed_transaction_timestamp = if options.include_last_confirmed_tx {
395 self.transaction_repository
397 .find_by_status_paginated(
398 &relayer_model.id,
399 &[TransactionStatus::Confirmed],
400 PaginationQuery {
401 page: 1,
402 per_page: 1,
403 },
404 false, )
406 .await
407 .map_err(RelayerError::from)?
408 .items
409 .into_iter()
410 .next()
411 .and_then(|tx| tx.confirmed_at)
412 } else {
413 None
414 };
415
416 Ok(RelayerStatus::Evm {
417 balance,
418 pending_transactions_count,
419 last_confirmed_transaction_timestamp,
420 system_disabled: relayer_model.system_disabled,
421 paused: relayer_model.paused,
422 nonce: nonce_str,
423 })
424 }
425
426 #[instrument(
433 level = "debug",
434 skip(self),
435 fields(
436 request_id = ?crate::observability::request_id::get_request_id(),
437 relayer_id = %self.relayer.id,
438 )
439 )]
440 async fn delete_pending_transactions(
441 &self,
442 ) -> Result<DeletePendingTransactionsResponse, RelayerError> {
443 let pending_statuses = [
444 TransactionStatus::Pending,
445 TransactionStatus::Sent,
446 TransactionStatus::Submitted,
447 ];
448
449 let pending_transactions = self
451 .transaction_repository
452 .find_by_status(&self.relayer.id, &pending_statuses[..])
453 .await
454 .map_err(RelayerError::from)?;
455
456 let transaction_count = pending_transactions.len();
457
458 if transaction_count == 0 {
459 info!(
460 relayer_id = %self.relayer.id,
461 "no pending transactions found for relayer"
462 );
463 return Ok(DeletePendingTransactionsResponse {
464 queued_for_cancellation_transaction_ids: vec![],
465 failed_to_queue_transaction_ids: vec![],
466 total_processed: 0,
467 });
468 }
469
470 info!(
471 relayer_id = %self.relayer.id,
472 transaction_count = %transaction_count,
473 "processing pending transactions for relayer"
474 );
475
476 let mut cancelled_transaction_ids = Vec::new();
477 let mut failed_transaction_ids = Vec::new();
478
479 for transaction in pending_transactions {
481 match self.cancel_transaction_via_job(transaction.clone()).await {
482 Ok(_) => {
483 cancelled_transaction_ids.push(transaction.id.clone());
484 info!(
485 tx_id = %transaction.id,
486 relayer_id = %self.relayer.id,
487 status = ?transaction.status,
488 "initiated cancellation for transaction"
489 );
490 }
491 Err(e) => {
492 failed_transaction_ids.push(transaction.id.clone());
493 warn!(
494 tx_id = %transaction.id,
495 relayer_id = %self.relayer.id,
496 error = %e,
497 "failed to cancel transaction"
498 );
499 }
500 }
501 }
502
503 let total_processed = cancelled_transaction_ids.len() + failed_transaction_ids.len();
504
505 debug!(
506 queued_for_cancellation = %cancelled_transaction_ids.len(),
507 failed_to_queue = %failed_transaction_ids.len(),
508 "completed processing pending transactions for relayer"
509 );
510
511 Ok(DeletePendingTransactionsResponse {
512 queued_for_cancellation_transaction_ids: cancelled_transaction_ids,
513 failed_to_queue_transaction_ids: failed_transaction_ids,
514 total_processed: total_processed as u32,
515 })
516 }
517
518 #[instrument(
528 level = "debug",
529 skip(self, request),
530 fields(
531 request_id = ?crate::observability::request_id::get_request_id(),
532 relayer_id = %self.relayer.id,
533 )
534 )]
535 async fn sign_data(&self, request: SignDataRequest) -> Result<SignDataResponse, RelayerError> {
536 let result = self.signer.sign_data(request).await?;
537
538 Ok(result)
539 }
540
541 #[instrument(
551 level = "debug",
552 skip(self, request),
553 fields(
554 request_id = ?crate::observability::request_id::get_request_id(),
555 relayer_id = %self.relayer.id,
556 )
557 )]
558 async fn sign_typed_data(
559 &self,
560 request: SignTypedDataRequest,
561 ) -> Result<SignDataResponse, RelayerError> {
562 let result = self.signer.sign_typed_data(request).await?;
563
564 Ok(result)
565 }
566
567 #[instrument(
577 level = "debug",
578 skip(self, request),
579 fields(
580 request_id = ?crate::observability::request_id::get_request_id(),
581 relayer_id = %self.relayer.id,
582 )
583 )]
584 async fn rpc(
585 &self,
586 request: JsonRpcRequest<NetworkRpcRequest>,
587 ) -> Result<JsonRpcResponse<NetworkRpcResult>, RelayerError> {
588 let evm_request = match request.params {
589 NetworkRpcRequest::Evm(evm_req) => evm_req,
590 _ => {
591 return Ok(create_error_response(
592 request.id,
593 RpcErrorCodes::INVALID_PARAMS,
594 "Invalid params",
595 "Expected EVM network request",
596 ))
597 }
598 };
599
600 let (method, params_json) = match evm_request {
602 crate::models::EvmRpcRequest::RawRpcRequest { method, params } => (method, params),
603 };
604
605 match self.provider.raw_request_dyn(&method, params_json).await {
607 Ok(result_value) => Ok(create_success_response(request.id, result_value)),
608 Err(provider_error) => {
609 tracing::error!(
611 error = %provider_error,
612 "RPC provider error occurred"
613 );
614 let (error_code, error_message) = map_provider_error(&provider_error);
615 let sanitized_description = sanitize_error_description(&provider_error);
616 Ok(create_error_response(
617 request.id,
618 error_code,
619 error_message,
620 &sanitized_description,
621 ))
622 }
623 }
624 }
625
626 #[instrument(
632 level = "debug",
633 skip(self),
634 fields(
635 request_id = ?crate::observability::request_id::get_request_id(),
636 relayer_id = %self.relayer.id,
637 )
638 )]
639 async fn validate_min_balance(&self) -> Result<(), RelayerError> {
640 let policy = self.relayer.policies.get_evm_policy();
641 EvmTransactionValidator::init_balance_validation(
642 &self.relayer.address,
643 &policy,
644 &self.provider,
645 )
646 .await
647 .map_err(|e| RelayerError::InsufficientBalanceError(e.to_string()))?;
648
649 Ok(())
650 }
651
652 #[instrument(
658 level = "debug",
659 skip(self),
660 fields(
661 request_id = ?crate::observability::request_id::get_request_id(),
662 relayer_id = %self.relayer.id,
663 )
664 )]
665 async fn check_health(&self) -> Result<(), Vec<HealthCheckFailure>> {
666 debug!("running health checks");
667
668 let nonce_sync_result = self.sync_nonce().await;
669 let validate_rpc_result = self.validate_rpc().await;
670 let validate_min_balance_result = self.validate_min_balance().await;
671
672 let failures: Vec<HealthCheckFailure> = vec![
674 nonce_sync_result
675 .err()
676 .map(|e| HealthCheckFailure::NonceSyncFailed(e.to_string())),
677 validate_rpc_result
678 .err()
679 .map(|e| HealthCheckFailure::RpcValidationFailed(e.to_string())),
680 validate_min_balance_result
681 .err()
682 .map(|e| HealthCheckFailure::BalanceCheckFailed(e.to_string())),
683 ]
684 .into_iter()
685 .flatten()
686 .collect();
687
688 if failures.is_empty() {
689 info!("all health checks passed");
690 Ok(())
691 } else {
692 warn!("health checks failed: {:?}", failures);
693 Err(failures)
694 }
695 }
696
697 #[instrument(
698 level = "debug",
699 skip(self),
700 fields(
701 request_id = ?crate::observability::request_id::get_request_id(),
702 relayer_id = %self.relayer.id,
703 )
704 )]
705 async fn initialize_relayer(&self) -> Result<(), RelayerError> {
706 debug!("initializing EVM relayer");
707
708 match self.check_health().await {
709 Ok(_) => {
710 if self.relayer.system_disabled {
712 self.relayer_repository
714 .enable_relayer(self.relayer.id.clone())
715 .await?;
716 }
717 Ok(())
718 }
719 Err(failures) => {
720 let reason = DisabledReason::from_health_failures(failures).unwrap_or_else(|| {
722 DisabledReason::RpcValidationFailed("Unknown error".to_string())
723 });
724
725 warn!(reason = %reason, "disabling relayer");
726 let updated_relayer = self
727 .relayer_repository
728 .disable_relayer(self.relayer.id.clone(), reason.clone())
729 .await?;
730
731 if let Some(notification_id) = &self.relayer.notification_id {
733 self.job_producer
734 .produce_send_notification_job(
735 produce_relayer_disabled_payload(
736 notification_id,
737 &updated_relayer,
738 &reason.safe_description(),
739 ),
740 None,
741 )
742 .await?;
743 }
744
745 self.job_producer
747 .produce_relayer_health_check_job(
748 RelayerHealthCheck::new(self.relayer.id.clone()),
749 Some(calculate_scheduled_timestamp(10)),
750 )
751 .await?;
752
753 Ok(())
754 }
755 }
756 }
757
758 #[instrument(
759 level = "debug",
760 skip(self, _request),
761 fields(
762 request_id = ?crate::observability::request_id::get_request_id(),
763 relayer_id = %self.relayer.id,
764 )
765 )]
766 async fn sign_transaction(
767 &self,
768 _request: &SignTransactionRequest,
769 ) -> Result<SignTransactionExternalResponse, RelayerError> {
770 Err(RelayerError::NotSupported(
771 "Transaction signing not supported for EVM".to_string(),
772 ))
773 }
774}
775
776#[cfg(test)]
777mod tests {
778 use super::*;
779 use crate::models::RpcConfig;
780 use crate::{
781 config::{EvmNetworkConfig, NetworkConfigCommon},
782 jobs::MockJobProducerTrait,
783 models::{
784 EvmRpcRequest, EvmRpcResult, JsonRpcId, NetworkRepoModel, NetworkType,
785 RelayerEvmPolicy, RelayerNetworkPolicy, RepositoryError, SignerError,
786 TransactionStatus, U256,
787 },
788 repositories::{MockNetworkRepository, MockRelayerRepository, MockTransactionRepository},
789 services::{
790 provider::{MockEvmProviderTrait, ProviderError},
791 MockTransactionCounterServiceTrait,
792 },
793 };
794 use mockall::predicate::*;
795 use std::future::ready;
796
797 mockall::mock! {
798 pub DataSigner {}
799
800 #[async_trait]
801 impl DataSignerTrait for DataSigner {
802 async fn sign_data(&self, request: SignDataRequest) -> Result<SignDataResponse, SignerError>;
803 async fn sign_typed_data(&self, request: SignTypedDataRequest) -> Result<SignDataResponse, SignerError>;
804 }
805 }
806
807 fn create_test_evm_network() -> EvmNetwork {
808 EvmNetwork {
809 network: "mainnet".to_string(),
810 rpc_urls: vec![RpcConfig::new(
811 "https://mainnet.infura.io/v3/YOUR_INFURA_API_KEY".to_string(),
812 )],
813 explorer_urls: None,
814 average_blocktime_ms: 12000,
815 is_testnet: false,
816 tags: vec!["mainnet".to_string()],
817 chain_id: 1,
818 required_confirmations: 1,
819 features: vec!["eip1559".to_string()],
820 symbol: "ETH".to_string(),
821 gas_price_cache: None,
822 }
823 }
824
825 fn create_test_network_repo_model() -> NetworkRepoModel {
826 let config = EvmNetworkConfig {
827 common: NetworkConfigCommon {
828 network: "mainnet".to_string(),
829 from: None,
830 rpc_urls: Some(vec![crate::models::RpcConfig::new(
831 "https://mainnet.infura.io/v3/YOUR_INFURA_API_KEY".to_string(),
832 )]),
833 explorer_urls: None,
834 average_blocktime_ms: Some(12000),
835 is_testnet: Some(false),
836 tags: Some(vec!["mainnet".to_string()]),
837 },
838 chain_id: Some(1),
839 required_confirmations: Some(1),
840 features: Some(vec!["eip1559".to_string()]),
841 symbol: Some("ETH".to_string()),
842 gas_price_cache: None,
843 };
844
845 NetworkRepoModel::new_evm(config)
846 }
847
848 fn create_test_relayer() -> RelayerRepoModel {
849 RelayerRepoModel {
850 id: "test-relayer-id".to_string(),
851 name: "Test Relayer".to_string(),
852 network: "mainnet".to_string(), address: "0xSender".to_string(),
854 paused: false,
855 system_disabled: false,
856 signer_id: "test-signer-id".to_string(),
857 notification_id: Some("test-notification-id".to_string()),
858 policies: RelayerNetworkPolicy::Evm(RelayerEvmPolicy {
859 min_balance: Some(100000000000000000u128), whitelist_receivers: Some(vec!["0xRecipient".to_string()]),
861 gas_price_cap: Some(100000000000), eip1559_pricing: Some(true),
863 private_transactions: Some(false),
864 gas_limit_estimation: Some(true),
865 }),
866 network_type: NetworkType::Evm,
867 custom_rpc_urls: None,
868 ..Default::default()
869 }
870 }
871
872 fn setup_mocks() -> (
873 MockEvmProviderTrait,
874 MockRelayerRepository,
875 MockNetworkRepository,
876 MockTransactionRepository,
877 MockJobProducerTrait,
878 MockDataSigner,
879 MockTransactionCounterServiceTrait,
880 ) {
881 (
882 MockEvmProviderTrait::new(),
883 MockRelayerRepository::new(),
884 MockNetworkRepository::new(),
885 MockTransactionRepository::new(),
886 MockJobProducerTrait::new(),
887 MockDataSigner::new(),
888 MockTransactionCounterServiceTrait::new(),
889 )
890 }
891
892 #[tokio::test]
893 async fn test_get_balance() {
894 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
895 setup_mocks();
896 let relayer_model = create_test_relayer();
897
898 provider
899 .expect_get_balance()
900 .with(eq("0xSender"))
901 .returning(|_| Box::pin(ready(Ok(U256::from(1000000000000000000u64))))); let relayer = EvmRelayer::new(
904 relayer_model,
905 signer,
906 provider,
907 create_test_evm_network(),
908 Arc::new(relayer_repo),
909 Arc::new(network_repo),
910 Arc::new(tx_repo),
911 Arc::new(counter),
912 Arc::new(job_producer),
913 )
914 .unwrap();
915
916 let balance = relayer.get_balance().await.unwrap();
917 assert_eq!(balance.balance, 1000000000000000000u128);
918 assert_eq!(balance.unit, EVM_SMALLEST_UNIT_NAME);
919 }
920
921 #[tokio::test]
922 async fn test_process_transaction_request() {
923 let (
924 provider,
925 relayer_repo,
926 mut network_repo,
927 mut tx_repo,
928 mut job_producer,
929 signer,
930 counter,
931 ) = setup_mocks();
932 let relayer_model = create_test_relayer();
933
934 let network_tx = NetworkTransactionRequest::Evm(crate::models::EvmTransactionRequest {
935 to: Some("0xRecipient".to_string()),
936 value: U256::from(1000000000000000000u64),
937 data: Some("0xData".to_string()),
938 gas_limit: Some(21000),
939 gas_price: Some(20000000000),
940 max_fee_per_gas: None,
941 max_priority_fee_per_gas: None,
942 speed: None,
943 valid_until: None,
944 });
945
946 network_repo
947 .expect_get_by_name()
948 .with(eq(NetworkType::Evm), eq("mainnet"))
949 .returning(|_, _| Ok(Some(create_test_network_repo_model())));
950
951 tx_repo.expect_create().returning(Ok);
952 job_producer
953 .expect_produce_transaction_request_job()
954 .returning(|_, _| Box::pin(ready(Ok(()))));
955 job_producer
956 .expect_produce_check_transaction_status_job()
957 .returning(|_, _| Box::pin(ready(Ok(()))));
958
959 let relayer = EvmRelayer::new(
960 relayer_model,
961 signer,
962 provider,
963 create_test_evm_network(),
964 Arc::new(relayer_repo),
965 Arc::new(network_repo),
966 Arc::new(tx_repo),
967 Arc::new(counter),
968 Arc::new(job_producer),
969 )
970 .unwrap();
971
972 let result = relayer.process_transaction_request(network_tx).await;
973 assert!(result.is_ok());
974 }
975
976 #[tokio::test]
977 async fn test_validate_min_balance_sufficient() {
978 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
979 setup_mocks();
980 let relayer_model = create_test_relayer();
981
982 provider
983 .expect_get_balance()
984 .returning(|_| Box::pin(ready(Ok(U256::from(200000000000000000u64))))); let relayer = EvmRelayer::new(
987 relayer_model,
988 signer,
989 provider,
990 create_test_evm_network(),
991 Arc::new(relayer_repo),
992 Arc::new(network_repo),
993 Arc::new(tx_repo),
994 Arc::new(counter),
995 Arc::new(job_producer),
996 )
997 .unwrap();
998
999 let result = relayer.validate_min_balance().await;
1000 assert!(result.is_ok());
1001 }
1002
1003 #[tokio::test]
1004 async fn test_validate_min_balance_insufficient() {
1005 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
1006 setup_mocks();
1007 let relayer_model = create_test_relayer();
1008
1009 provider
1010 .expect_get_balance()
1011 .returning(|_| Box::pin(ready(Ok(U256::from(50000000000000000u64))))); let relayer = EvmRelayer::new(
1014 relayer_model,
1015 signer,
1016 provider,
1017 create_test_evm_network(),
1018 Arc::new(relayer_repo),
1019 Arc::new(network_repo),
1020 Arc::new(tx_repo),
1021 Arc::new(counter),
1022 Arc::new(job_producer),
1023 )
1024 .unwrap();
1025
1026 let result = relayer.validate_min_balance().await;
1027 assert!(matches!(
1028 result,
1029 Err(RelayerError::InsufficientBalanceError(_))
1030 ));
1031 }
1032
1033 #[tokio::test]
1034 async fn test_sync_nonce() {
1035 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, mut counter) =
1036 setup_mocks();
1037 let relayer_model = create_test_relayer();
1038
1039 provider
1040 .expect_get_transaction_count()
1041 .returning(|_| Box::pin(ready(Ok(42u64))));
1042
1043 counter
1044 .expect_set()
1045 .returning(|_nonce| Box::pin(ready(Ok(()))));
1046
1047 counter
1048 .expect_get()
1049 .returning(|| Box::pin(ready(Ok(Some(42u64)))));
1050
1051 let relayer = EvmRelayer::new(
1052 relayer_model,
1053 signer,
1054 provider,
1055 create_test_evm_network(),
1056 Arc::new(relayer_repo),
1057 Arc::new(network_repo),
1058 Arc::new(tx_repo),
1059 Arc::new(counter),
1060 Arc::new(job_producer),
1061 )
1062 .unwrap();
1063
1064 let result = relayer.sync_nonce().await;
1065 assert!(result.is_ok());
1066 }
1067
1068 #[tokio::test]
1069 async fn test_sync_nonce_lower_on_chain_nonce() {
1070 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, mut counter) =
1071 setup_mocks();
1072 let relayer_model = create_test_relayer();
1073
1074 provider
1075 .expect_get_transaction_count()
1076 .returning(|_| Box::pin(ready(Ok(40u64))));
1077
1078 counter
1079 .expect_set()
1080 .with(eq(42u64))
1081 .returning(|_nonce| Box::pin(ready(Ok(()))));
1082
1083 counter
1084 .expect_get()
1085 .returning(|| Box::pin(ready(Ok(Some(42u64)))));
1086
1087 let relayer = EvmRelayer::new(
1088 relayer_model,
1089 signer,
1090 provider,
1091 create_test_evm_network(),
1092 Arc::new(relayer_repo),
1093 Arc::new(network_repo),
1094 Arc::new(tx_repo),
1095 Arc::new(counter),
1096 Arc::new(job_producer),
1097 )
1098 .unwrap();
1099
1100 let result = relayer.sync_nonce().await;
1101 assert!(result.is_ok());
1102 }
1103
1104 #[tokio::test]
1105 async fn test_sync_nonce_lower_transaction_counter_nonce() {
1106 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, mut counter) =
1107 setup_mocks();
1108 let relayer_model = create_test_relayer();
1109
1110 provider
1111 .expect_get_transaction_count()
1112 .returning(|_| Box::pin(ready(Ok(42u64))));
1113
1114 counter
1115 .expect_set()
1116 .with(eq(42u64))
1117 .returning(|_nonce| Box::pin(ready(Ok(()))));
1118
1119 counter
1120 .expect_get()
1121 .returning(|| Box::pin(ready(Ok(Some(40u64)))));
1122
1123 let relayer = EvmRelayer::new(
1124 relayer_model,
1125 signer,
1126 provider,
1127 create_test_evm_network(),
1128 Arc::new(relayer_repo),
1129 Arc::new(network_repo),
1130 Arc::new(tx_repo),
1131 Arc::new(counter),
1132 Arc::new(job_producer),
1133 )
1134 .unwrap();
1135
1136 let result = relayer.sync_nonce().await;
1137 assert!(result.is_ok());
1138 }
1139
1140 #[tokio::test]
1141 async fn test_validate_rpc() {
1142 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
1143 setup_mocks();
1144 let relayer_model = create_test_relayer();
1145
1146 provider
1147 .expect_health_check()
1148 .returning(|| Box::pin(ready(Ok(true))));
1149
1150 let relayer = EvmRelayer::new(
1151 relayer_model,
1152 signer,
1153 provider,
1154 create_test_evm_network(),
1155 Arc::new(relayer_repo),
1156 Arc::new(network_repo),
1157 Arc::new(tx_repo),
1158 Arc::new(counter),
1159 Arc::new(job_producer),
1160 )
1161 .unwrap();
1162
1163 let result = relayer.validate_rpc().await;
1164 assert!(result.is_ok());
1165 }
1166
1167 #[tokio::test]
1168 async fn test_get_status_success() {
1169 let (
1170 mut provider,
1171 relayer_repo,
1172 network_repo,
1173 mut tx_repo,
1174 job_producer,
1175 signer,
1176 mut counter,
1177 ) = setup_mocks();
1178 let relayer_model = create_test_relayer();
1179
1180 counter
1182 .expect_get()
1183 .returning(|| Box::pin(ready(Ok(Some(10u64)))))
1184 .once();
1185 provider
1186 .expect_get_balance()
1187 .returning(|_| Box::pin(ready(Ok(U256::from(1000000000000000000u64)))))
1188 .once();
1189
1190 tx_repo
1192 .expect_count_by_status()
1193 .withf(|relayer_id, statuses| {
1194 relayer_id == "test-relayer-id"
1195 && statuses
1196 == [
1197 TransactionStatus::Pending,
1198 TransactionStatus::Sent,
1199 TransactionStatus::Submitted,
1200 ]
1201 })
1202 .returning(|_, _| Ok(0u64))
1203 .once();
1204
1205 let latest_confirmed_tx = TransactionRepoModel {
1207 id: "tx1".to_string(),
1208 relayer_id: relayer_model.id.clone(),
1209 status: TransactionStatus::Confirmed,
1210 confirmed_at: Some("2023-01-01T12:00:00Z".to_string()),
1211 ..TransactionRepoModel::default()
1212 };
1213 let relayer_id_clone = relayer_model.id.clone();
1214 tx_repo
1215 .expect_find_by_status_paginated()
1216 .withf(move |relayer_id, statuses, query, oldest_first| {
1217 *relayer_id == relayer_id_clone
1218 && statuses == [TransactionStatus::Confirmed]
1219 && query.page == 1
1220 && query.per_page == 1
1221 && *oldest_first == false
1222 })
1223 .returning(move |_, _, _, _| {
1224 Ok(crate::repositories::PaginatedResult {
1225 items: vec![latest_confirmed_tx.clone()],
1226 total: 1,
1227 page: 1,
1228 per_page: 1,
1229 })
1230 })
1231 .once();
1232
1233 let relayer = EvmRelayer::new(
1234 relayer_model.clone(),
1235 signer,
1236 provider,
1237 create_test_evm_network(),
1238 Arc::new(relayer_repo),
1239 Arc::new(network_repo),
1240 Arc::new(tx_repo),
1241 Arc::new(counter),
1242 Arc::new(job_producer),
1243 )
1244 .unwrap();
1245
1246 let status = relayer
1247 .get_status(GetStatusOptions::default())
1248 .await
1249 .unwrap();
1250
1251 match status {
1252 RelayerStatus::Evm {
1253 balance,
1254 pending_transactions_count,
1255 last_confirmed_transaction_timestamp,
1256 system_disabled,
1257 paused,
1258 nonce,
1259 } => {
1260 assert_eq!(balance, Some("1000000000000000000".to_string()));
1261 assert_eq!(pending_transactions_count, Some(0));
1262 assert_eq!(
1263 last_confirmed_transaction_timestamp,
1264 Some("2023-01-01T12:00:00Z".to_string())
1265 );
1266 assert_eq!(system_disabled, relayer_model.system_disabled);
1267 assert_eq!(paused, relayer_model.paused);
1268 assert_eq!(nonce, "10");
1269 }
1270 _ => panic!("Expected EVM RelayerStatus"),
1271 }
1272 }
1273
1274 #[tokio::test]
1275 async fn test_get_status_skip_all_optional_fields() {
1276 let (provider, relayer_repo, network_repo, tx_repo, job_producer, signer, mut counter) =
1277 setup_mocks();
1278 let relayer_model = create_test_relayer();
1279
1280 counter
1282 .expect_get()
1283 .returning(|| Box::pin(ready(Ok(Some(10u64)))))
1284 .once();
1285
1286 let relayer = EvmRelayer::new(
1290 relayer_model.clone(),
1291 signer,
1292 provider,
1293 create_test_evm_network(),
1294 Arc::new(relayer_repo),
1295 Arc::new(network_repo),
1296 Arc::new(tx_repo),
1297 Arc::new(counter),
1298 Arc::new(job_producer),
1299 )
1300 .unwrap();
1301
1302 let options = GetStatusOptions {
1303 include_balance: false,
1304 include_pending_count: false,
1305 include_last_confirmed_tx: false,
1306 };
1307 let status = relayer.get_status(options).await.unwrap();
1308
1309 match status {
1310 RelayerStatus::Evm {
1311 balance,
1312 pending_transactions_count,
1313 last_confirmed_transaction_timestamp,
1314 system_disabled,
1315 paused,
1316 nonce,
1317 } => {
1318 assert_eq!(balance, None);
1319 assert_eq!(pending_transactions_count, None);
1320 assert_eq!(last_confirmed_transaction_timestamp, None);
1321 assert_eq!(system_disabled, relayer_model.system_disabled);
1322 assert_eq!(paused, relayer_model.paused);
1323 assert_eq!(nonce, "10");
1324 }
1325 _ => panic!("Expected EVM RelayerStatus"),
1326 }
1327 }
1328
1329 #[tokio::test]
1330 async fn test_get_status_partial_options() {
1331 let (
1332 mut provider,
1333 relayer_repo,
1334 network_repo,
1335 mut tx_repo,
1336 job_producer,
1337 signer,
1338 mut counter,
1339 ) = setup_mocks();
1340 let relayer_model = create_test_relayer();
1341
1342 counter
1343 .expect_get()
1344 .returning(|| Box::pin(ready(Ok(Some(5u64)))))
1345 .once();
1346
1347 provider
1349 .expect_get_balance()
1350 .returning(|_| Box::pin(ready(Ok(U256::from(2000000000000000000u64)))))
1351 .once();
1352
1353 tx_repo
1355 .expect_count_by_status()
1356 .returning(|_, _| Ok(3u64))
1357 .once();
1358
1359 let relayer = EvmRelayer::new(
1362 relayer_model.clone(),
1363 signer,
1364 provider,
1365 create_test_evm_network(),
1366 Arc::new(relayer_repo),
1367 Arc::new(network_repo),
1368 Arc::new(tx_repo),
1369 Arc::new(counter),
1370 Arc::new(job_producer),
1371 )
1372 .unwrap();
1373
1374 let options = GetStatusOptions {
1375 include_balance: true,
1376 include_pending_count: true,
1377 include_last_confirmed_tx: false,
1378 };
1379 let status = relayer.get_status(options).await.unwrap();
1380
1381 match status {
1382 RelayerStatus::Evm {
1383 balance,
1384 pending_transactions_count,
1385 last_confirmed_transaction_timestamp,
1386 nonce,
1387 ..
1388 } => {
1389 assert_eq!(balance, Some("2000000000000000000".to_string()));
1390 assert_eq!(pending_transactions_count, Some(3));
1391 assert_eq!(last_confirmed_transaction_timestamp, None);
1392 assert_eq!(nonce, "5");
1393 }
1394 _ => panic!("Expected EVM RelayerStatus"),
1395 }
1396 }
1397
1398 #[tokio::test]
1399 async fn test_get_status_provider_nonce_error() {
1400 let (
1401 mut provider,
1402 relayer_repo,
1403 network_repo,
1404 mut tx_repo,
1405 job_producer,
1406 signer,
1407 mut counter,
1408 ) = setup_mocks();
1409 let relayer_model = create_test_relayer();
1410
1411 counter
1413 .expect_get()
1414 .returning(|| Box::pin(ready(Ok(None))))
1415 .once();
1416 provider
1417 .expect_get_balance()
1418 .returning(|_| Box::pin(ready(Ok(U256::from(1000000000000000000u64)))))
1419 .once();
1420
1421 tx_repo
1423 .expect_count_by_status()
1424 .returning(|_, _| Ok(0u64))
1425 .once();
1426
1427 tx_repo
1429 .expect_find_by_status_paginated()
1430 .withf(|_relayer_id, statuses, query, oldest_first| {
1431 statuses == [TransactionStatus::Confirmed]
1432 && query.page == 1
1433 && query.per_page == 1
1434 && *oldest_first == false
1435 })
1436 .returning(|_, _, _, _| {
1437 Ok(crate::repositories::PaginatedResult {
1438 items: vec![],
1439 total: 0,
1440 page: 1,
1441 per_page: 1,
1442 })
1443 })
1444 .once();
1445
1446 let relayer = EvmRelayer::new(
1447 relayer_model.clone(),
1448 signer,
1449 provider,
1450 create_test_evm_network(),
1451 Arc::new(relayer_repo),
1452 Arc::new(network_repo),
1453 Arc::new(tx_repo),
1454 Arc::new(counter),
1455 Arc::new(job_producer),
1456 )
1457 .unwrap();
1458
1459 let status = relayer
1461 .get_status(GetStatusOptions::default())
1462 .await
1463 .unwrap();
1464 match status {
1465 RelayerStatus::Evm { nonce, .. } => {
1466 assert_eq!(nonce, "0");
1467 }
1468 _ => panic!("Expected Evm status"),
1469 }
1470 }
1471
1472 #[tokio::test]
1473 async fn test_get_status_repository_pending_error() {
1474 let (
1475 mut provider,
1476 relayer_repo,
1477 network_repo,
1478 mut tx_repo,
1479 job_producer,
1480 signer,
1481 mut counter,
1482 ) = setup_mocks();
1483 let relayer_model = create_test_relayer();
1484
1485 counter
1487 .expect_get()
1488 .returning(|| Box::pin(ready(Ok(Some(10u64)))))
1489 .once();
1490 provider
1491 .expect_get_balance()
1492 .returning(|_| Box::pin(ready(Ok(U256::from(1000000000000000000u64)))));
1493
1494 tx_repo
1495 .expect_count_by_status()
1496 .withf(|relayer_id, statuses| {
1497 relayer_id == "test-relayer-id"
1498 && statuses
1499 == [
1500 TransactionStatus::Pending,
1501 TransactionStatus::Sent,
1502 TransactionStatus::Submitted,
1503 ]
1504 })
1505 .returning(|_, _| Err(RepositoryError::Unknown("DB down".to_string())))
1506 .once();
1507
1508 let relayer = EvmRelayer::new(
1509 relayer_model.clone(),
1510 signer,
1511 provider,
1512 create_test_evm_network(),
1513 Arc::new(relayer_repo),
1514 Arc::new(network_repo),
1515 Arc::new(tx_repo),
1516 Arc::new(counter),
1517 Arc::new(job_producer),
1518 )
1519 .unwrap();
1520
1521 let result = relayer.get_status(GetStatusOptions::default()).await;
1522 assert!(result.is_err());
1523 match result.err().unwrap() {
1524 RelayerError::NetworkConfiguration(msg) => assert!(msg.contains("DB down")),
1526 _ => panic!("Expected NetworkConfiguration error for repo failure"),
1527 }
1528 }
1529
1530 #[tokio::test]
1531 async fn test_get_status_no_confirmed_transactions() {
1532 let (
1533 mut provider,
1534 relayer_repo,
1535 network_repo,
1536 mut tx_repo,
1537 job_producer,
1538 signer,
1539 mut counter,
1540 ) = setup_mocks();
1541 let relayer_model = create_test_relayer();
1542
1543 counter
1545 .expect_get()
1546 .returning(|| Box::pin(ready(Ok(Some(10u64)))));
1547 provider
1548 .expect_get_balance()
1549 .returning(|_| Box::pin(ready(Ok(U256::from(1000000000000000000u64)))));
1550 provider
1551 .expect_health_check()
1552 .returning(|| Box::pin(ready(Ok(true))));
1553
1554 tx_repo
1556 .expect_count_by_status()
1557 .withf(|relayer_id, statuses| {
1558 relayer_id == "test-relayer-id"
1559 && statuses
1560 == [
1561 TransactionStatus::Pending,
1562 TransactionStatus::Sent,
1563 TransactionStatus::Submitted,
1564 ]
1565 })
1566 .returning(|_, _| Ok(0u64))
1567 .once();
1568
1569 let relayer_id_clone = relayer_model.id.clone();
1571 tx_repo
1572 .expect_find_by_status_paginated()
1573 .withf(move |relayer_id, statuses, query, oldest_first| {
1574 *relayer_id == relayer_id_clone
1575 && statuses == [TransactionStatus::Confirmed]
1576 && query.page == 1
1577 && query.per_page == 1
1578 && *oldest_first == false
1579 })
1580 .returning(|_, _, _, _| {
1581 Ok(crate::repositories::PaginatedResult {
1582 items: vec![],
1583 total: 0,
1584 page: 1,
1585 per_page: 1,
1586 })
1587 })
1588 .once();
1589
1590 let relayer = EvmRelayer::new(
1591 relayer_model.clone(),
1592 signer,
1593 provider,
1594 create_test_evm_network(),
1595 Arc::new(relayer_repo),
1596 Arc::new(network_repo),
1597 Arc::new(tx_repo),
1598 Arc::new(counter),
1599 Arc::new(job_producer),
1600 )
1601 .unwrap();
1602
1603 let status = relayer
1604 .get_status(GetStatusOptions::default())
1605 .await
1606 .unwrap();
1607 match status {
1608 RelayerStatus::Evm {
1609 balance,
1610 pending_transactions_count,
1611 last_confirmed_transaction_timestamp,
1612 system_disabled,
1613 paused,
1614 nonce,
1615 } => {
1616 assert_eq!(balance, Some("1000000000000000000".to_string()));
1617 assert_eq!(pending_transactions_count, Some(0));
1618 assert_eq!(last_confirmed_transaction_timestamp, None);
1619 assert_eq!(system_disabled, relayer_model.system_disabled);
1620 assert_eq!(paused, relayer_model.paused);
1621 assert_eq!(nonce, "10");
1622 }
1623 _ => panic!("Expected EVM RelayerStatus"),
1624 }
1625 }
1626
1627 #[tokio::test]
1628 async fn test_cancel_transaction_via_job_success() {
1629 let (provider, relayer_repo, network_repo, tx_repo, mut job_producer, signer, counter) =
1630 setup_mocks();
1631 let relayer_model = create_test_relayer();
1632
1633 let test_transaction = TransactionRepoModel {
1634 id: "test-tx-id".to_string(),
1635 relayer_id: relayer_model.id.clone(),
1636 status: TransactionStatus::Pending,
1637 ..TransactionRepoModel::default()
1638 };
1639
1640 job_producer
1641 .expect_produce_submit_transaction_job()
1642 .withf(|job, delay| {
1643 matches!(job.command, crate::jobs::TransactionCommand::Cancel { ref reason }
1644 if job.transaction_id == "test-tx-id"
1645 && job.relayer_id == "test-relayer-id"
1646 && reason == "Cancelled via delete_pending_transactions")
1647 && delay.is_none()
1648 })
1649 .returning(|_, _| Box::pin(ready(Ok(()))))
1650 .once();
1651
1652 let relayer = EvmRelayer::new(
1653 relayer_model,
1654 signer,
1655 provider,
1656 create_test_evm_network(),
1657 Arc::new(relayer_repo),
1658 Arc::new(network_repo),
1659 Arc::new(tx_repo),
1660 Arc::new(counter),
1661 Arc::new(job_producer),
1662 )
1663 .unwrap();
1664
1665 let result = relayer.cancel_transaction_via_job(test_transaction).await;
1666 assert!(result.is_ok());
1667 }
1668
1669 #[tokio::test]
1670 async fn test_cancel_transaction_via_job_failure() {
1671 let (provider, relayer_repo, network_repo, tx_repo, mut job_producer, signer, counter) =
1672 setup_mocks();
1673 let relayer_model = create_test_relayer();
1674
1675 let test_transaction = TransactionRepoModel {
1676 id: "test-tx-id".to_string(),
1677 relayer_id: relayer_model.id.clone(),
1678 status: TransactionStatus::Pending,
1679 ..TransactionRepoModel::default()
1680 };
1681
1682 job_producer
1683 .expect_produce_submit_transaction_job()
1684 .returning(|_, _| {
1685 Box::pin(ready(Err(crate::jobs::JobProducerError::QueueError(
1686 "Queue is full".to_string(),
1687 ))))
1688 })
1689 .once();
1690
1691 let relayer = EvmRelayer::new(
1692 relayer_model,
1693 signer,
1694 provider,
1695 create_test_evm_network(),
1696 Arc::new(relayer_repo),
1697 Arc::new(network_repo),
1698 Arc::new(tx_repo),
1699 Arc::new(counter),
1700 Arc::new(job_producer),
1701 )
1702 .unwrap();
1703
1704 let result = relayer.cancel_transaction_via_job(test_transaction).await;
1705 assert!(result.is_err());
1706 match result.err().unwrap() {
1707 RelayerError::QueueError(_) => (),
1708 _ => panic!("Expected QueueError"),
1709 }
1710 }
1711
1712 #[tokio::test]
1713 async fn test_delete_pending_transactions_no_pending() {
1714 let (provider, relayer_repo, network_repo, mut tx_repo, job_producer, signer, counter) =
1715 setup_mocks();
1716 let relayer_model = create_test_relayer();
1717
1718 tx_repo
1719 .expect_find_by_status()
1720 .withf(|relayer_id, statuses| {
1721 relayer_id == "test-relayer-id"
1722 && statuses
1723 == [
1724 TransactionStatus::Pending,
1725 TransactionStatus::Sent,
1726 TransactionStatus::Submitted,
1727 ]
1728 })
1729 .returning(|_, _| Ok(vec![]))
1730 .once();
1731
1732 let relayer = EvmRelayer::new(
1733 relayer_model,
1734 signer,
1735 provider,
1736 create_test_evm_network(),
1737 Arc::new(relayer_repo),
1738 Arc::new(network_repo),
1739 Arc::new(tx_repo),
1740 Arc::new(counter),
1741 Arc::new(job_producer),
1742 )
1743 .unwrap();
1744
1745 let result = relayer.delete_pending_transactions().await.unwrap();
1746 assert_eq!(result.queued_for_cancellation_transaction_ids.len(), 0);
1747 assert_eq!(result.failed_to_queue_transaction_ids.len(), 0);
1748 assert_eq!(result.total_processed, 0);
1749 }
1750
1751 #[tokio::test]
1752 async fn test_delete_pending_transactions_all_successful() {
1753 let (provider, relayer_repo, network_repo, mut tx_repo, mut job_producer, signer, counter) =
1754 setup_mocks();
1755 let relayer_model = create_test_relayer();
1756
1757 let pending_transactions = vec![
1758 TransactionRepoModel {
1759 id: "tx1".to_string(),
1760 relayer_id: relayer_model.id.clone(),
1761 status: TransactionStatus::Pending,
1762 ..TransactionRepoModel::default()
1763 },
1764 TransactionRepoModel {
1765 id: "tx2".to_string(),
1766 relayer_id: relayer_model.id.clone(),
1767 status: TransactionStatus::Sent,
1768 ..TransactionRepoModel::default()
1769 },
1770 TransactionRepoModel {
1771 id: "tx3".to_string(),
1772 relayer_id: relayer_model.id.clone(),
1773 status: TransactionStatus::Submitted,
1774 ..TransactionRepoModel::default()
1775 },
1776 ];
1777
1778 tx_repo
1779 .expect_find_by_status()
1780 .withf(|relayer_id, statuses| {
1781 relayer_id == "test-relayer-id"
1782 && statuses
1783 == [
1784 TransactionStatus::Pending,
1785 TransactionStatus::Sent,
1786 TransactionStatus::Submitted,
1787 ]
1788 })
1789 .returning(move |_, _| Ok(pending_transactions.clone()))
1790 .once();
1791
1792 job_producer
1793 .expect_produce_submit_transaction_job()
1794 .returning(|_, _| Box::pin(ready(Ok(()))))
1795 .times(3);
1796
1797 let relayer = EvmRelayer::new(
1798 relayer_model,
1799 signer,
1800 provider,
1801 create_test_evm_network(),
1802 Arc::new(relayer_repo),
1803 Arc::new(network_repo),
1804 Arc::new(tx_repo),
1805 Arc::new(counter),
1806 Arc::new(job_producer),
1807 )
1808 .unwrap();
1809
1810 let result = relayer.delete_pending_transactions().await.unwrap();
1811 assert_eq!(result.queued_for_cancellation_transaction_ids.len(), 3);
1812 assert_eq!(result.failed_to_queue_transaction_ids.len(), 0);
1813 assert_eq!(result.total_processed, 3);
1814
1815 let expected_ids = vec!["tx1", "tx2", "tx3"];
1816 for id in expected_ids {
1817 assert!(result
1818 .queued_for_cancellation_transaction_ids
1819 .contains(&id.to_string()));
1820 }
1821 }
1822
1823 #[tokio::test]
1824 async fn test_delete_pending_transactions_partial_failures() {
1825 let (provider, relayer_repo, network_repo, mut tx_repo, mut job_producer, signer, counter) =
1826 setup_mocks();
1827 let relayer_model = create_test_relayer();
1828
1829 let pending_transactions = vec![
1830 TransactionRepoModel {
1831 id: "tx1".to_string(),
1832 relayer_id: relayer_model.id.clone(),
1833 status: TransactionStatus::Pending,
1834 ..TransactionRepoModel::default()
1835 },
1836 TransactionRepoModel {
1837 id: "tx2".to_string(),
1838 relayer_id: relayer_model.id.clone(),
1839 status: TransactionStatus::Sent,
1840 ..TransactionRepoModel::default()
1841 },
1842 TransactionRepoModel {
1843 id: "tx3".to_string(),
1844 relayer_id: relayer_model.id.clone(),
1845 status: TransactionStatus::Submitted,
1846 ..TransactionRepoModel::default()
1847 },
1848 ];
1849
1850 tx_repo
1851 .expect_find_by_status()
1852 .withf(|relayer_id, statuses| {
1853 relayer_id == "test-relayer-id"
1854 && statuses
1855 == [
1856 TransactionStatus::Pending,
1857 TransactionStatus::Sent,
1858 TransactionStatus::Submitted,
1859 ]
1860 })
1861 .returning(move |_, _| Ok(pending_transactions.clone()))
1862 .once();
1863
1864 job_producer
1866 .expect_produce_submit_transaction_job()
1867 .returning(|_, _| Box::pin(ready(Ok(()))))
1868 .times(1);
1869 job_producer
1870 .expect_produce_submit_transaction_job()
1871 .returning(|_, _| {
1872 Box::pin(ready(Err(crate::jobs::JobProducerError::QueueError(
1873 "Queue is full".to_string(),
1874 ))))
1875 })
1876 .times(1);
1877 job_producer
1878 .expect_produce_submit_transaction_job()
1879 .returning(|_, _| Box::pin(ready(Ok(()))))
1880 .times(1);
1881
1882 let relayer = EvmRelayer::new(
1883 relayer_model,
1884 signer,
1885 provider,
1886 create_test_evm_network(),
1887 Arc::new(relayer_repo),
1888 Arc::new(network_repo),
1889 Arc::new(tx_repo),
1890 Arc::new(counter),
1891 Arc::new(job_producer),
1892 )
1893 .unwrap();
1894
1895 let result = relayer.delete_pending_transactions().await.unwrap();
1896 assert_eq!(result.queued_for_cancellation_transaction_ids.len(), 2);
1897 assert_eq!(result.failed_to_queue_transaction_ids.len(), 1);
1898 assert_eq!(result.total_processed, 3);
1899 }
1900
1901 #[tokio::test]
1902 async fn test_delete_pending_transactions_repository_error() {
1903 let (provider, relayer_repo, network_repo, mut tx_repo, job_producer, signer, counter) =
1904 setup_mocks();
1905 let relayer_model = create_test_relayer();
1906
1907 tx_repo
1908 .expect_find_by_status()
1909 .withf(|relayer_id, statuses| {
1910 relayer_id == "test-relayer-id"
1911 && statuses
1912 == [
1913 TransactionStatus::Pending,
1914 TransactionStatus::Sent,
1915 TransactionStatus::Submitted,
1916 ]
1917 })
1918 .returning(|_, _| {
1919 Err(RepositoryError::Unknown(
1920 "Database connection failed".to_string(),
1921 ))
1922 })
1923 .once();
1924
1925 let relayer = EvmRelayer::new(
1926 relayer_model,
1927 signer,
1928 provider,
1929 create_test_evm_network(),
1930 Arc::new(relayer_repo),
1931 Arc::new(network_repo),
1932 Arc::new(tx_repo),
1933 Arc::new(counter),
1934 Arc::new(job_producer),
1935 )
1936 .unwrap();
1937
1938 let result = relayer.delete_pending_transactions().await;
1939 assert!(result.is_err());
1940 match result.err().unwrap() {
1941 RelayerError::NetworkConfiguration(msg) => {
1942 assert!(msg.contains("Database connection failed"))
1943 }
1944 _ => panic!("Expected NetworkConfiguration error for repository failure"),
1945 }
1946 }
1947
1948 #[tokio::test]
1949 async fn test_delete_pending_transactions_all_failures() {
1950 let (provider, relayer_repo, network_repo, mut tx_repo, mut job_producer, signer, counter) =
1951 setup_mocks();
1952 let relayer_model = create_test_relayer();
1953
1954 let pending_transactions = vec![
1955 TransactionRepoModel {
1956 id: "tx1".to_string(),
1957 relayer_id: relayer_model.id.clone(),
1958 status: TransactionStatus::Pending,
1959 ..TransactionRepoModel::default()
1960 },
1961 TransactionRepoModel {
1962 id: "tx2".to_string(),
1963 relayer_id: relayer_model.id.clone(),
1964 status: TransactionStatus::Sent,
1965 ..TransactionRepoModel::default()
1966 },
1967 ];
1968
1969 tx_repo
1970 .expect_find_by_status()
1971 .withf(|relayer_id, statuses| {
1972 relayer_id == "test-relayer-id"
1973 && statuses
1974 == [
1975 TransactionStatus::Pending,
1976 TransactionStatus::Sent,
1977 TransactionStatus::Submitted,
1978 ]
1979 })
1980 .returning(move |_, _| Ok(pending_transactions.clone()))
1981 .once();
1982
1983 job_producer
1984 .expect_produce_submit_transaction_job()
1985 .returning(|_, _| {
1986 Box::pin(ready(Err(crate::jobs::JobProducerError::QueueError(
1987 "Queue is full".to_string(),
1988 ))))
1989 })
1990 .times(2);
1991
1992 let relayer = EvmRelayer::new(
1993 relayer_model,
1994 signer,
1995 provider,
1996 create_test_evm_network(),
1997 Arc::new(relayer_repo),
1998 Arc::new(network_repo),
1999 Arc::new(tx_repo),
2000 Arc::new(counter),
2001 Arc::new(job_producer),
2002 )
2003 .unwrap();
2004
2005 let result = relayer.delete_pending_transactions().await.unwrap();
2006 assert_eq!(result.queued_for_cancellation_transaction_ids.len(), 0);
2007 assert_eq!(result.failed_to_queue_transaction_ids.len(), 2);
2008 assert_eq!(result.total_processed, 2);
2009
2010 let expected_failed_ids = vec!["tx1", "tx2"];
2011 for id in expected_failed_ids {
2012 assert!(result
2013 .failed_to_queue_transaction_ids
2014 .contains(&id.to_string()));
2015 }
2016 }
2017
2018 #[tokio::test]
2019 async fn test_rpc_eth_get_balance() {
2020 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2021 setup_mocks();
2022 let relayer_model = create_test_relayer();
2023
2024 provider
2025 .expect_raw_request_dyn()
2026 .withf(|method, params| {
2027 method == "eth_getBalance"
2028 && params.as_str()
2029 == Some(r#"["0x742d35Cc6634C0532925a3b844Bc454e4438f44e", "latest"]"#)
2030 })
2031 .returning(|_, _| Box::pin(async { Ok(serde_json::json!("0xde0b6b3a7640000")) }));
2032
2033 let relayer = EvmRelayer::new(
2034 relayer_model,
2035 signer,
2036 provider,
2037 create_test_evm_network(),
2038 Arc::new(relayer_repo),
2039 Arc::new(network_repo),
2040 Arc::new(tx_repo),
2041 Arc::new(counter),
2042 Arc::new(job_producer),
2043 )
2044 .unwrap();
2045
2046 let request = JsonRpcRequest {
2047 jsonrpc: "2.0".to_string(),
2048 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2049 method: "eth_getBalance".to_string(),
2050 params: serde_json::Value::String(
2051 r#"["0x742d35Cc6634C0532925a3b844Bc454e4438f44e", "latest"]"#.to_string(),
2052 ),
2053 }),
2054 id: Some(JsonRpcId::Number(1)),
2055 };
2056
2057 let response = relayer.rpc(request).await.unwrap();
2058 assert!(response.error.is_none());
2059 assert!(response.result.is_some());
2060
2061 if let Some(NetworkRpcResult::Evm(EvmRpcResult::RawRpcResult(result))) = response.result {
2062 assert_eq!(result, serde_json::json!("0xde0b6b3a7640000")); }
2064 }
2065
2066 #[tokio::test]
2067 async fn test_rpc_eth_block_number() {
2068 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2069 setup_mocks();
2070 let relayer_model = create_test_relayer();
2071
2072 provider
2073 .expect_raw_request_dyn()
2074 .withf(|method, params| method == "eth_blockNumber" && params.as_str() == Some("[]"))
2075 .returning(|_, _| Box::pin(async { Ok(serde_json::json!("0x3039")) }));
2076
2077 let relayer = EvmRelayer::new(
2078 relayer_model,
2079 signer,
2080 provider,
2081 create_test_evm_network(),
2082 Arc::new(relayer_repo),
2083 Arc::new(network_repo),
2084 Arc::new(tx_repo),
2085 Arc::new(counter),
2086 Arc::new(job_producer),
2087 )
2088 .unwrap();
2089
2090 let request = JsonRpcRequest {
2091 jsonrpc: "2.0".to_string(),
2092 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2093 method: "eth_blockNumber".to_string(),
2094 params: serde_json::Value::String("[]".to_string()),
2095 }),
2096 id: Some(JsonRpcId::Number(1)),
2097 };
2098
2099 let response = relayer.rpc(request).await.unwrap();
2100 assert!(response.error.is_none());
2101 assert!(response.result.is_some());
2102
2103 if let Some(NetworkRpcResult::Evm(EvmRpcResult::RawRpcResult(result))) = response.result {
2104 assert_eq!(result, serde_json::json!("0x3039")); }
2106 }
2107
2108 #[tokio::test]
2109 async fn test_rpc_unsupported_method() {
2110 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2111 setup_mocks();
2112 let relayer_model = create_test_relayer();
2113
2114 provider
2115 .expect_raw_request_dyn()
2116 .withf(|method, _| method == "eth_unsupportedMethod")
2117 .returning(|_, _| {
2118 Box::pin(async {
2119 Err(ProviderError::Other(
2120 "Unsupported method: eth_unsupportedMethod".to_string(),
2121 ))
2122 })
2123 });
2124
2125 let relayer = EvmRelayer::new(
2126 relayer_model,
2127 signer,
2128 provider,
2129 create_test_evm_network(),
2130 Arc::new(relayer_repo),
2131 Arc::new(network_repo),
2132 Arc::new(tx_repo),
2133 Arc::new(counter),
2134 Arc::new(job_producer),
2135 )
2136 .unwrap();
2137
2138 let request = JsonRpcRequest {
2139 jsonrpc: "2.0".to_string(),
2140 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2141 method: "eth_unsupportedMethod".to_string(),
2142 params: serde_json::Value::String("[]".to_string()),
2143 }),
2144 id: Some(JsonRpcId::Number(1)),
2145 };
2146
2147 let response = relayer.rpc(request).await.unwrap();
2148 assert!(response.result.is_none());
2149 assert!(response.error.is_some());
2150
2151 let error = response.error.unwrap();
2152 assert_eq!(error.code, -32603); }
2154
2155 #[tokio::test]
2156 async fn test_rpc_invalid_params() {
2157 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2158 setup_mocks();
2159 let relayer_model = create_test_relayer();
2160
2161 provider
2162 .expect_raw_request_dyn()
2163 .withf(|method, params| method == "eth_getBalance" && params.as_str() == Some("[]"))
2164 .returning(|_, _| {
2165 Box::pin(async {
2166 Err(ProviderError::Other(
2167 "Missing address parameter".to_string(),
2168 ))
2169 })
2170 });
2171
2172 let relayer = EvmRelayer::new(
2173 relayer_model,
2174 signer,
2175 provider,
2176 create_test_evm_network(),
2177 Arc::new(relayer_repo),
2178 Arc::new(network_repo),
2179 Arc::new(tx_repo),
2180 Arc::new(counter),
2181 Arc::new(job_producer),
2182 )
2183 .unwrap();
2184
2185 let request = JsonRpcRequest {
2186 jsonrpc: "2.0".to_string(),
2187 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2188 method: "eth_getBalance".to_string(),
2189 params: serde_json::Value::String("[]".to_string()), }),
2191 id: Some(JsonRpcId::Number(1)),
2192 };
2193
2194 let response = relayer.rpc(request).await.unwrap();
2195 assert!(response.result.is_none());
2196 assert!(response.error.is_some());
2197
2198 let error = response.error.unwrap();
2199 assert_eq!(error.code, -32603); }
2201
2202 #[tokio::test]
2203 async fn test_rpc_non_evm_request() {
2204 let (provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2205 setup_mocks();
2206 let relayer_model = create_test_relayer();
2207
2208 let relayer = EvmRelayer::new(
2209 relayer_model,
2210 signer,
2211 provider,
2212 create_test_evm_network(),
2213 Arc::new(relayer_repo),
2214 Arc::new(network_repo),
2215 Arc::new(tx_repo),
2216 Arc::new(counter),
2217 Arc::new(job_producer),
2218 )
2219 .unwrap();
2220
2221 let request = JsonRpcRequest {
2222 jsonrpc: "2.0".to_string(),
2223 params: NetworkRpcRequest::Solana(crate::models::SolanaRpcRequest::GetSupportedTokens(
2224 crate::models::SolanaGetSupportedTokensRequestParams {},
2225 )),
2226 id: Some(JsonRpcId::Number(1)),
2227 };
2228
2229 let response = relayer.rpc(request).await.unwrap();
2230 assert!(response.result.is_none());
2231 assert!(response.error.is_some());
2232
2233 let error = response.error.unwrap();
2234 assert_eq!(error.code, -32602); }
2236
2237 #[tokio::test]
2238 async fn test_rpc_raw_request_with_array_params() {
2239 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2240 setup_mocks();
2241 let relayer_model = create_test_relayer();
2242
2243 provider
2244 .expect_raw_request_dyn()
2245 .withf(|method, params| {
2246 method == "eth_getTransactionByHash"
2247 && params.as_array().is_some_and(|arr| {
2248 arr.len() == 1 && arr[0].as_str() == Some("0x1234567890abcdef")
2249 })
2250 })
2251 .returning(|_, _| {
2252 Box::pin(async {
2253 Ok(serde_json::json!({
2254 "hash": "0x1234567890abcdef",
2255 "blockNumber": "0x1",
2256 "gasUsed": "0x5208"
2257 }))
2258 })
2259 });
2260
2261 let relayer = EvmRelayer::new(
2262 relayer_model,
2263 signer,
2264 provider,
2265 create_test_evm_network(),
2266 Arc::new(relayer_repo),
2267 Arc::new(network_repo),
2268 Arc::new(tx_repo),
2269 Arc::new(counter),
2270 Arc::new(job_producer),
2271 )
2272 .unwrap();
2273
2274 let request = JsonRpcRequest {
2275 jsonrpc: "2.0".to_string(),
2276 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2277 method: "eth_getTransactionByHash".to_string(),
2278 params: serde_json::json!(["0x1234567890abcdef"]),
2279 }),
2280 id: Some(JsonRpcId::Number(42)),
2281 };
2282
2283 let response = relayer.rpc(request).await.unwrap();
2284 assert!(response.error.is_none());
2285 assert!(response.result.is_some());
2286 assert_eq!(response.id, Some(JsonRpcId::Number(42)));
2287
2288 if let Some(NetworkRpcResult::Evm(EvmRpcResult::RawRpcResult(result))) = response.result {
2289 assert!(result.get("hash").is_some());
2290 assert!(result.get("blockNumber").is_some());
2291 }
2292 }
2293
2294 #[tokio::test]
2295 async fn test_rpc_raw_request_with_object_params() {
2296 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2297 setup_mocks();
2298 let relayer_model = create_test_relayer();
2299
2300 provider
2301 .expect_raw_request_dyn()
2302 .withf(|method, params| {
2303 method == "eth_call"
2304 && params
2305 .as_object()
2306 .is_some_and(|obj| obj.contains_key("to") && obj.contains_key("data"))
2307 })
2308 .returning(|_, _| {
2309 Box::pin(async {
2310 Ok(serde_json::json!(
2311 "0x0000000000000000000000000000000000000000000000000000000000000001"
2312 ))
2313 })
2314 });
2315
2316 let relayer = EvmRelayer::new(
2317 relayer_model,
2318 signer,
2319 provider,
2320 create_test_evm_network(),
2321 Arc::new(relayer_repo),
2322 Arc::new(network_repo),
2323 Arc::new(tx_repo),
2324 Arc::new(counter),
2325 Arc::new(job_producer),
2326 )
2327 .unwrap();
2328
2329 let request = JsonRpcRequest {
2330 jsonrpc: "2.0".to_string(),
2331 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2332 method: "eth_call".to_string(),
2333 params: serde_json::json!({
2334 "to": "0x742d35Cc6634C0532925a3b844Bc454e4438f44e",
2335 "data": "0x70a08231000000000000000000000000742d35cc6634c0532925a3b844bc454e4438f44e"
2336 }),
2337 }),
2338 id: Some(JsonRpcId::Number(123)),
2339 };
2340
2341 let response = relayer.rpc(request).await.unwrap();
2342 assert!(response.error.is_none());
2343 assert!(response.result.is_some());
2344 assert_eq!(response.id, Some(JsonRpcId::Number(123)));
2345 }
2346
2347 #[tokio::test]
2348 async fn test_rpc_generic_request_with_empty_params() {
2349 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2350 setup_mocks();
2351 let relayer_model = create_test_relayer();
2352
2353 provider
2354 .expect_raw_request_dyn()
2355 .withf(|method, params| method == "net_version" && params.as_str() == Some("[]"))
2356 .returning(|_, _| Box::pin(async { Ok(serde_json::json!("1")) }));
2357
2358 let relayer = EvmRelayer::new(
2359 relayer_model,
2360 signer,
2361 provider,
2362 create_test_evm_network(),
2363 Arc::new(relayer_repo),
2364 Arc::new(network_repo),
2365 Arc::new(tx_repo),
2366 Arc::new(counter),
2367 Arc::new(job_producer),
2368 )
2369 .unwrap();
2370
2371 let request = JsonRpcRequest {
2372 jsonrpc: "2.0".to_string(),
2373 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2374 method: "net_version".to_string(),
2375 params: serde_json::Value::String("[]".to_string()),
2376 }),
2377 id: Some(JsonRpcId::Number(999)),
2378 };
2379
2380 let response = relayer.rpc(request).await.unwrap();
2381 assert!(response.error.is_none());
2382 assert!(response.result.is_some());
2383 assert_eq!(response.id, Some(JsonRpcId::Number(999)));
2384 }
2385
2386 #[tokio::test]
2387 async fn test_rpc_provider_invalid_address_error() {
2388 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2389 setup_mocks();
2390 let relayer_model = create_test_relayer();
2391
2392 provider.expect_raw_request_dyn().returning(|_, _| {
2393 Box::pin(async {
2394 Err(ProviderError::InvalidAddress(
2395 "Invalid address format".to_string(),
2396 ))
2397 })
2398 });
2399
2400 let relayer = EvmRelayer::new(
2401 relayer_model,
2402 signer,
2403 provider,
2404 create_test_evm_network(),
2405 Arc::new(relayer_repo),
2406 Arc::new(network_repo),
2407 Arc::new(tx_repo),
2408 Arc::new(counter),
2409 Arc::new(job_producer),
2410 )
2411 .unwrap();
2412
2413 let request = JsonRpcRequest {
2414 jsonrpc: "2.0".to_string(),
2415 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2416 method: "eth_getBalance".to_string(),
2417 params: serde_json::Value::String(r#"["invalid_address", "latest"]"#.to_string()),
2418 }),
2419 id: Some(JsonRpcId::Number(1)),
2420 };
2421
2422 let response = relayer.rpc(request).await.unwrap();
2423 assert!(response.result.is_none());
2424 assert!(response.error.is_some());
2425
2426 let error = response.error.unwrap();
2427 assert_eq!(error.code, -32602); }
2429
2430 #[tokio::test]
2431 async fn test_rpc_provider_network_configuration_error() {
2432 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2433 setup_mocks();
2434 let relayer_model = create_test_relayer();
2435
2436 provider.expect_raw_request_dyn().returning(|_, _| {
2437 Box::pin(async {
2438 Err(ProviderError::NetworkConfiguration(
2439 "Network not reachable".to_string(),
2440 ))
2441 })
2442 });
2443
2444 let relayer = EvmRelayer::new(
2445 relayer_model,
2446 signer,
2447 provider,
2448 create_test_evm_network(),
2449 Arc::new(relayer_repo),
2450 Arc::new(network_repo),
2451 Arc::new(tx_repo),
2452 Arc::new(counter),
2453 Arc::new(job_producer),
2454 )
2455 .unwrap();
2456
2457 let request = JsonRpcRequest {
2458 jsonrpc: "2.0".to_string(),
2459 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2460 method: "eth_chainId".to_string(),
2461 params: serde_json::Value::String("[]".to_string()),
2462 }),
2463 id: Some(JsonRpcId::Number(2)),
2464 };
2465
2466 let response = relayer.rpc(request).await.unwrap();
2467 assert!(response.result.is_none());
2468 assert!(response.error.is_some());
2469
2470 let error = response.error.unwrap();
2471 assert_eq!(error.code, -33004); }
2473
2474 #[tokio::test]
2475 async fn test_rpc_provider_timeout_error() {
2476 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2477 setup_mocks();
2478 let relayer_model = create_test_relayer();
2479
2480 provider
2481 .expect_raw_request_dyn()
2482 .returning(|_, _| Box::pin(async { Err(ProviderError::Timeout) }));
2483
2484 let relayer = EvmRelayer::new(
2485 relayer_model,
2486 signer,
2487 provider,
2488 create_test_evm_network(),
2489 Arc::new(relayer_repo),
2490 Arc::new(network_repo),
2491 Arc::new(tx_repo),
2492 Arc::new(counter),
2493 Arc::new(job_producer),
2494 )
2495 .unwrap();
2496
2497 let request = JsonRpcRequest {
2498 jsonrpc: "2.0".to_string(),
2499 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2500 method: "eth_blockNumber".to_string(),
2501 params: serde_json::json!([]),
2502 }),
2503 id: Some(JsonRpcId::Number(3)),
2504 };
2505
2506 let response = relayer.rpc(request).await.unwrap();
2507 assert!(response.result.is_none());
2508 assert!(response.error.is_some());
2509
2510 let error = response.error.unwrap();
2511 assert_eq!(error.code, -33000); }
2513
2514 #[tokio::test]
2515 async fn test_rpc_provider_rate_limited_error() {
2516 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2517 setup_mocks();
2518 let relayer_model = create_test_relayer();
2519
2520 provider
2521 .expect_raw_request_dyn()
2522 .returning(|_, _| Box::pin(async { Err(ProviderError::RateLimited) }));
2523
2524 let relayer = EvmRelayer::new(
2525 relayer_model,
2526 signer,
2527 provider,
2528 create_test_evm_network(),
2529 Arc::new(relayer_repo),
2530 Arc::new(network_repo),
2531 Arc::new(tx_repo),
2532 Arc::new(counter),
2533 Arc::new(job_producer),
2534 )
2535 .unwrap();
2536
2537 let request = JsonRpcRequest {
2538 jsonrpc: "2.0".to_string(),
2539 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2540 method: "eth_getBalance".to_string(),
2541 params: serde_json::Value::String(
2542 r#"["0x742d35Cc6634C0532925a3b844Bc454e4438f44e", "latest"]"#.to_string(),
2543 ),
2544 }),
2545 id: Some(JsonRpcId::Number(4)),
2546 };
2547
2548 let response = relayer.rpc(request).await.unwrap();
2549 assert!(response.result.is_none());
2550 assert!(response.error.is_some());
2551
2552 let error = response.error.unwrap();
2553 assert_eq!(error.code, -33001); }
2555
2556 #[tokio::test]
2557 async fn test_rpc_provider_bad_gateway_error() {
2558 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2559 setup_mocks();
2560 let relayer_model = create_test_relayer();
2561
2562 provider
2563 .expect_raw_request_dyn()
2564 .returning(|_, _| Box::pin(async { Err(ProviderError::BadGateway) }));
2565
2566 let relayer = EvmRelayer::new(
2567 relayer_model,
2568 signer,
2569 provider,
2570 create_test_evm_network(),
2571 Arc::new(relayer_repo),
2572 Arc::new(network_repo),
2573 Arc::new(tx_repo),
2574 Arc::new(counter),
2575 Arc::new(job_producer),
2576 )
2577 .unwrap();
2578
2579 let request = JsonRpcRequest {
2580 jsonrpc: "2.0".to_string(),
2581 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2582 method: "eth_gasPrice".to_string(),
2583 params: serde_json::json!([]),
2584 }),
2585 id: Some(JsonRpcId::Number(5)),
2586 };
2587
2588 let response = relayer.rpc(request).await.unwrap();
2589 assert!(response.result.is_none());
2590 assert!(response.error.is_some());
2591
2592 let error = response.error.unwrap();
2593 assert_eq!(error.code, -33002); }
2595
2596 #[tokio::test]
2597 async fn test_rpc_provider_request_error() {
2598 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2599 setup_mocks();
2600 let relayer_model = create_test_relayer();
2601
2602 provider.expect_raw_request_dyn().returning(|_, _| {
2603 Box::pin(async {
2604 Err(ProviderError::RequestError {
2605 error: "Bad request".to_string(),
2606 status_code: 400,
2607 })
2608 })
2609 });
2610
2611 let relayer = EvmRelayer::new(
2612 relayer_model,
2613 signer,
2614 provider,
2615 create_test_evm_network(),
2616 Arc::new(relayer_repo),
2617 Arc::new(network_repo),
2618 Arc::new(tx_repo),
2619 Arc::new(counter),
2620 Arc::new(job_producer),
2621 )
2622 .unwrap();
2623
2624 let request = JsonRpcRequest {
2625 jsonrpc: "2.0".to_string(),
2626 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2627 method: "invalid_method".to_string(),
2628 params: serde_json::Value::String("{}".to_string()),
2629 }),
2630 id: Some(JsonRpcId::Number(6)),
2631 };
2632
2633 let response = relayer.rpc(request).await.unwrap();
2634 assert!(response.result.is_none());
2635 assert!(response.error.is_some());
2636
2637 let error = response.error.unwrap();
2638 assert_eq!(error.code, -33003); }
2640
2641 #[tokio::test]
2642 async fn test_rpc_provider_other_error() {
2643 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2644 setup_mocks();
2645 let relayer_model = create_test_relayer();
2646
2647 provider.expect_raw_request_dyn().returning(|_, _| {
2648 Box::pin(async {
2649 Err(ProviderError::Other(
2650 "Unexpected error occurred".to_string(),
2651 ))
2652 })
2653 });
2654
2655 let relayer = EvmRelayer::new(
2656 relayer_model,
2657 signer,
2658 provider,
2659 create_test_evm_network(),
2660 Arc::new(relayer_repo),
2661 Arc::new(network_repo),
2662 Arc::new(tx_repo),
2663 Arc::new(counter),
2664 Arc::new(job_producer),
2665 )
2666 .unwrap();
2667
2668 let request = JsonRpcRequest {
2669 jsonrpc: "2.0".to_string(),
2670 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2671 method: "eth_getBalance".to_string(),
2672 params: serde_json::json!(["0x742d35Cc6634C0532925a3b844Bc454e4438f44e", "latest"]),
2673 }),
2674 id: Some(JsonRpcId::Number(7)),
2675 };
2676
2677 let response = relayer.rpc(request).await.unwrap();
2678 assert!(response.result.is_none());
2679 assert!(response.error.is_some());
2680
2681 let error = response.error.unwrap();
2682 assert_eq!(error.code, -32603); }
2684
2685 #[tokio::test]
2686 async fn test_rpc_response_preserves_request_id() {
2687 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2688 setup_mocks();
2689 let relayer_model = create_test_relayer();
2690
2691 provider
2692 .expect_raw_request_dyn()
2693 .returning(|_, _| Box::pin(async { Ok(serde_json::json!("0x1")) }));
2694
2695 let relayer = EvmRelayer::new(
2696 relayer_model,
2697 signer,
2698 provider,
2699 create_test_evm_network(),
2700 Arc::new(relayer_repo),
2701 Arc::new(network_repo),
2702 Arc::new(tx_repo),
2703 Arc::new(counter),
2704 Arc::new(job_producer),
2705 )
2706 .unwrap();
2707
2708 let request_id = u64::MAX;
2709 let request = JsonRpcRequest {
2710 jsonrpc: "2.0".to_string(),
2711 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2712 method: "eth_chainId".to_string(),
2713 params: serde_json::Value::String("[]".to_string()),
2714 }),
2715 id: Some(JsonRpcId::Number(request_id as i64)),
2716 };
2717
2718 let response = relayer.rpc(request).await.unwrap();
2719 assert_eq!(response.id, Some(JsonRpcId::Number(request_id as i64)));
2720 assert_eq!(response.jsonrpc, "2.0");
2721 }
2722
2723 #[tokio::test]
2724 async fn test_rpc_handles_complex_json_response() {
2725 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2726 setup_mocks();
2727 let relayer_model = create_test_relayer();
2728
2729 let complex_response = serde_json::json!({
2730 "number": "0x1b4",
2731 "hash": "0xdc0818cf78f21a8e70579cb46a43643f78291264dda342ae31049421c82d21ae",
2732 "parentHash": "0xe99e022112df268ce40b8b654759b4f39c3cc1b8c86b2f4c7da48ba6d8a6ae8b",
2733 "transactions": [
2734 {
2735 "hash": "0x5c504ed432cb51138bcf09aa5e8a410dd4a1e204ef84bfed1be16dfba1b22060",
2736 "from": "0xa7d9ddbe1f17865597fbd27ec712455208b6b76d",
2737 "to": "0xf02c1c8e6114b1dbe8937a39260b5b0a374432bb",
2738 "value": "0xf3dbb76162000"
2739 }
2740 ],
2741 "gasUsed": "0x5208"
2742 });
2743
2744 provider.expect_raw_request_dyn().returning(move |_, _| {
2745 let response = complex_response.clone();
2746 Box::pin(async move { Ok(response) })
2747 });
2748
2749 let relayer = EvmRelayer::new(
2750 relayer_model,
2751 signer,
2752 provider,
2753 create_test_evm_network(),
2754 Arc::new(relayer_repo),
2755 Arc::new(network_repo),
2756 Arc::new(tx_repo),
2757 Arc::new(counter),
2758 Arc::new(job_producer),
2759 )
2760 .unwrap();
2761
2762 let request = JsonRpcRequest {
2763 jsonrpc: "2.0".to_string(),
2764 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2765 method: "eth_getBlockByNumber".to_string(),
2766 params: serde_json::json!(["0x1b4", true]),
2767 }),
2768 id: Some(JsonRpcId::Number(8)),
2769 };
2770
2771 let response = relayer.rpc(request).await.unwrap();
2772 assert!(response.error.is_none());
2773 assert!(response.result.is_some());
2774
2775 if let Some(NetworkRpcResult::Evm(EvmRpcResult::RawRpcResult(result))) = response.result {
2776 assert!(result.get("transactions").is_some());
2777 assert!(result.get("hash").is_some());
2778 assert!(result.get("gasUsed").is_some());
2779 }
2780 }
2781
2782 #[tokio::test]
2783 async fn test_initialize_relayer_disables_when_validation_fails() {
2784 let (
2785 mut provider,
2786 mut relayer_repo,
2787 network_repo,
2788 tx_repo,
2789 mut job_producer,
2790 signer,
2791 mut counter,
2792 ) = setup_mocks();
2793 let mut relayer_model = create_test_relayer();
2794 relayer_model.system_disabled = false; relayer_model.notification_id = Some("test-notification-id".to_string());
2796
2797 provider
2799 .expect_get_transaction_count()
2800 .returning(|_| Box::pin(ready(Err(ProviderError::Other("RPC error".to_string())))));
2801
2802 counter
2803 .expect_get()
2804 .returning(|| Box::pin(ready(Ok(Some(0u64)))));
2805
2806 provider
2808 .expect_get_balance()
2809 .returning(|_| Box::pin(ready(Ok(U256::from(200000000000000000u64)))));
2810
2811 provider
2812 .expect_health_check()
2813 .returning(|| Box::pin(ready(Ok(true))));
2814
2815 let mut disabled_relayer = relayer_model.clone();
2817 disabled_relayer.system_disabled = true;
2818 relayer_repo
2819 .expect_disable_relayer()
2820 .with(eq("test-relayer-id".to_string()), always())
2821 .returning(move |_, _| Ok(disabled_relayer.clone()));
2822
2823 job_producer
2825 .expect_produce_send_notification_job()
2826 .returning(|_, _| Box::pin(ready(Ok(()))));
2827
2828 job_producer
2830 .expect_produce_relayer_health_check_job()
2831 .returning(|_, _| Box::pin(ready(Ok(()))));
2832
2833 let relayer = EvmRelayer::new(
2834 relayer_model,
2835 signer,
2836 provider,
2837 create_test_evm_network(),
2838 Arc::new(relayer_repo),
2839 Arc::new(network_repo),
2840 Arc::new(tx_repo),
2841 Arc::new(counter),
2842 Arc::new(job_producer),
2843 )
2844 .unwrap();
2845
2846 let result = relayer.initialize_relayer().await;
2847 assert!(result.is_ok());
2848 }
2849
2850 #[tokio::test]
2851 async fn test_initialize_relayer_enables_when_validation_passes_and_was_disabled() {
2852 let (
2853 mut provider,
2854 mut relayer_repo,
2855 network_repo,
2856 tx_repo,
2857 job_producer,
2858 signer,
2859 mut counter,
2860 ) = setup_mocks();
2861 let mut relayer_model = create_test_relayer();
2862 relayer_model.system_disabled = true; provider
2866 .expect_get_transaction_count()
2867 .returning(|_| Box::pin(ready(Ok(42u64))));
2868
2869 counter.expect_set().returning(|_| Box::pin(ready(Ok(()))));
2870
2871 counter
2872 .expect_get()
2873 .returning(|| Box::pin(ready(Ok(Some(42u64)))));
2874
2875 provider
2876 .expect_get_balance()
2877 .returning(|_| Box::pin(ready(Ok(U256::from(200000000000000000u64))))); provider
2880 .expect_health_check()
2881 .returning(|| Box::pin(ready(Ok(true))));
2882
2883 let mut enabled_relayer = relayer_model.clone();
2885 enabled_relayer.system_disabled = false;
2886 relayer_repo
2887 .expect_enable_relayer()
2888 .with(eq("test-relayer-id".to_string()))
2889 .returning(move |_| Ok(enabled_relayer.clone()));
2890
2891 let relayer = EvmRelayer::new(
2892 relayer_model,
2893 signer,
2894 provider,
2895 create_test_evm_network(),
2896 Arc::new(relayer_repo),
2897 Arc::new(network_repo),
2898 Arc::new(tx_repo),
2899 Arc::new(counter),
2900 Arc::new(job_producer),
2901 )
2902 .unwrap();
2903
2904 let result = relayer.initialize_relayer().await;
2905 assert!(result.is_ok());
2906 }
2907
2908 #[tokio::test]
2909 async fn test_initialize_relayer_no_action_when_enabled_and_validation_passes() {
2910 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, mut counter) =
2911 setup_mocks();
2912 let mut relayer_model = create_test_relayer();
2913 relayer_model.system_disabled = false; provider
2917 .expect_get_transaction_count()
2918 .returning(|_| Box::pin(ready(Ok(42u64))));
2919
2920 counter.expect_set().returning(|_| Box::pin(ready(Ok(()))));
2921
2922 counter
2923 .expect_get()
2924 .returning(|| Box::pin(ready(Ok(Some(42u64)))));
2925
2926 provider
2927 .expect_get_balance()
2928 .returning(|_| Box::pin(ready(Ok(U256::from(200000000000000000u64))))); provider
2931 .expect_health_check()
2932 .returning(|| Box::pin(ready(Ok(true))));
2933
2934 let relayer = EvmRelayer::new(
2937 relayer_model,
2938 signer,
2939 provider,
2940 create_test_evm_network(),
2941 Arc::new(relayer_repo),
2942 Arc::new(network_repo),
2943 Arc::new(tx_repo),
2944 Arc::new(counter),
2945 Arc::new(job_producer),
2946 )
2947 .unwrap();
2948
2949 let result = relayer.initialize_relayer().await;
2950 assert!(result.is_ok());
2951 }
2952
2953 #[tokio::test]
2954 async fn test_initialize_relayer_sends_notification_when_disabled() {
2955 let (
2956 mut provider,
2957 mut relayer_repo,
2958 network_repo,
2959 tx_repo,
2960 mut job_producer,
2961 signer,
2962 mut counter,
2963 ) = setup_mocks();
2964 let mut relayer_model = create_test_relayer();
2965 relayer_model.system_disabled = false; relayer_model.notification_id = Some("test-notification-id".to_string());
2967
2968 provider
2970 .expect_get_transaction_count()
2971 .returning(|_| Box::pin(ready(Ok(42u64))));
2972
2973 counter.expect_set().returning(|_| Box::pin(ready(Ok(()))));
2974
2975 counter
2976 .expect_get()
2977 .returning(|| Box::pin(ready(Ok(Some(42u64)))));
2978
2979 provider
2980 .expect_get_balance()
2981 .returning(|_| Box::pin(ready(Ok(U256::from(200000000000000000u64))))); provider.expect_health_check().returning(|| {
2984 Box::pin(ready(Err(ProviderError::Other(
2985 "RPC validation failed".to_string(),
2986 ))))
2987 });
2988
2989 let mut disabled_relayer = relayer_model.clone();
2991 disabled_relayer.system_disabled = true;
2992 relayer_repo
2993 .expect_disable_relayer()
2994 .with(eq("test-relayer-id".to_string()), always())
2995 .returning(move |_, _| Ok(disabled_relayer.clone()));
2996
2997 job_producer
2999 .expect_produce_send_notification_job()
3000 .returning(|_, _| Box::pin(ready(Ok(()))));
3001
3002 job_producer
3004 .expect_produce_relayer_health_check_job()
3005 .returning(|_, _| Box::pin(ready(Ok(()))));
3006
3007 let relayer = EvmRelayer::new(
3008 relayer_model,
3009 signer,
3010 provider,
3011 create_test_evm_network(),
3012 Arc::new(relayer_repo),
3013 Arc::new(network_repo),
3014 Arc::new(tx_repo),
3015 Arc::new(counter),
3016 Arc::new(job_producer),
3017 )
3018 .unwrap();
3019
3020 let result = relayer.initialize_relayer().await;
3021 assert!(result.is_ok());
3022 }
3023
3024 #[tokio::test]
3025 async fn test_initialize_relayer_no_notification_when_no_notification_id() {
3026 let (
3027 mut provider,
3028 mut relayer_repo,
3029 network_repo,
3030 tx_repo,
3031 mut job_producer,
3032 signer,
3033 mut counter,
3034 ) = setup_mocks();
3035 let mut relayer_model = create_test_relayer();
3036 relayer_model.system_disabled = false; relayer_model.notification_id = None; provider
3041 .expect_get_transaction_count()
3042 .returning(|_| Box::pin(ready(Ok(42u64))));
3043
3044 counter.expect_set().returning(|_| Box::pin(ready(Ok(()))));
3045
3046 counter
3047 .expect_get()
3048 .returning(|| Box::pin(ready(Ok(Some(42u64)))));
3049
3050 provider
3051 .expect_get_balance()
3052 .returning(|_| Box::pin(ready(Ok(U256::from(50000000000000000u64))))); provider
3055 .expect_health_check()
3056 .returning(|| Box::pin(ready(Ok(true))));
3057
3058 let mut disabled_relayer = relayer_model.clone();
3060 disabled_relayer.system_disabled = true;
3061 relayer_repo
3062 .expect_disable_relayer()
3063 .with(eq("test-relayer-id".to_string()), always())
3064 .returning(move |_, _| Ok(disabled_relayer.clone()));
3065
3066 job_producer
3069 .expect_produce_relayer_health_check_job()
3070 .returning(|_, _| Box::pin(ready(Ok(()))));
3071
3072 let relayer = EvmRelayer::new(
3073 relayer_model,
3074 signer,
3075 provider,
3076 create_test_evm_network(),
3077 Arc::new(relayer_repo),
3078 Arc::new(network_repo),
3079 Arc::new(tx_repo),
3080 Arc::new(counter),
3081 Arc::new(job_producer),
3082 )
3083 .unwrap();
3084
3085 let result = relayer.initialize_relayer().await;
3086 assert!(result.is_ok());
3087 }
3088}