1use rand::Rng;
24use std::collections::HashSet;
25use std::future::Future;
26use std::time::Duration;
27
28use super::rpc_selector::RpcSelector;
29use crate::config::ServerConfig;
30use crate::constants::RETRY_JITTER_PERCENT;
31use crate::metrics::RPC_CALL_LATENCY;
32use std::time::Instant;
33
34pub fn calculate_retry_delay(attempt: u8, base_delay_ms: u64, max_delay_ms: u64) -> Duration {
44 if base_delay_ms == 0 || max_delay_ms == 0 {
45 return Duration::from_millis(0);
46 }
47
48 let exp_backoff = if attempt > 63 {
50 max_delay_ms
51 } else {
52 let multiplier = 1u64.checked_shl(attempt as u32).unwrap_or(u64::MAX);
54 base_delay_ms.saturating_mul(multiplier)
55 };
56
57 let delay_ms = exp_backoff.min(max_delay_ms);
58
59 apply_jitter(delay_ms)
60}
61
62fn apply_jitter(delay_ms: u64) -> Duration {
74 if delay_ms == 0 {
75 return Duration::from_millis(0);
76 }
77
78 let jitter_range = (delay_ms as f64 * RETRY_JITTER_PERCENT).floor() as u64;
80
81 if jitter_range == 0 {
82 return Duration::from_millis(delay_ms);
83 }
84
85 let mut rng = rand::rng();
86 let jitter_value = rng.random_range(0..=jitter_range);
87
88 let final_delay = if rng.random_bool(0.5) {
89 delay_ms.saturating_add(jitter_value)
90 } else {
91 delay_ms.saturating_sub(jitter_value)
92 };
93
94 Duration::from_millis(final_delay)
95}
96
/// Internal error classification produced by `try_with_retries` so the
/// caller can distinguish "stop immediately" from "fail over to another
/// provider".
#[derive(Debug)]
enum InternalRetryError<E> {
    /// The error is not worth retrying; surface it to the caller as-is.
    NonRetriable(E),
    /// The error was retriable but every retry attempt on this provider failed.
    RetriesExhausted(E),
}
103
/// Tuning knobs for per-provider retries and cross-provider failover.
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Maximum attempts against a single provider (values 0 and 1 both mean
    /// exactly one attempt).
    pub max_retries: u8,
    /// Maximum number of switches to a different provider.
    pub max_failovers: u8,
    /// Starting backoff delay in milliseconds; 0 disables delays.
    pub base_delay_ms: u64,
    /// Upper bound on the backoff delay in milliseconds; 0 disables delays.
    pub max_delay_ms: u64,
}
116
117impl RetryConfig {
118 pub fn new(max_retries: u8, max_failovers: u8, base_delay_ms: u64, max_delay_ms: u64) -> Self {
130 if (base_delay_ms == 0) != (max_delay_ms == 0) {
132 panic!(
133 "Delay values must be consistent: both zero (no delays) or both non-zero. Got base_delay_ms={base_delay_ms}, max_delay_ms={max_delay_ms}"
134 );
135 }
136
137 if base_delay_ms > 0 && max_delay_ms > 0 && max_delay_ms < base_delay_ms {
139 panic!(
140 "max_delay_ms ({max_delay_ms}) must be >= base_delay_ms ({base_delay_ms}) when both are non-zero"
141 );
142 }
143
144 Self {
145 max_retries,
146 max_failovers,
147 base_delay_ms,
148 max_delay_ms,
149 }
150 }
151
152 pub fn from_env() -> Self {
154 let config = ServerConfig::from_env();
155 Self::new(
156 config.provider_max_retries,
157 config.provider_max_failovers,
158 config.provider_retry_base_delay_ms,
159 config.provider_retry_max_delay_ms,
160 )
161 }
162}
163
164pub async fn retry_rpc_call<P, T, E, F, Fut, I>(
194 selector: &RpcSelector,
195 operation_name: &str,
196 is_retriable_error: impl Fn(&E) -> bool,
197 should_mark_provider_failed: impl Fn(&E) -> bool,
198 provider_initializer: I,
199 operation: F,
200 config: Option<RetryConfig>,
201) -> Result<T, E>
202where
203 P: Clone,
204 E: std::fmt::Display + From<String>,
205 F: Fn(P) -> Fut,
206 Fut: Future<Output = Result<T, E>>,
207 I: Fn(&str) -> Result<P, E>,
208{
209 let config = config.unwrap_or_else(RetryConfig::from_env);
210 let total_providers = selector.provider_count();
211 let max_failovers = std::cmp::min(config.max_failovers as usize, total_providers - 1);
212 let mut failover_count = 0;
213 let mut total_attempts = 0;
214 let mut last_error = None;
215 let mut tried_urls = HashSet::new();
217
218 tracing::debug!(
219 operation_name = %operation_name,
220 max_retries = %config.max_retries,
221 max_failovers = %max_failovers,
222 total_providers = %total_providers,
223 "starting rpc call"
224 );
225
226 while failover_count <= max_failovers && selector.provider_count() > 0 {
228 let (provider, provider_url) =
230 match get_provider(selector, operation_name, &provider_initializer, &tried_urls) {
231 Ok((provider, url)) => {
232 tried_urls.insert(url.clone());
234 (provider, url)
235 }
236 Err(e) => {
237 last_error = Some(e);
238 failover_count += 1;
239
240 if failover_count > max_failovers || selector.provider_count() == 0 {
242 break;
243 }
244
245 selector.mark_current_as_failed();
247 continue;
248 }
249 };
250
251 tracing::debug!(
252 provider_url = %provider_url,
253 operation_name = %operation_name,
254 tried_providers = %tried_urls.len(),
255 "selected provider"
256 );
257
258 match try_with_retries(
260 &provider,
261 &provider_url,
262 operation_name,
263 &operation,
264 &is_retriable_error,
265 &config,
266 &mut total_attempts,
267 )
268 .await
269 {
270 Ok(result) => {
271 tracing::debug!(
272 operation_name = %operation_name,
273 provider_url = %provider_url,
274 total_attempts = %total_attempts,
275 "rpc call succeeded"
276 );
277 return Ok(result);
278 }
279 Err(internal_err) => {
280 match internal_err {
281 InternalRetryError::NonRetriable(original_err) => {
282 if should_mark_provider_failed(&original_err) {
284 tracing::warn!(
285 error = %original_err,
286 provider_url = %provider_url,
287 operation_name = %operation_name,
288 "non-retriable error should mark provider as failed, marking as failed and switching to next provider"
289 );
290 selector.mark_current_as_failed();
291 }
292 return Err(original_err);
293 }
294 InternalRetryError::RetriesExhausted(original_err) => {
295 last_error = Some(original_err);
296
297 tracing::warn!(
299 max_retries = %config.max_retries,
300 provider_url = %provider_url,
301 operation_name = %operation_name,
302 error = %last_error.as_ref().unwrap(),
303 failover_count = %(failover_count + 1),
304 max_failovers = %max_failovers,
305 "all retry attempts failed, marking as failed and switching to next provider"
306 );
307 selector.mark_current_as_failed();
308 failover_count += 1;
309 }
310 }
311 }
312 }
313 }
314
315 match &last_error {
316 Some(e) => {
317 tracing::error!(
318 operation_name = %operation_name,
319 total_attempts = %total_attempts,
320 failover_count = %failover_count,
321 error = %e,
322 "rpc call failed after attempts across providers"
323 );
324 }
325 None => {
326 tracing::error!(
327 operation_name = %operation_name,
328 total_attempts = %total_attempts,
329 failover_count = %failover_count,
330 "rpc call failed after attempts across providers with no error details"
331 );
332 }
333 }
334
335 let error_message = match &last_error {
336 Some(e) => format!(
337 "RPC call '{operation_name}' failed after {total_attempts} total attempts across {failover_count} providers: {e}"
338 ),
339 None => format!(
340 "RPC call '{operation_name}' failed after {total_attempts} total attempts across {failover_count} providers with no error details"
341 )
342 };
343
344 Err(last_error.unwrap_or_else(|| E::from(error_message)))
346}
347
348fn get_provider<P, E, I>(
350 selector: &RpcSelector,
351 operation_name: &str,
352 provider_initializer: &I,
353 excluded_urls: &HashSet<String>,
354) -> Result<(P, String), E>
355where
356 E: std::fmt::Display + From<String>,
357 I: Fn(&str) -> Result<P, E>,
358{
359 let provider_url = selector
361 .get_client(|url| Ok::<_, eyre::Report>(url.to_string()), excluded_urls)
362 .map_err(|e| {
363 let err_msg = format!("Failed to get provider URL for {operation_name}: {e}");
364 tracing::warn!(operation_name = %operation_name, error = %e, "failed to get provider url");
365 E::from(err_msg)
366 })?;
367
368 let provider = provider_initializer(&provider_url).map_err(|e| {
370 tracing::warn!(
371 provider_url = %provider_url,
372 operation_name = %operation_name,
373 error = %e,
374 "failed to initialize provider"
375 );
376 e
377 })?;
378
379 Ok((provider, provider_url))
380}
381
/// Runs `operation` against a single provider, retrying up to
/// `config.max_retries` times with jittered exponential backoff between
/// attempts.
///
/// On success the operation's value is returned and the call latency is
/// recorded in `RPC_CALL_LATENCY`. On failure the error is classified as
/// [`InternalRetryError::NonRetriable`] (caller should stop) or
/// [`InternalRetryError::RetriesExhausted`] (caller may fail over to another
/// provider). `total_attempts` is incremented once per attempt so the caller
/// can report attempt counts across providers.
async fn try_with_retries<P, T, E, F, Fut>(
    provider: &P,
    provider_url: &str,
    operation_name: &str,
    operation: &F,
    is_retriable_error: &impl Fn(&E) -> bool,
    config: &RetryConfig,
    total_attempts: &mut usize,
) -> Result<T, InternalRetryError<E>>
where
    P: Clone,
    E: std::fmt::Display + From<String>,
    F: Fn(P) -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    // Fast path: with 0 or 1 retries there is exactly one attempt, and any
    // error is deliberately mapped to NonRetriable — there are no retries to
    // exhaust, so failover classification does not apply.
    if config.max_retries <= 1 {
        *total_attempts += 1;
        let start_time = Instant::now();
        let result = operation(provider.clone())
            .await
            .map_err(InternalRetryError::NonRetriable);

        // Latency is only recorded for successful calls.
        if result.is_ok() {
            let latency_seconds = start_time.elapsed().as_secs_f64();
            RPC_CALL_LATENCY
                .with_label_values(&["unknown", "unknown", operation_name])
                .observe(latency_seconds);
        }

        return result;
    }

    for current_attempt_idx in 0..config.max_retries {
        *total_attempts += 1;

        let start_time = Instant::now();

        match operation(provider.clone()).await {
            Ok(result) => {
                let latency_seconds = start_time.elapsed().as_secs_f64();
                RPC_CALL_LATENCY
                    .with_label_values(&["unknown", "unknown", operation_name])
                    .observe(latency_seconds);

                tracing::debug!(
                    operation_name = %operation_name,
                    provider_url = %provider_url,
                    attempt = %(current_attempt_idx + 1),
                    max_retries = %config.max_retries,
                    total_attempts = %*total_attempts,
                    latency_seconds = %latency_seconds,
                    "rpc call succeeded"
                );
                return Ok(result);
            }
            Err(e) => {
                let is_retriable = is_retriable_error(&e);
                let is_last_attempt = current_attempt_idx + 1 >= config.max_retries;

                tracing::warn!(
                    operation_name = %operation_name,
                    provider_url = %provider_url,
                    attempt = %(current_attempt_idx + 1),
                    max_retries = %config.max_retries,
                    error = %e,
                    retriable = %is_retriable,
                    "rpc call failed (will retry if retriable)"
                );

                // Non-retriable errors abort immediately regardless of the
                // remaining attempt budget.
                if !is_retriable {
                    return Err(InternalRetryError::NonRetriable(e));
                }

                if is_last_attempt {
                    tracing::warn!(
                        max_retries = %config.max_retries,
                        operation_name = %operation_name,
                        provider_url = %provider_url,
                        error = %e,
                        "all retries exhausted"
                    );
                    return Err(InternalRetryError::RetriesExhausted(e));
                }

                // Backoff grows with the attempt number (attempt index + 1).
                let delay = calculate_retry_delay(
                    current_attempt_idx + 1,
                    config.base_delay_ms,
                    config.max_delay_ms,
                );

                tracing::debug!(
                    operation_name = %operation_name,
                    provider_url = %provider_url,
                    delay = ?delay,
                    next_attempt = %(current_attempt_idx + 2),
                    max_retries = %config.max_retries,
                    "retrying rpc call after delay"
                );
                tokio::time::sleep(delay).await;
            }
        }
    }

    // Every loop iteration either returns Ok, returns Err, or sleeps and
    // continues; the final iteration always returns.
    unreachable!(
        "Loop should have returned if max_retries > 1; max_retries=0 or 1 case is handled above."
    );
}
495
496#[cfg(test)]
497mod tests {
498 use super::*;
499 use crate::models::RpcConfig;
500 use crate::services::provider::rpc_health_store::RpcHealthStore;
501 use lazy_static::lazy_static;
502 use serial_test::serial;
503 use std::cmp::Ordering;
504 use std::collections::HashSet;
505 use std::env;
506 use std::sync::atomic::{AtomicU8, Ordering as AtomicOrdering};
507 use std::sync::Arc;
508 use std::sync::Mutex;
509
    lazy_static! {
        // Serializes tests that mutate process-wide environment variables.
        static ref RETRY_TEST_ENV_MUTEX: Mutex<()> = Mutex::new(());
    }
514
    /// Simple string-backed error type used throughout these tests.
    #[derive(Debug, Clone)]
    struct TestError(String);
518
    // Display is required because retry_rpc_call bounds E: Display.
    impl std::fmt::Display for TestError {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(f, "TestError: {}", self.0)
        }
    }
524
    // From<String> is required because retry_rpc_call bounds E: From<String>
    // (used to synthesize errors when no attempt produced one).
    impl From<String> for TestError {
        fn from(msg: String) -> Self {
            TestError(msg)
        }
    }
530
    /// Guard that records the prior value of each environment variable it
    /// sets, so the Drop impl below can restore them after the test.
    struct EnvGuard {
        // Variable names in the order they were set.
        keys: Vec<String>,
        // Prior values parallel to `keys`; None means the var was unset.
        old_values: Vec<Option<String>>,
    }
536
    impl EnvGuard {
        /// Creates an empty guard that has recorded no variables yet.
        fn new() -> Self {
            Self {
                keys: Vec::new(),
                old_values: Vec::new(),
            }
        }

        /// Sets `key` to `value`, remembering the previous value (if any)
        /// for restoration when the guard is dropped.
        fn set(&mut self, key: &str, value: &str) {
            let old_value = env::var(key).ok();
            self.keys.push(key.to_string());
            self.old_values.push(old_value);
            env::set_var(key, value);
        }
    }
552
553 impl Drop for EnvGuard {
554 fn drop(&mut self) {
555 for i in 0..self.keys.len() {
556 match &self.old_values[i] {
557 Some(value) => env::set_var(&self.keys[i], value),
558 None => env::remove_var(&self.keys[i]),
559 }
560 }
561 }
562 }
563
    /// Sets the environment variables that config parsing requires and clears
    /// the global provider health state so each test starts clean. The
    /// returned guard restores the environment on drop.
    fn setup_test_env() -> EnvGuard {
        let mut guard = EnvGuard::new();
        guard.set("API_KEY", "fake-api-key-for-tests-01234567890123456789");
        guard.set("PROVIDER_MAX_RETRIES", "2");
        guard.set("PROVIDER_MAX_FAILOVERS", "1");
        guard.set("PROVIDER_RETRY_BASE_DELAY_MS", "1");
        guard.set("PROVIDER_RETRY_MAX_DELAY_MS", "5");
        guard.set("PROVIDER_FAILURE_THRESHOLD", "1");
        guard.set("REDIS_URL", "redis://localhost:6379");
        guard.set(
            "RELAYER_PRIVATE_KEY",
            "0x1234567890123456789012345678901234567890123456789012345678901234",
        );
        // Reset shared RPC health tracking between tests.
        RpcHealthStore::instance().clear_all();
        guard
    }
583
    // Verifies exponential growth, the max-delay cap, zero-delay configs,
    // and that extreme attempt counts do not overflow.
    #[test]
    fn test_calculate_retry_delay() {
        let base_delay_ms = 10;
        let max_delay_ms = 10000;

        // Expected pre-jitter backoff for attempts 0..=5 (doubles each time).
        let expected_backoffs = [
            10, 20, 40, 80, 160, 320,
        ];

        for (i, expected) in expected_backoffs.iter().enumerate() {
            let attempt = i as u8;
            let delay = calculate_retry_delay(attempt, base_delay_ms, max_delay_ms);

            // Jitter widens the acceptable window by ±RETRY_JITTER_PERCENT.
            let min_expected = (*expected as f64 * (1.0 - RETRY_JITTER_PERCENT)).floor() as u128;
            let max_expected = (*expected as f64 * (1.0 + RETRY_JITTER_PERCENT)).ceil() as u128;

            assert!(
                (min_expected..=max_expected).contains(&delay.as_millis()),
                "Delay {} outside expected range {}..={}",
                delay.as_millis(),
                min_expected,
                max_expected
            );
        }

        // Attempt 4 would be 1600ms uncapped; it must clamp to max_delay_ms.
        let base_delay_ms = 100;
        let max_delay_ms = 1000;
        let delay = calculate_retry_delay(4, base_delay_ms, max_delay_ms);
        let min_expected = (max_delay_ms as f64 * (1.0 - RETRY_JITTER_PERCENT)).floor() as u128;
        let max_expected = (max_delay_ms as f64 * (1.0 + RETRY_JITTER_PERCENT)).ceil() as u128;
        assert!(
            (min_expected..=max_expected).contains(&delay.as_millis()),
            "Delay {} outside expected range {}..={}",
            delay.as_millis(),
            min_expected,
            max_expected
        );

        // Any zero in the delay pair disables delays entirely.
        assert_eq!(calculate_retry_delay(5, 0, 1000).as_millis(), 0);
        assert_eq!(calculate_retry_delay(5, 100, 0).as_millis(), 0);
        assert_eq!(calculate_retry_delay(5, 0, 0).as_millis(), 0);

        // u8::MAX attempts must not overflow; result stays within the cap.
        let max_delay_ms = 10_000;
        let delay = calculate_retry_delay(u8::MAX, 1, max_delay_ms);
        assert!(
            delay.as_millis()
                <= (max_delay_ms as f64 * (1.0 + RETRY_JITTER_PERCENT)).ceil() as u128
        );
    }
642
    // Verifies jitter bounds, the zero/tiny-delay short circuits, and that
    // jitter is actually applied in both directions.
    #[test]
    fn test_apply_jitter() {
        let base_delay = 1000;
        let jittered = apply_jitter(base_delay);

        let min_expected = (base_delay as f64 * (1.0 - RETRY_JITTER_PERCENT)).floor() as u64;
        let max_expected = (base_delay as f64 * (1.0 + RETRY_JITTER_PERCENT)).ceil() as u64;

        assert!(
            (min_expected as u128..=max_expected as u128).contains(&jittered.as_millis()),
            "Jittered value {} outside expected range {}..={}",
            jittered.as_millis(),
            min_expected,
            max_expected
        );

        // Zero delay stays zero.
        assert_eq!(apply_jitter(0).as_millis(), 0);

        // Tiny delays where the jitter range floors to 0 pass through unchanged.
        for delay in 1..5 {
            let jittered = apply_jitter(delay);
            let jitter_range = (delay as f64 * RETRY_JITTER_PERCENT).floor() as u64;

            if jitter_range == 0 {
                assert_eq!(jittered.as_millis(), delay as u128);
            } else {
                let min_expected = delay.saturating_sub(jitter_range);
                let max_expected = delay.saturating_add(jitter_range);
                assert!(
                    (min_expected as u128..=max_expected as u128).contains(&jittered.as_millis()),
                    "Jittered value {} outside expected range {}..={}",
                    jittered.as_millis(),
                    min_expected,
                    max_expected
                );
            }
        }

        // Statistical check: over many samples both additive and subtractive
        // jitter should be observed.
        let base_delay = 10000;
        let iterations = 200;
        let mut additions = 0;
        let mut subtractions = 0;

        for _ in 0..iterations {
            let jittered = apply_jitter(base_delay);
            let j_millis = jittered.as_millis();
            let b_delay = base_delay as u128;

            match j_millis.cmp(&b_delay) {
                Ordering::Greater => {
                    additions += 1;
                }
                Ordering::Less => {
                    subtractions += 1;
                }
                Ordering::Equal => {}
            }
        }

        assert!(additions > 0, "No additions were observed");
        assert!(subtractions > 0, "No subtractions were observed");
    }
706
    // RetryConfig::new stores all four fields unchanged.
    #[test]
    fn test_retry_config() {
        let config = RetryConfig::new(5, 2, 100, 5000);
        assert_eq!(config.max_retries, 5);
        assert_eq!(config.max_failovers, 2);
        assert_eq!(config.base_delay_ms, 100);
        assert_eq!(config.max_delay_ms, 5000);
    }
715
    // RetryConfig::from_env picks up the values set by setup_test_env.
    #[test]
    fn test_retry_config_from_env() {
        // Hold the env mutex (surviving poisoning) since env vars are global.
        let _lock = RETRY_TEST_ENV_MUTEX
            .lock()
            .unwrap_or_else(|e| e.into_inner());
        let mut guard = setup_test_env();
        guard.set("REDIS_URL", "redis://localhost:6379");
        guard.set(
            "RELAYER_PRIVATE_KEY",
            "0x1234567890123456789012345678901234567890123456789012345678901234",
        );

        let config = RetryConfig::from_env();
        assert_eq!(config.max_retries, 2);
        assert_eq!(config.max_failovers, 1);
        assert_eq!(config.base_delay_ms, 1);
        assert_eq!(config.max_delay_ms, 5);
    }
735
    // Edge cases: attempt 0, base == max, very large max, and base == max == 1.
    #[test]
    fn test_calculate_retry_delay_edge_cases() {
        // Attempt 0 yields the base delay (within jitter).
        let delay = calculate_retry_delay(0, 100, 1000);
        let min_expected = (100.0 * (1.0 - RETRY_JITTER_PERCENT)).floor() as u128;
        let max_expected = (100.0 * (1.0 + RETRY_JITTER_PERCENT)).ceil() as u128;
        assert!(
            (min_expected..=max_expected).contains(&delay.as_millis()),
            "Delay {} outside expected range {}..={}",
            delay.as_millis(),
            min_expected,
            max_expected
        );

        // base == max: delay clamps to the shared value.
        let delay = calculate_retry_delay(5, 100, 100);
        let min_expected = (100.0 * (1.0 - RETRY_JITTER_PERCENT)).floor() as u128;
        let max_expected = (100.0 * (1.0 + RETRY_JITTER_PERCENT)).ceil() as u128;
        assert!(
            (min_expected..=max_expected).contains(&delay.as_millis()),
            "Delay {} outside expected range {}..={}",
            delay.as_millis(),
            min_expected,
            max_expected
        );

        // Large attempt with u64::MAX cap must still produce a positive delay.
        let delay = calculate_retry_delay(60, 1000, u64::MAX);
        assert!(delay.as_millis() > 0);

        // 1ms delays are below jitter granularity and pass through exactly.
        let delay = calculate_retry_delay(1, 1, 1);
        assert_eq!(delay.as_millis(), 1);
    }
770
    // Valid configurations must construct without panicking.
    #[test]
    fn test_retry_config_validation() {
        let _config = RetryConfig::new(3, 1, 100, 1000);
        // Both delays zero (delays disabled) is valid.
        let _config = RetryConfig::new(3, 1, 0, 0);
        // base == max is valid.
        let _config = RetryConfig::new(3, 1, 100, 100);
        // Zero retries/failovers are valid.
        let _config = RetryConfig::new(0, 0, 1, 1);
        // Extremes of the u8 counters are valid.
        let _config = RetryConfig::new(255, 255, 1, 1000);
    }
780
    // max_delay below base_delay must panic with the documented message.
    #[test]
    #[should_panic(
        expected = "max_delay_ms (50) must be >= base_delay_ms (100) when both are non-zero"
    )]
    fn test_retry_config_validation_panic_delay_ordering() {
        let _config = RetryConfig::new(3, 1, 100, 50);
    }
789
    // A zero base with a non-zero max is an inconsistent pair and must panic.
    #[test]
    #[should_panic(
        expected = "Delay values must be consistent: both zero (no delays) or both non-zero"
    )]
    fn test_retry_config_validation_panic_inconsistent_delays_base_zero() {
        let _config = RetryConfig::new(3, 1, 0, 1000);
    }
798
    // A non-zero base with a zero max is an inconsistent pair and must panic.
    #[test]
    #[should_panic(
        expected = "Delay values must be consistent: both zero (no delays) or both non-zero"
    )]
    fn test_retry_config_validation_panic_inconsistent_delays_max_zero() {
        let _config = RetryConfig::new(3, 1, 100, 0);
    }
807
    // get_provider returns (provider, url) on success and passes initializer
    // errors through unchanged.
    #[test]
    fn test_get_provider() {
        let _guard = setup_test_env();

        let configs = vec![
            RpcConfig::new("http://localhost:8545".to_string()),
            RpcConfig::new("http://localhost:8546".to_string()),
        ];
        let selector = RpcSelector::new_with_defaults(configs).expect("Failed to create selector");

        // Initializer that derives the provider value from the URL.
        let initializer =
            |url: &str| -> Result<String, TestError> { Ok(format!("provider-{}", url)) };

        let result = get_provider(&selector, "test_operation", &initializer, &HashSet::new());
        assert!(result.is_ok());
        let (provider, url) = result.unwrap();
        // Either configured endpoint may be selected.
        assert!(url == "http://localhost:8545" || url == "http://localhost:8546");
        assert_eq!(provider, format!("provider-{}", url));

        // Initializer failure must surface the original error.
        let initializer = |_: &str| -> Result<String, TestError> {
            Err(TestError("Failed to initialize".to_string()))
        };

        let result = get_provider(&selector, "test_operation", &initializer, &HashSet::new());
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(format!("{}", err).contains("Failed to initialize"));
    }
837
    // Exercises try_with_retries across: immediate success, success after
    // retriable failures, non-retriable failure, and retries exhausted.
    #[tokio::test]
    async fn test_try_with_retries() {
        let provider = "test_provider".to_string();
        let provider_url = "http://localhost:8545";
        let mut total_attempts = 0;
        let config = RetryConfig::new(3, 1, 5, 10);

        // Immediate success: exactly one attempt.
        let operation = |p: String| async move {
            assert_eq!(p, "test_provider");
            Ok::<_, TestError>(42)
        };

        let result = try_with_retries(
            &provider,
            provider_url,
            "test_operation",
            &operation,
            &|_| false,
            &config,
            &mut total_attempts,
        )
        .await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 42);
        assert_eq!(total_attempts, 1);

        // Fails twice with a retriable error, then succeeds on attempt 3.
        let attempts = Arc::new(AtomicU8::new(0));
        let attempts_clone = attempts.clone();
        let operation = move |_: String| {
            let attempts = attempts_clone.clone();
            async move {
                let current = attempts.fetch_add(1, AtomicOrdering::SeqCst);
                if current < 2 {
                    Err(TestError("Retriable error".to_string()))
                } else {
                    Ok(42)
                }
            }
        };

        let mut total_attempts = 0;
        let result = try_with_retries(
            &provider,
            provider_url,
            "test_operation",
            &operation,
            &|_| true,
            &config,
            &mut total_attempts,
        )
        .await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 42);
        assert_eq!(total_attempts, 3);

        // Non-retriable error aborts after a single attempt.
        let operation = |_: String| async { Err(TestError("Non-retriable error".to_string())) };

        let mut total_attempts = 0;
        let result: Result<i32, InternalRetryError<TestError>> = try_with_retries(
            &provider,
            provider_url,
            "test_operation",
            &operation,
            &|_| false,
            &config,
            &mut total_attempts,
        )
        .await;

        assert!(result.is_err());
        assert_eq!(total_attempts, 1);
        let err = result.unwrap_err();
        assert!(matches!(err, InternalRetryError::NonRetriable(_)));

        // Always-failing retriable error uses all 3 attempts, then reports
        // RetriesExhausted.
        let operation = |_: String| async { Err(TestError("Always fails".to_string())) };

        let mut total_attempts = 0;
        let result: Result<i32, InternalRetryError<TestError>> = try_with_retries(
            &provider,
            provider_url,
            "test_operation",
            &operation,
            &|_| true,
            &config,
            &mut total_attempts,
        )
        .await;

        assert!(result.is_err());
        assert_eq!(total_attempts, 3);
        let error = result.unwrap_err();
        assert!(matches!(error, InternalRetryError::RetriesExhausted(_)));
    }
935
    // max_retries = 0 takes the single-attempt fast path: one attempt, and
    // any failure is reported as NonRetriable.
    #[tokio::test]
    async fn test_try_with_retries_max_retries_zero() {
        let provider = "test_provider".to_string();
        let provider_url = "http://localhost:8545";
        let mut total_attempts = 0;
        let config = RetryConfig::new(0, 1, 5, 10);

        let operation = |_p: String| async move { Ok::<_, TestError>(42) };

        let result = try_with_retries(
            &provider,
            provider_url,
            "test_operation",
            &operation,
            &|_| false,
            &config,
            &mut total_attempts,
        )
        .await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 42);

        let operation = |_: String| async { Err(TestError("Always fails".to_string())) };

        let mut total_attempts = 0;
        let result: Result<i32, InternalRetryError<TestError>> = try_with_retries(
            &provider,
            provider_url,
            "test_operation",
            &operation,
            &|_| true,
            &config,
            &mut total_attempts,
        )
        .await;

        assert!(result.is_err());
        let error = result.unwrap_err();
        // Even retriable errors are NonRetriable on the fast path.
        assert!(matches!(error, InternalRetryError::NonRetriable(_)));
    }
979
    // max_retries = 1 behaves identically to 0: a single attempt whose
    // failure is classified NonRetriable.
    #[tokio::test]
    async fn test_try_with_retries_max_retries_one() {
        let provider = "test_provider".to_string();
        let provider_url = "http://localhost:8545";
        let mut total_attempts = 0;
        let config = RetryConfig::new(1, 1, 5, 10);

        let operation = |p: String| async move {
            assert_eq!(p, "test_provider");
            Ok::<_, TestError>(42)
        };

        let result = try_with_retries(
            &provider,
            provider_url,
            "test_operation",
            &operation,
            &|_| false,
            &config,
            &mut total_attempts,
        )
        .await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 42);

        let operation = |_: String| async { Err(TestError("Always fails".to_string())) };

        let mut total_attempts = 0;
        let result: Result<i32, InternalRetryError<TestError>> = try_with_retries(
            &provider,
            provider_url,
            "test_operation",
            &operation,
            &|_| true,
            &config,
            &mut total_attempts,
        )
        .await;

        assert!(result.is_err());
        let error = result.unwrap_err();
        // Single-attempt path maps all failures to NonRetriable.
        assert!(matches!(error, InternalRetryError::NonRetriable(_)));
    }
1026
    // When both policy closures return false, a non-retriable error must not
    // reduce the pool of available providers.
    #[tokio::test]
    #[serial]
    async fn test_non_retriable_error_does_not_mark_provider_failed() {
        let _guard = setup_test_env();
        RpcHealthStore::instance().clear_all();

        let configs = vec![
            RpcConfig::new("http://localhost:9986".to_string()),
            RpcConfig::new("http://localhost:9985".to_string()),
        ];
        let selector = RpcSelector::new_with_defaults(configs).expect("Failed to create selector");

        let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };

        let operation =
            |_provider: String| async move { Err(TestError("Non-retriable error".to_string())) };

        let config = RetryConfig::new(3, 1, 0, 0);

        let initial_available_count = selector.available_provider_count();
        assert_eq!(
            initial_available_count, 2,
            "Both providers should be available after clearing"
        );

        let result: Result<i32, TestError> = retry_rpc_call(
            &selector,
            "test_operation",
            |_| false, // nothing is retriable
            |_| false, // never mark provider failed
            provider_initializer,
            operation,
            Some(config),
        )
        .await;

        assert!(result.is_err());

        let final_available_count = selector.available_provider_count();
        assert_eq!(
            initial_available_count, final_available_count,
            "Provider count should remain the same for non-retriable errors"
        );
    }
1075
    // Exhausting retries on a retriable error must mark the provider failed,
    // shrinking the available pool.
    #[tokio::test]
    #[serial]
    async fn test_retriable_error_marks_provider_failed_after_retries_exhausted() {
        let _guard = setup_test_env();
        RpcHealthStore::instance().clear_all();

        let configs = vec![
            RpcConfig::new("http://localhost:9984".to_string()),
            RpcConfig::new("http://localhost:9983".to_string()),
        ];
        let selector = RpcSelector::new_with_defaults(configs).expect("Failed to create selector");

        let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };

        let operation = |_provider: String| async { Err(TestError("Retriable error".to_string())) };

        // Zero delays keep the test fast.
        let config = RetryConfig::new(2, 1, 0, 0);

        let initial_available_count = selector.available_provider_count();
        assert_eq!(
            initial_available_count, 2,
            "Both providers should be available after clearing"
        );

        let result: Result<i32, TestError> = retry_rpc_call(
            &selector,
            "test_operation",
            |_| true, // everything is retriable
            |_| true, // mark providers failed
            provider_initializer,
            operation,
            Some(config),
        )
        .await;

        assert!(result.is_err());

        let final_available_count = selector.available_provider_count();
        assert!(final_available_count < initial_available_count,
            "At least one provider should be marked as failed after retriable errors exhaust retries");
    }
1121
    // Happy path: an operation that succeeds immediately runs exactly once.
    #[tokio::test]
    #[serial]
    async fn test_retry_rpc_call_success() {
        let _guard = setup_test_env();
        RpcHealthStore::instance().clear_all();

        let configs = vec![
            RpcConfig::new("http://localhost:8545".to_string()),
            RpcConfig::new("http://localhost:8546".to_string()),
        ];
        let selector = RpcSelector::new_with_defaults(configs).expect("Failed to create selector");

        let attempts = Arc::new(AtomicU8::new(0));
        let attempts_clone = attempts.clone();

        let provider_initializer =
            |_url: &str| -> Result<String, TestError> { Ok("mock_provider".to_string()) };

        let operation = move |_provider: String| {
            let attempts = attempts_clone.clone();
            async move {
                attempts.fetch_add(1, AtomicOrdering::SeqCst);
                Ok::<_, TestError>(42)
            }
        };

        let config = RetryConfig::new(1, 1, 0, 0);

        let result = retry_rpc_call(
            &selector,
            "test_operation",
            |_| false, // nothing is retriable
            |_| false, // never mark provider failed
            provider_initializer,
            operation,
            Some(config),
        )
        .await;

        assert!(result.is_ok(), "Expected OK result but got: {:?}", result);
        assert_eq!(result.unwrap(), 42);
        // Exactly one attempt on success.
        assert_eq!(attempts.load(AtomicOrdering::SeqCst), 1);
    }
1165
    // When the first provider always fails, retry_rpc_call must fail over to
    // the second provider and succeed there.
    #[tokio::test]
    #[serial]
    async fn test_retry_rpc_call_with_provider_failover() {
        let _guard = setup_test_env();
        RpcHealthStore::instance().clear_all();

        let configs = vec![
            RpcConfig::new("http://localhost:8545".to_string()),
            RpcConfig::new("http://localhost:8546".to_string()),
        ];
        let selector = RpcSelector::new_with_defaults(configs).expect("Failed to create selector");

        // Records which URL the initializer was last asked for.
        let current_provider = Arc::new(Mutex::new(String::new()));
        let current_provider_clone = current_provider.clone();

        let provider_initializer = move |url: &str| -> Result<String, TestError> {
            let mut provider = current_provider_clone.lock().unwrap();
            *provider = url.to_string();
            Ok(url.to_string())
        };

        // Fails on port 8545, succeeds elsewhere.
        let operation = move |provider: String| async move {
            if provider.contains("8545") {
                Err(TestError("First provider error".to_string()))
            } else {
                Ok(42)
            }
        };

        // Zero delays keep the test fast.
        let config = RetryConfig::new(2, 1, 0, 0);

        let result = retry_rpc_call(
            &selector,
            "test_operation",
            |_| true, // everything is retriable
            |_| true, // mark providers failed
            provider_initializer,
            operation,
            Some(config),
        )
        .await;

        assert!(result.is_ok(), "Expected OK result but got: {:?}", result);
        assert_eq!(result.unwrap(), 42);

        // The successful call must have gone through the second provider.
        let final_provider = current_provider.lock().unwrap().clone();
        assert!(
            final_provider.contains("8546"),
            "Wrong provider selected: {}",
            final_provider
        );
    }
1219
    // When every provider fails, the call surfaces an error after exhausting
    // retries and failovers.
    #[tokio::test]
    #[serial]
    async fn test_retry_rpc_call_all_providers_fail() {
        let _guard = setup_test_env();
        RpcHealthStore::instance().clear_all();

        let configs = vec![
            RpcConfig::new("http://localhost:8545".to_string()),
            RpcConfig::new("http://localhost:8546".to_string()),
        ];
        let selector = RpcSelector::new_with_defaults(configs).expect("Failed to create selector");

        let provider_initializer =
            |_: &str| -> Result<String, TestError> { Ok("mock_provider".to_string()) };

        let operation = |_: String| async { Err(TestError("Always fails".to_string())) };

        // Zero delays keep the test fast.
        let config = RetryConfig::new(2, 1, 0, 0);

        let result: Result<i32, TestError> = retry_rpc_call(
            &selector,
            "test_operation",
            |_| true,  // everything is retriable
            |_| false, // never mark provider failed
            provider_initializer,
            operation,
            Some(config),
        )
        .await;

        assert!(result.is_err(), "Expected an error but got: {:?}", result);
    }
1252
    // Passing config = None falls back to RetryConfig::from_env (which reads
    // the values set up in setup_test_env).
    #[tokio::test]
    async fn test_retry_rpc_call_with_default_config() {
        // Scope the env lock to setup only; the call itself doesn't need it.
        let (_guard, selector) = {
            let _lock = RETRY_TEST_ENV_MUTEX
                .lock()
                .unwrap_or_else(|e| e.into_inner());
            let guard = setup_test_env();

            let configs = vec![RpcConfig::new("http://localhost:8545".to_string())];
            let selector =
                RpcSelector::new_with_defaults(configs).expect("Failed to create selector");
            (guard, selector)
        };

        let provider_initializer =
            |_url: &str| -> Result<String, TestError> { Ok("mock_provider".to_string()) };

        let operation = |_provider: String| async move { Ok::<_, TestError>(42) };

        let result = retry_rpc_call(
            &selector,
            "test_operation",
            |_| false,
            |_| false,
            provider_initializer,
            operation,
            None, // use RetryConfig::from_env
        )
        .await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 42);
    }
1287
1288 #[tokio::test]
1289 #[serial]
1290 async fn test_retry_rpc_call_provider_initialization_failures() {
1291 let _guard = setup_test_env();
1292 RpcHealthStore::instance().clear_all();
1293
1294 let configs = vec![
1296 RpcConfig::new("http://localhost:9988".to_string()),
1297 RpcConfig::new("http://localhost:9987".to_string()),
1298 ];
1299 let selector = RpcSelector::new_with_defaults(configs).expect("Failed to create selector");
1300
1301 let attempt_count = Arc::new(AtomicU8::new(0));
1302 let attempt_count_clone = attempt_count.clone();
1303
1304 let attempted_urls = Arc::new(std::sync::Mutex::new(Vec::new()));
1306 let attempted_urls_clone = attempted_urls.clone();
1307
1308 let provider_initializer = move |url: &str| -> Result<String, TestError> {
1311 let count = attempt_count_clone.fetch_add(1, AtomicOrdering::SeqCst);
1312 attempted_urls_clone.lock().unwrap().push(url.to_string());
1313 if count == 0 {
1314 Err(TestError("First provider init failed".to_string()))
1316 } else {
1317 Ok(url.to_string())
1318 }
1319 };
1320
1321 let operation = |_provider: String| async move { Ok::<_, TestError>(42) };
1322
1323 let config = RetryConfig::new(2, 1, 0, 0);
1324
1325 let result = retry_rpc_call(
1326 &selector,
1327 "test_operation",
1328 |_| true,
1329 |_| false,
1330 provider_initializer,
1331 operation,
1332 Some(config),
1333 )
1334 .await;
1335
1336 assert!(result.is_ok());
1337 assert_eq!(result.unwrap(), 42);
1338
1339 let final_count = attempt_count.load(AtomicOrdering::SeqCst);
1341 assert_eq!(
1342 final_count, 2,
1343 "Expected exactly 2 provider init attempts, got {}",
1344 final_count
1345 );
1346
1347 let urls = attempted_urls.lock().unwrap();
1349 assert_eq!(urls.len(), 2, "Expected 2 URLs attempted, got {:?}", urls);
1350 assert_ne!(
1351 urls[0], urls[1],
1352 "Expected different URLs to be tried, got {:?}",
1353 urls
1354 );
1355 }
1356
1357 #[test]
1358 fn test_get_provider_selector_errors() {
1359 let _guard = setup_test_env();
1360
1361 let configs = vec![RpcConfig::new("http://localhost:8545".to_string())];
1363 let selector = RpcSelector::new_with_defaults(configs).expect("Failed to create selector");
1364
1365 let _ = selector.get_current_url().unwrap(); selector.mark_current_as_failed(); let provider_initializer =
1370 |url: &str| -> Result<String, TestError> { Ok(format!("provider-{}", url)) };
1371
1372 let result = get_provider(
1375 &selector,
1376 "test_operation",
1377 &provider_initializer,
1378 &HashSet::new(),
1379 );
1380 assert!(result.is_ok());
1381 let (provider, url) = result.unwrap();
1382 assert_eq!(url, "http://localhost:8545");
1383 assert_eq!(provider, "provider-http://localhost:8545");
1384 }
1385
1386 #[tokio::test]
1387 #[serial]
1388 async fn test_last_provider_never_marked_as_failed() {
1389 let _guard = setup_test_env();
1390 RpcHealthStore::instance().clear_all();
1391
1392 let unique_url = "http://localhost:9999".to_string();
1394 let configs = vec![RpcConfig::new(unique_url.clone())];
1395 let selector = RpcSelector::new_with_defaults(configs).expect("Failed to create selector");
1396
1397 let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };
1398
1399 let operation = |_provider: String| async { Err(TestError("Always fails".to_string())) };
1401
1402 let config = RetryConfig::new(2, 1, 0, 0); let initial_available_count = selector.available_provider_count();
1406 assert_eq!(
1407 initial_available_count, 1,
1408 "Provider should be available after clearing health store"
1409 );
1410
1411 let result: Result<i32, TestError> = retry_rpc_call(
1412 &selector,
1413 "test_operation",
1414 |_| true, |_| true, provider_initializer,
1417 operation,
1418 Some(config),
1419 )
1420 .await;
1421
1422 assert!(result.is_err());
1423
1424 let final_available_count = selector.available_provider_count();
1426 assert_eq!(
1427 final_available_count, 0,
1428 "Provider should be marked as failed, but selector can still use paused providers"
1429 );
1430 }
1431
1432 #[tokio::test]
1433 #[serial]
1434 async fn test_last_provider_behavior_with_multiple_providers() {
1435 let _guard = setup_test_env();
1436 RpcHealthStore::instance().clear_all();
1437
1438 let configs = vec![
1440 RpcConfig::new("http://localhost:9991".to_string()),
1441 RpcConfig::new("http://localhost:9990".to_string()),
1442 RpcConfig::new("http://localhost:9989".to_string()),
1443 ];
1444 let selector = RpcSelector::new_with_defaults(configs).expect("Failed to create selector");
1445
1446 let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };
1447
1448 let operation = |_provider: String| async { Err(TestError("Always fails".to_string())) };
1450
1451 let config = RetryConfig::new(2, 2, 0, 0); let initial_available_count = selector.available_provider_count();
1455 assert_eq!(
1456 initial_available_count, 3,
1457 "All 3 providers should be available after clearing"
1458 );
1459
1460 let result: Result<i32, TestError> = retry_rpc_call(
1461 &selector,
1462 "test_operation",
1463 |_| true, |_| true, provider_initializer,
1466 operation,
1467 Some(config),
1468 )
1469 .await;
1470
1471 assert!(result.is_err());
1472
1473 let final_available_count = selector.available_provider_count();
1475 assert_eq!(
1476 final_available_count, 0,
1477 "All providers should be marked as failed, but paused providers can still be used"
1478 );
1479 }
1480
1481 #[tokio::test]
1482 #[serial]
1483 async fn test_non_retriable_error_should_mark_provider_failed() {
1484 let _guard = setup_test_env();
1485 RpcHealthStore::instance().clear_all();
1486
1487 let configs = vec![
1489 RpcConfig::new("http://localhost:9995".to_string()),
1490 RpcConfig::new("http://localhost:9994".to_string()),
1491 ];
1492 let selector = RpcSelector::new_with_defaults(configs).expect("Failed to create selector");
1493
1494 let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };
1495
1496 let operation = |_provider: String| async move {
1498 Err(TestError("Critical non-retriable error".to_string()))
1499 };
1500
1501 let config = RetryConfig::new(3, 1, 0, 0);
1502
1503 let initial_available_count = selector.available_provider_count();
1505 assert_eq!(
1506 initial_available_count, 2,
1507 "Both providers should be available after clearing health store"
1508 );
1509
1510 let result: Result<i32, TestError> = retry_rpc_call(
1511 &selector,
1512 "test_operation",
1513 |_| false, |e| e.0.contains("Critical"), provider_initializer,
1516 operation,
1517 Some(config),
1518 )
1519 .await;
1520
1521 assert!(result.is_err());
1522
1523 let final_available_count = selector.available_provider_count();
1525 assert_eq!(final_available_count, 1,
1526 "Provider should be marked as failed when should_mark_provider_failed returns true for non-retriable error");
1527 }
1528
1529 #[tokio::test]
1530 #[serial]
1531 async fn test_non_retriable_error_should_not_mark_provider_failed() {
1532 let _guard = setup_test_env();
1533 RpcHealthStore::instance().clear_all();
1534
1535 let configs = vec![
1537 RpcConfig::new("http://localhost:9997".to_string()),
1538 RpcConfig::new("http://localhost:9996".to_string()),
1539 ];
1540 let selector = RpcSelector::new_with_defaults(configs).expect("Failed to create selector");
1541
1542 let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };
1543
1544 let operation = |_provider: String| async move {
1546 Err(TestError("Minor non-retriable error".to_string()))
1547 };
1548
1549 let config = RetryConfig::new(3, 1, 0, 0);
1550
1551 let initial_available_count = selector.available_provider_count();
1553 assert_eq!(
1554 initial_available_count, 2,
1555 "Both providers should be available after clearing health store"
1556 );
1557
1558 let result: Result<i32, TestError> = retry_rpc_call(
1559 &selector,
1560 "test_operation",
1561 |_| false, |e| e.0.contains("Critical"), provider_initializer,
1564 operation,
1565 Some(config),
1566 )
1567 .await;
1568
1569 assert!(result.is_err());
1570
1571 let final_available_count = selector.available_provider_count();
1573 assert_eq!(final_available_count, initial_available_count,
1574 "Provider should NOT be marked as failed when should_mark_provider_failed returns false for non-retriable error");
1575 }
1576
1577 #[tokio::test]
1578 #[serial]
1579 async fn test_retriable_error_ignores_should_mark_provider_failed() {
1580 let _guard = setup_test_env();
1581 RpcHealthStore::instance().clear_all();
1582
1583 let configs = vec![
1585 RpcConfig::new("http://localhost:9982".to_string()),
1586 RpcConfig::new("http://localhost:9981".to_string()),
1587 ];
1588 let selector = RpcSelector::new_with_defaults(configs).expect("Failed to create selector");
1589
1590 let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };
1591
1592 let operation =
1594 |_provider: String| async { Err(TestError("Retriable network error".to_string())) };
1595
1596 let config = RetryConfig::new(2, 1, 0, 0); let initial_available_count = selector.available_provider_count();
1600 assert_eq!(
1601 initial_available_count, 2,
1602 "Both providers should be available after clearing"
1603 );
1604
1605 let result: Result<i32, TestError> = retry_rpc_call(
1606 &selector,
1607 "test_operation",
1608 |_| true, |_| false, provider_initializer,
1611 operation,
1612 Some(config),
1613 )
1614 .await;
1615
1616 assert!(result.is_err());
1617
1618 let final_available_count = selector.available_provider_count();
1621 assert!(final_available_count < initial_available_count,
1622 "Provider should be marked as failed when retriable errors exhaust retries, regardless of should_mark_provider_failed");
1623 }
1624
1625 #[tokio::test]
1626 #[serial]
1627 async fn test_mixed_error_scenarios_with_different_marking_behavior() {
1628 let _guard = setup_test_env();
1629 RpcHealthStore::instance().clear_all();
1630
1631 let configs = vec![
1634 RpcConfig::new("http://localhost:9993".to_string()),
1635 RpcConfig::new("http://localhost:9992".to_string()),
1636 ];
1637 let selector = RpcSelector::new_with_defaults(configs).expect("Failed to create selector");
1638
1639 let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };
1640
1641 let operation =
1642 |_provider: String| async move { Err(TestError("Critical network error".to_string())) };
1643
1644 let config = RetryConfig::new(1, 1, 0, 0);
1645 let initial_count = selector.available_provider_count();
1646 assert_eq!(
1647 initial_count, 2,
1648 "Both providers should be available after clearing"
1649 );
1650
1651 let result: Result<i32, TestError> = retry_rpc_call(
1652 &selector,
1653 "test_operation",
1654 |_| false, |e| e.0.contains("Critical"), provider_initializer,
1657 operation,
1658 Some(config.clone()),
1659 )
1660 .await;
1661
1662 assert!(result.is_err());
1663 let after_critical_count = selector.available_provider_count();
1664 assert_eq!(
1665 after_critical_count,
1666 initial_count - 1,
1667 "Critical error should mark provider as failed"
1668 );
1669
1670 let operation =
1672 |_provider: String| async move { Err(TestError("Minor validation error".to_string())) };
1673
1674 let result: Result<i32, TestError> = retry_rpc_call(
1675 &selector,
1676 "test_operation",
1677 |_| false, |e| e.0.contains("Critical"), provider_initializer,
1680 operation,
1681 Some(config),
1682 )
1683 .await;
1684
1685 assert!(result.is_err());
1686 let final_count = selector.available_provider_count();
1687 assert_eq!(
1688 final_count, after_critical_count,
1689 "Minor error should NOT mark provider as failed"
1690 );
1691 }
1692
1693 #[tokio::test]
1694 #[serial]
1695 async fn test_should_mark_provider_failed_respects_last_provider_protection() {
1696 let _guard = setup_test_env();
1697 RpcHealthStore::instance().clear_all();
1698
1699 let unique_url = "http://localhost:9998".to_string();
1701 let configs = vec![RpcConfig::new(unique_url.clone())];
1702 let selector = RpcSelector::new_with_defaults(configs).expect("Failed to create selector");
1703
1704 let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };
1705
1706 let operation =
1708 |_provider: String| async move { Err(TestError("Critical network error".to_string())) };
1709
1710 let config = RetryConfig::new(1, 1, 0, 0);
1711
1712 let initial_available_count = selector.available_provider_count();
1714 assert_eq!(
1715 initial_available_count, 1,
1716 "Provider should be available after clearing health store"
1717 );
1718
1719 let result: Result<i32, TestError> = retry_rpc_call(
1720 &selector,
1721 "test_operation",
1722 |_| false, |e| e.0.contains("Critical"), provider_initializer,
1725 operation,
1726 Some(config),
1727 )
1728 .await;
1729
1730 assert!(result.is_err());
1731
1732 let final_available_count = selector.available_provider_count();
1734 assert_eq!(
1735 final_available_count, 0,
1736 "Provider should be marked as failed, but selector can still use paused providers"
1737 );
1738 }
1739
1740 #[tokio::test]
1741 #[serial]
1742 async fn test_should_mark_provider_failed_with_multiple_providers_last_protection() {
1743 let _guard = setup_test_env();
1744 RpcHealthStore::instance().clear_all();
1745
1746 let configs = vec![
1748 RpcConfig::new("http://localhost:8545".to_string()),
1749 RpcConfig::new("http://localhost:8546".to_string()),
1750 ];
1751 let selector = RpcSelector::new_with_defaults(configs).expect("Failed to create selector");
1752
1753 let attempt_count = Arc::new(AtomicU8::new(0));
1754 let attempt_count_clone = attempt_count.clone();
1755
1756 let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };
1757
1758 let operation = move |_provider: String| {
1760 let attempt_count = attempt_count_clone.clone();
1761 async move {
1762 let count = attempt_count.fetch_add(1, AtomicOrdering::SeqCst);
1763 Err(TestError(format!("Critical error #{}", count)))
1764 }
1765 };
1766
1767 let config = RetryConfig::new(1, 1, 0, 0); let initial_available_count = selector.available_provider_count();
1771 assert_eq!(initial_available_count, 2);
1772
1773 let result: Result<i32, TestError> = retry_rpc_call(
1774 &selector,
1775 "test_operation",
1776 |_| false, |e| e.0.contains("Critical"), provider_initializer,
1779 operation,
1780 Some(config),
1781 )
1782 .await;
1783
1784 assert!(result.is_err());
1785
1786 let final_available_count = selector.available_provider_count();
1788 assert_eq!(
1789 final_available_count, 1,
1790 "First provider should be marked as failed, second provider remains available"
1791 );
1792 }
1793
1794 #[tokio::test]
1795 #[serial]
1796 async fn test_tried_urls_tracking_prevents_duplicate_selection() {
1797 let _guard = setup_test_env();
1798 RpcHealthStore::instance().clear_all();
1799
1800 let configs = vec![
1801 RpcConfig::new("http://localhost:8545".to_string()),
1802 RpcConfig::new("http://localhost:8546".to_string()),
1803 RpcConfig::new("http://localhost:8547".to_string()),
1804 ];
1805 let selector = RpcSelector::new_with_defaults(configs).expect("Failed to create selector");
1806
1807 let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };
1808
1809 let operation = |provider: String| async move {
1811 if provider.contains("8545") {
1812 Err(TestError("Provider 1 failed".to_string()))
1813 } else if provider.contains("8546") {
1814 Err(TestError("Provider 2 failed".to_string()))
1815 } else {
1816 Ok(42)
1817 }
1818 };
1819
1820 let config = RetryConfig::new(2, 10, 0, 0); let result: Result<i32, TestError> = retry_rpc_call(
1823 &selector,
1824 "test_operation",
1825 |_| true, |_| true, provider_initializer,
1828 operation,
1829 Some(config),
1830 )
1831 .await;
1832
1833 assert!(result.is_ok());
1834 assert_eq!(result.unwrap(), 42);
1835 }
1836
1837 #[tokio::test]
1838 #[serial]
1839 async fn test_all_providers_tried_returns_error() {
1840 let _guard = setup_test_env();
1841 RpcHealthStore::instance().clear_all();
1842
1843 let configs = vec![
1844 RpcConfig::new("http://localhost:8545".to_string()),
1845 RpcConfig::new("http://localhost:8546".to_string()),
1846 ];
1847 let selector = RpcSelector::new_with_defaults(configs).expect("Failed to create selector");
1848
1849 let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };
1850
1851 let operation = |_provider: String| async { Err(TestError("Always fails".to_string())) };
1853
1854 let config = RetryConfig::new(2, 10, 0, 0); let result: Result<i32, TestError> = retry_rpc_call(
1857 &selector,
1858 "test_operation",
1859 |_| true, |_| true, provider_initializer,
1862 operation,
1863 Some(config),
1864 )
1865 .await;
1866
1867 assert!(result.is_err());
1868 assert_eq!(
1870 selector.available_provider_count(),
1871 0,
1872 "Both providers should be marked as failed"
1873 );
1874 }
1875
1876 #[tokio::test]
1877 #[serial]
1878 async fn test_tried_urls_passed_to_selector() {
1879 let _guard = setup_test_env();
1880 RpcHealthStore::instance().clear_all();
1881
1882 let url1 = "http://localhost:9980".to_string();
1884 let url2 = "http://localhost:9979".to_string();
1885 let url3 = "http://localhost:9978".to_string();
1886 let configs = vec![
1887 RpcConfig::new(url1.clone()),
1888 RpcConfig::new(url2.clone()),
1889 RpcConfig::new(url3.clone()),
1890 ];
1891 let selector = RpcSelector::new_with_defaults(configs).expect("Failed to create selector");
1892
1893 let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };
1894
1895 let selected_providers = Arc::new(Mutex::new(Vec::new()));
1897 let selected_providers_clone = selected_providers.clone();
1898 let url3_clone = url3.clone();
1899
1900 let operation = move |provider: String| {
1901 let selected = selected_providers_clone.clone();
1902 let url3 = url3_clone.clone();
1903 async move {
1904 let mut selected_guard = selected.lock().unwrap();
1905 selected_guard.push(provider.clone());
1906
1907 if provider == url3 {
1909 Ok(42)
1910 } else {
1911 Err(TestError("Provider failed".to_string()))
1912 }
1913 }
1914 };
1915
1916 let config = RetryConfig::new(2, 10, 0, 0); let result: Result<i32, TestError> = retry_rpc_call(
1919 &selector,
1920 "test_operation",
1921 |_| true,
1922 |_| true,
1923 provider_initializer,
1924 operation,
1925 Some(config),
1926 )
1927 .await;
1928
1929 assert!(
1930 result.is_ok(),
1931 "Operation should succeed eventually, got error: {:?}",
1932 result
1933 );
1934 let selected = selected_providers.lock().unwrap();
1935 let unique_providers: HashSet<_> = selected.iter().collect();
1937 assert!(
1938 unique_providers.len() >= 1,
1939 "Should have tried at least 1 provider: {:?}",
1940 selected
1941 );
1942 assert!(
1944 unique_providers.contains(&url3),
1945 "Should have tried provider 3: {:?}",
1946 selected
1947 );
1948 assert!(
1952 selected.len() >= 1,
1953 "Should have at least 1 total attempt, got: {}",
1954 selected.len()
1955 );
1956 }
1957}