openzeppelin_relayer/services/provider/
rpc_selector.rs

1//! # RPC Provider Selector
2//!
3//! This module provides functionality for dynamically selecting RPC endpoints based on configured priorities,
4//! health status, and selection strategies.
5//!
6//! ## Features
7//!
8//! - **Weighted selection**: Providers can be assigned different weights to control selection probability
9//! - **Round-robin fallback**: If weighted selection fails or weights are equal, round-robin is used
10//! - **Health tracking**: Failed providers are temporarily excluded from selection
11//! - **Automatic recovery**: Failed providers are automatically recovered after a configurable period
12use std::collections::HashSet;
13use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
14use std::sync::Arc;
15
16use eyre::Result;
17use parking_lot::RwLock;
18use rand::distr::weighted::WeightedIndex;
19use rand::prelude::*;
20use serde::Serialize;
21use thiserror::Error;
22use tracing::info;
23
24use crate::models::RpcConfig;
25use crate::services::provider::rpc_health_store::RpcHealthStore;
26
/// Errors produced while selecting an RPC provider or building a client for it.
#[derive(Error, Debug, Serialize)]
pub enum RpcSelectorError {
    /// The selector holds no provider configurations at all.
    #[error("No providers available")]
    NoProviders,
    /// The caller-supplied client initializer failed; carries the error's message.
    #[error("Client initialization failed: {0}")]
    ClientInitializationError(String),
    /// A weighted-index problem, carried as a message.
    /// NOTE(review): not constructed anywhere in the visible part of this file.
    #[error("Weighted index error: {0}")]
    WeightedIndexError(String),
    /// No provider could be selected: every provider either has zero weight or
    /// was excluded by the caller.
    #[error("All available providers have failed")]
    AllProvidersFailed,
}
38
/// Manages selection of RPC endpoints based on configuration.
///
/// Selection prefers a weighted random pick when the providers have differing
/// weights and otherwise falls back to round-robin. Provider health is tracked in
/// the process-wide `RpcHealthStore`, keyed by provider URL.
#[derive(Debug)]
pub struct RpcSelector {
    /// RPC configurations; `Clone` gives each clone its own copy of this list.
    configs: Arc<RwLock<Vec<RpcConfig>>>,
    /// Pre-computed weighted distribution for faster provider selection.
    /// `None` when weighted selection does not apply (e.g. all usable weights are equal).
    weights_dist: Option<Arc<WeightedIndex<u8>>>,
    /// Counter for round-robin selection as a fallback or for equal weights.
    /// Shared between clones of the selector.
    next_index: Arc<AtomicUsize>,
    /// Index of the most recently selected provider; only meaningful while
    /// `has_current` is `true`. Shared between clones of the selector.
    current_index: Arc<AtomicUsize>,
    /// Flag indicating whether `current_index` refers to a valid selection.
    /// Shared between clones of the selector.
    has_current: Arc<AtomicBool>,
    /// Number of consecutive failures before pausing a provider
    failure_threshold: u32,
    /// Duration in seconds to pause a provider after reaching failure threshold
    pause_duration_secs: u64,
    /// Duration in seconds after which failures are considered stale and reset
    failure_expiration_secs: u64,
}
59
60impl RpcSelector {
61    /// Creates a new RpcSelector instance.
62    ///
63    /// # Arguments
64    /// * `configs` - RPC configurations
65    /// * `failure_threshold` - Number of consecutive failures before pausing a provider.
66    ///   Defaults to [`DEFAULT_PROVIDER_FAILURE_THRESHOLD`] if not provided via env var `PROVIDER_FAILURE_THRESHOLD`.
67    /// * `pause_duration_secs` - Duration in seconds to pause a provider after reaching failure threshold.
68    ///   Defaults to [`DEFAULT_PROVIDER_PAUSE_DURATION_SECS`] if not provided via env var `PROVIDER_PAUSE_DURATION_SECS`.
69    /// * `failure_expiration_secs` - Duration in seconds after which failures are considered stale and reset.
70    ///   Defaults to [`DEFAULT_PROVIDER_FAILURE_EXPIRATION_SECS`] (60 seconds).
71    ///
72    /// # Returns
73    /// * `Result<Self>` - A new selector instance or an error
74    ///
75    /// # Note
76    /// These values are typically loaded from `ServerConfig::from_env()` which reads from environment variables:
77    /// - `PROVIDER_FAILURE_THRESHOLD` (default: 3, legacy: `RPC_FAILURE_THRESHOLD`)
78    /// - `PROVIDER_PAUSE_DURATION_SECS` (default: 60, legacy: `RPC_PAUSE_DURATION_SECS`)
79    pub fn new(
80        configs: Vec<RpcConfig>,
81        failure_threshold: u32,
82        pause_duration_secs: u64,
83        failure_expiration_secs: u64,
84    ) -> Result<Self, RpcSelectorError> {
85        if configs.is_empty() {
86            return Err(RpcSelectorError::NoProviders);
87        }
88
89        // Create the weights distribution based on provided weights
90        let weights_dist = Self::create_weights_distribution(&configs, &HashSet::new());
91
92        let selector = Self {
93            configs: Arc::new(RwLock::new(configs)),
94            weights_dist,
95            next_index: Arc::new(AtomicUsize::new(0)),
96            current_index: Arc::new(AtomicUsize::new(0)),
97            has_current: Arc::new(AtomicBool::new(false)), // Initially no current provider
98            failure_threshold,
99            pause_duration_secs,
100            failure_expiration_secs,
101        };
102
103        // Randomize the starting index to avoid always starting with the same provider
104        let mut rng = rand::rng();
105        selector.next_index.store(
106            rng.random_range(0..selector.configs.read().len()),
107            Ordering::Relaxed,
108        );
109
110        Ok(selector)
111    }
112
113    /// Creates a new RpcSelector instance with default failure threshold and pause duration.
114    ///
115    /// This is a convenience method primarily for testing. In production code, use `new()` with
116    /// values from `ServerConfig::from_env()`.
117    ///
118    /// # Arguments
119    /// * `configs` - RPC configurations
120    ///
121    /// # Returns
122    /// * `Result<Self>` - A new selector instance or an error
123    pub fn new_with_defaults(configs: Vec<RpcConfig>) -> Result<Self, RpcSelectorError> {
124        Self::new(
125            configs,
126            crate::config::ServerConfig::get_provider_failure_threshold(),
127            crate::config::ServerConfig::get_provider_pause_duration_secs(),
128            crate::config::ServerConfig::get_provider_failure_expiration_secs(),
129        )
130    }
131
132    /// Gets the number of available providers
133    ///
134    /// # Returns
135    /// * `usize` - The number of providers in the selector
136    pub fn provider_count(&self) -> usize {
137        self.configs.read().len()
138    }
139
140    /// Gets the number of available (non-paused) providers
141    ///
142    /// # Returns
143    /// * `usize` - The number of non-paused providers
144    pub fn available_provider_count(&self) -> usize {
145        let health_store = RpcHealthStore::instance();
146        let expiration = chrono::Duration::seconds(self.failure_expiration_secs as i64);
147        self.configs
148            .read()
149            .iter()
150            .filter(|c| !health_store.is_paused(&c.url, self.failure_threshold, expiration))
151            .count()
152    }
153
154    /// Gets the current RPC configurations.
155    ///
156    /// # Returns
157    /// * `Vec<RpcConfig>` - The current configurations
158    pub fn get_configs(&self) -> Vec<RpcConfig> {
159        self.configs.read().clone()
160    }
161
162    /// Marks the current endpoint as failed and forces selection of a different endpoint.
163    ///
164    /// This method is used when a provider consistently fails, and we want to try a different one.
165    /// It adds the current provider to the failed providers set and will avoid selecting it again.
166    pub fn mark_current_as_failed(&self) {
167        info!("Marking current provider as failed");
168        // Only proceed if we have a current provider
169        if self.has_current.load(Ordering::Relaxed) {
170            let current = self.current_index.load(Ordering::Relaxed);
171            let configs = self.configs.read();
172            let config = &configs[current];
173
174            // Mark this provider as failed in the health store
175            let health_store = RpcHealthStore::instance();
176            use chrono::Duration;
177            health_store.mark_failed(
178                &config.url,
179                self.failure_threshold,
180                Duration::seconds(self.pause_duration_secs as i64),
181                Duration::seconds(self.failure_expiration_secs as i64),
182            );
183
184            // Clear the current provider
185            self.has_current.store(false, Ordering::Relaxed);
186
187            // Move round-robin index forward to avoid selecting the same provider again
188            if configs.len() > 1 {
189                self.next_index.fetch_add(1, Ordering::Relaxed);
190            }
191        }
192    }
193
194    /// Creates a weighted distribution for selecting RPC endpoints based on their weights.
195    ///
196    /// # Arguments
197    /// * `configs` - A slice of RPC configurations with weights
198    /// * `excluded_indices` - A set of indices to exclude from the distribution
199    ///
200    /// # Returns
201    /// * `Option<Arc<WeightedIndex<u8>>>` - A weighted distribution if configs have different weights, None otherwise
202    fn create_weights_distribution(
203        configs: &[RpcConfig],
204        excluded_indices: &HashSet<usize>,
205    ) -> Option<Arc<WeightedIndex<u8>>> {
206        // Collect weights, using 0 for excluded providers
207        let weights: Vec<u8> = configs
208            .iter()
209            .enumerate()
210            .map(|(idx, config)| {
211                if excluded_indices.contains(&idx) {
212                    0
213                } else {
214                    config.get_weight()
215                }
216            })
217            .collect();
218
219        // Count available providers with non-zero weight
220        let available_count = weights.iter().filter(|&&w| w > 0).count();
221        if available_count == 0 {
222            return None;
223        }
224
225        let first_non_zero_weight = weights.iter().find(|&&w| w > 0).copied();
226        if let Some(first_weight) = first_non_zero_weight {
227            // First check for the original equal weights case
228            let all_equal = weights
229                .iter()
230                .filter(|&&w| w > 0)
231                .all(|&w| w == first_weight);
232
233            if all_equal {
234                return None;
235            }
236        }
237
238        // Create weighted distribution
239        match WeightedIndex::new(&weights) {
240            Ok(dist) => Some(Arc::new(dist)),
241            Err(_) => None,
242        }
243    }
244
245    /// Attempts weighted selection of a provider.
246    ///
247    /// # Arguments
248    /// * `configs` - RPC configurations
249    /// * `excluded_urls` - URLs of providers that have already been tried
250    /// * `allow_paused` - If true, allows selection of paused providers
251    /// * `health_store` - Health store instance
252    /// * `expiration` - Duration after which failures expire
253    ///
254    /// # Returns
255    /// * `Option<(usize, String)>` - Some(index, url) if a provider was selected, None otherwise
256    fn try_weighted_selection(
257        &self,
258        configs: &[RpcConfig],
259        excluded_urls: &std::collections::HashSet<String>,
260        allow_paused: bool,
261        health_store: &RpcHealthStore,
262        expiration: chrono::Duration,
263    ) -> Option<(usize, String)> {
264        let dist = self.weights_dist.as_ref()?;
265        let mut rng = rand::rng();
266
267        const MAX_ATTEMPTS: usize = 10;
268        for _ in 0..MAX_ATTEMPTS {
269            let index = dist.sample(&mut rng);
270            // Skip providers with zero weight
271            if configs[index].get_weight() == 0 {
272                continue;
273            }
274            // Skip providers already tried in this failover cycle
275            if excluded_urls.contains(&configs[index].url) {
276                continue;
277            }
278            // Check health status unless paused providers are allowed
279            if !allow_paused
280                && health_store.is_paused(&configs[index].url, self.failure_threshold, expiration)
281            {
282                continue;
283            }
284            // Found a suitable provider
285            self.current_index.store(index, Ordering::Relaxed);
286            self.has_current.store(true, Ordering::Relaxed);
287            return Some((index, configs[index].url.clone()));
288        }
289        None
290    }
291
292    /// Attempts round-robin selection of a provider.
293    ///
294    /// # Arguments
295    /// * `configs` - RPC configurations
296    /// * `excluded_urls` - URLs of providers that have already been tried
297    /// * `allow_paused` - If true, allows selection of paused providers
298    /// * `health_store` - Health store instance
299    /// * `expiration` - Duration after which failures expire
300    /// * `start_index` - Starting index for round-robin iteration
301    ///
302    /// # Returns
303    /// * `Option<(usize, String)>` - Some(index, url) if a provider was selected, None otherwise
304    fn try_round_robin_selection(
305        &self,
306        configs: &[RpcConfig],
307        excluded_urls: &std::collections::HashSet<String>,
308        allow_paused: bool,
309        health_store: &RpcHealthStore,
310        expiration: chrono::Duration,
311        start_index: usize,
312    ) -> Option<(usize, String)> {
313        let len = configs.len();
314        for i in 0..len {
315            let index = (start_index + i) % len;
316            // Skip providers with zero weight
317            if configs[index].get_weight() == 0 {
318                continue;
319            }
320            // Skip providers already tried in this failover cycle
321            if excluded_urls.contains(&configs[index].url) {
322                continue;
323            }
324            // Check health status unless paused providers are allowed
325            if !allow_paused
326                && health_store.is_paused(&configs[index].url, self.failure_threshold, expiration)
327            {
328                continue;
329            }
330            // Found a suitable provider
331            // Update the next_index atomically to point after this provider
332            self.next_index.store((index + 1) % len, Ordering::Relaxed);
333            self.current_index.store(index, Ordering::Relaxed);
334            self.has_current.store(true, Ordering::Relaxed);
335            return Some((index, configs[index].url.clone()));
336        }
337        None
338    }
339
340    /// Gets the URL of the next RPC endpoint based on the selection strategy.
341    ///
342    /// This method first tries to select non-paused providers. If no non-paused providers
343    /// are available, it falls back to paused providers as a last resort, since they might
344    /// have recovered.
345    ///
346    /// # Arguments
347    /// * `excluded_urls` - URLs of providers that have already been tried in the current failover cycle
348    fn select_url_internal(
349        &self,
350        excluded_urls: &std::collections::HashSet<String>,
351    ) -> Result<String, RpcSelectorError> {
352        let configs = self.configs.read();
353        if configs.is_empty() {
354            return Err(RpcSelectorError::NoProviders);
355        }
356
357        let health_store = RpcHealthStore::instance();
358        let expiration = chrono::Duration::seconds(self.failure_expiration_secs as i64);
359
360        // For a single provider, handle special case
361        if configs.len() == 1 {
362            // Skip providers with zero weight
363            if configs[0].get_weight() == 0 {
364                return Err(RpcSelectorError::AllProvidersFailed);
365            }
366            // Skip if already tried
367            if excluded_urls.contains(&configs[0].url) {
368                return Err(RpcSelectorError::AllProvidersFailed);
369            }
370            // Even if paused, try it as last resort
371            self.current_index.store(0, Ordering::Relaxed);
372            self.has_current.store(true, Ordering::Relaxed);
373            return Ok(configs[0].url.clone());
374        }
375
376        // First, try to find a non-paused provider
377        // Try weighted selection first if available
378        if let Some((_, url)) = self.try_weighted_selection(
379            &configs,
380            excluded_urls,
381            false, // allow_paused = false
382            health_store,
383            expiration,
384        ) {
385            return Ok(url);
386        }
387
388        // Fall back to round-robin selection for non-paused providers
389        let start_index = self.next_index.load(Ordering::Relaxed) % configs.len();
390        if let Some((_, url)) = self.try_round_robin_selection(
391            &configs,
392            excluded_urls,
393            false, // allow_paused = false
394            health_store,
395            expiration,
396            start_index,
397        ) {
398            return Ok(url);
399        }
400
401        // If we get here, no non-paused providers are available
402        // Fall back to paused providers as a last resort
403        tracing::warn!(
404            "No non-paused providers available, falling back to paused providers as last resort"
405        );
406
407        // Try weighted selection for paused providers
408        if let Some((_, url)) = self.try_weighted_selection(
409            &configs,
410            excluded_urls,
411            true, // allow_paused = true
412            health_store,
413            expiration,
414        ) {
415            return Ok(url);
416        }
417
418        // Fall back to round-robin for paused providers
419        if let Some((_, url)) = self.try_round_robin_selection(
420            &configs,
421            excluded_urls,
422            true, // allow_paused = true
423            health_store,
424            expiration,
425            start_index,
426        ) {
427            return Ok(url);
428        }
429
430        // If we get here, all providers have zero weight (shouldn't happen in practice)
431        Err(RpcSelectorError::AllProvidersFailed)
432    }
433
434    /// Gets the URL of the currently selected RPC endpoint.
435    ///
436    /// # Returns
437    /// * `Result<String, RpcSelectorError>` - The URL of the current provider, or an error
438    pub fn get_current_url(&self) -> Result<String, RpcSelectorError> {
439        self.select_url_internal(&std::collections::HashSet::new())
440    }
441
    /// Gets the URL of the next RPC endpoint, excluding providers that have already been tried.
    ///
    /// Thin public wrapper: `excluded_urls` is forwarded unchanged to `select_url_internal`.
    ///
    /// # Arguments
    /// * `excluded_urls` - URLs of providers that have already been tried in the current failover cycle
    ///
    /// # Returns
    /// * `Result<String, RpcSelectorError>` - The URL of the next provider, or an error
    pub fn get_next_url(
        &self,
        excluded_urls: &std::collections::HashSet<String>,
    ) -> Result<String, RpcSelectorError> {
        self.select_url_internal(excluded_urls)
    }
455
456    /// Gets the URL of the next RPC endpoint (for backward compatibility in tests).
457    /// This method doesn't exclude any providers - use `get_next_url()` with excluded URLs in production code.
458    #[cfg(test)]
459    pub fn select_url(&self) -> Result<String, RpcSelectorError> {
460        self.select_url_internal(&std::collections::HashSet::new())
461    }
462
463    /// Gets a client for the selected RPC endpoint.
464    ///
465    /// # Arguments
466    /// * `initializer` - A function that takes a URL string and returns a `Result<T>`
467    /// * `excluded_urls` - URLs of providers that have already been tried in the current failover cycle
468    ///
469    /// # Returns
470    /// * `Result<T>` - The client instance or an error
471    pub fn get_client<T>(
472        &self,
473        initializer: impl Fn(&str) -> Result<T>,
474        excluded_urls: &std::collections::HashSet<String>,
475    ) -> Result<T, RpcSelectorError> {
476        let url = self.select_url_internal(excluded_urls)?;
477
478        initializer(&url).map_err(|e| RpcSelectorError::ClientInitializationError(e.to_string()))
479    }
480}
481
482// Implement Clone for RpcSelector manually since the generic T doesn't require Clone
483impl Clone for RpcSelector {
484    fn clone(&self) -> Self {
485        Self {
486            configs: Arc::new(RwLock::new(self.configs.read().clone())),
487            weights_dist: self.weights_dist.clone(),
488            next_index: Arc::clone(&self.next_index),
489            current_index: Arc::clone(&self.current_index),
490            has_current: Arc::clone(&self.has_current),
491            failure_threshold: self.failure_threshold,
492            pause_duration_secs: self.pause_duration_secs,
493            failure_expiration_secs: self.failure_expiration_secs,
494        }
495    }
496}
497
498#[cfg(test)]
499mod tests {
500    use super::*;
501    use crate::services::provider::rpc_health_store::RpcHealthStore;
502    use serial_test::serial;
503    use std::sync::Arc;
504    use std::thread;
505
506    #[test]
507    fn test_create_weights_distribution_single_config() {
508        let configs = vec![RpcConfig {
509            url: "https://example.com/rpc".to_string(),
510            weight: 1,
511            ..Default::default()
512        }];
513
514        let excluded = HashSet::new();
515        let result = RpcSelector::create_weights_distribution(&configs, &excluded);
516        assert!(result.is_none());
517    }
518
519    #[test]
520    fn test_create_weights_distribution_equal_weights() {
521        let configs = vec![
522            RpcConfig {
523                url: "https://example1.com/rpc".to_string(),
524                weight: 5,
525                ..Default::default()
526            },
527            RpcConfig {
528                url: "https://example2.com/rpc".to_string(),
529                weight: 5,
530                ..Default::default()
531            },
532            RpcConfig {
533                url: "https://example3.com/rpc".to_string(),
534                weight: 5,
535                ..Default::default()
536            },
537        ];
538
539        let excluded = HashSet::new();
540        let result = RpcSelector::create_weights_distribution(&configs, &excluded);
541        assert!(result.is_none());
542    }
543
544    #[test]
545    fn test_create_weights_distribution_different_weights() {
546        let configs = vec![
547            RpcConfig {
548                url: "https://example1.com/rpc".to_string(),
549                weight: 1,
550                ..Default::default()
551            },
552            RpcConfig {
553                url: "https://example2.com/rpc".to_string(),
554                weight: 2,
555                ..Default::default()
556            },
557            RpcConfig {
558                url: "https://example3.com/rpc".to_string(),
559                weight: 3,
560                ..Default::default()
561            },
562        ];
563
564        let excluded = HashSet::new();
565        let result = RpcSelector::create_weights_distribution(&configs, &excluded);
566        assert!(result.is_some());
567    }
568
569    #[test]
570    fn test_create_weights_distribution_with_excluded() {
571        let configs = vec![
572            RpcConfig {
573                url: "https://example1.com/rpc".to_string(),
574                weight: 1,
575                ..Default::default()
576            },
577            RpcConfig {
578                url: "https://example2.com/rpc".to_string(),
579                weight: 2,
580                ..Default::default()
581            },
582            RpcConfig {
583                url: "https://example3.com/rpc".to_string(),
584                weight: 3,
585                ..Default::default()
586            },
587        ];
588
589        // Exclude the first provider
590        let mut excluded = HashSet::new();
591        excluded.insert(0);
592
593        let result = RpcSelector::create_weights_distribution(&configs, &excluded);
594        assert!(result.is_some());
595
596        // Exclude two providers (with only one remaining, should return None)
597        excluded.insert(1);
598        let result = RpcSelector::create_weights_distribution(&configs, &excluded);
599        assert!(result.is_none());
600    }
601
602    #[test]
603    fn test_rpc_selector_new_empty_configs() {
604        let configs: Vec<RpcConfig> = vec![];
605        let result = RpcSelector::new_with_defaults(configs);
606        assert!(result.is_err());
607        assert!(matches!(result.unwrap_err(), RpcSelectorError::NoProviders));
608    }
609
610    #[test]
611    fn test_rpc_selector_new_single_config() {
612        let configs = vec![RpcConfig {
613            url: "https://example.com/rpc".to_string(),
614            weight: 1,
615            ..Default::default()
616        }];
617
618        let result = RpcSelector::new_with_defaults(configs);
619        assert!(result.is_ok());
620        let selector = result.unwrap();
621        assert!(selector.weights_dist.is_none());
622    }
623
624    #[test]
625    fn test_rpc_selector_new_multiple_equal_weights() {
626        let configs = vec![
627            RpcConfig {
628                url: "https://example1.com/rpc".to_string(),
629                weight: 5,
630                ..Default::default()
631            },
632            RpcConfig {
633                url: "https://example2.com/rpc".to_string(),
634                weight: 5,
635                ..Default::default()
636            },
637        ];
638
639        let result = RpcSelector::new_with_defaults(configs);
640        assert!(result.is_ok());
641        let selector = result.unwrap();
642        assert!(selector.weights_dist.is_none());
643    }
644
645    #[test]
646    fn test_rpc_selector_new_multiple_different_weights() {
647        let configs = vec![
648            RpcConfig {
649                url: "https://example1.com/rpc".to_string(),
650                weight: 1,
651                ..Default::default()
652            },
653            RpcConfig {
654                url: "https://example2.com/rpc".to_string(),
655                weight: 3,
656                ..Default::default()
657            },
658        ];
659
660        let result = RpcSelector::new_with_defaults(configs);
661        assert!(result.is_ok());
662        let selector = result.unwrap();
663        assert!(selector.weights_dist.is_some());
664    }
665
666    #[test]
667    fn test_rpc_selector_select_url_single_provider() {
668        let configs = vec![RpcConfig {
669            url: "https://example.com/rpc".to_string(),
670            weight: 1,
671            ..Default::default()
672        }];
673
674        let selector = RpcSelector::new_with_defaults(configs).unwrap();
675        let result = selector.select_url();
676        assert!(result.is_ok());
677        assert_eq!(result.unwrap(), "https://example.com/rpc");
678        assert!(selector.has_current.load(Ordering::Relaxed));
679    }
680
681    #[test]
682    fn test_rpc_selector_select_url_round_robin() {
683        let configs = vec![
684            RpcConfig {
685                url: "https://example1.com/rpc".to_string(),
686                weight: 1,
687                ..Default::default()
688            },
689            RpcConfig {
690                url: "https://example2.com/rpc".to_string(),
691                weight: 1,
692                ..Default::default()
693            },
694        ];
695
696        let selector = RpcSelector::new_with_defaults(configs).unwrap();
697
698        // First call should return the first URL
699        let first_url = selector.select_url().unwrap();
700        // Second call should return the second URL due to round-robin
701        let second_url = selector.select_url().unwrap();
702        // Third call should return the first URL again
703        let third_url = selector.select_url().unwrap();
704
705        // We don't know which URL comes first, but the sequence should alternate
706        assert_ne!(first_url, second_url);
707        assert_eq!(first_url, third_url);
708    }
709
710    #[test]
711    fn test_rpc_selector_get_client_success() {
712        let configs = vec![RpcConfig {
713            url: "https://example.com/rpc".to_string(),
714            weight: 1,
715            ..Default::default()
716        }];
717
718        let selector = RpcSelector::new_with_defaults(configs).unwrap();
719
720        // Create a simple initializer function that returns the URL as a string
721        let initializer = |url: &str| -> Result<String> { Ok(url.to_string()) };
722
723        let result = selector.get_client(initializer, &std::collections::HashSet::new());
724        assert!(result.is_ok());
725        assert_eq!(result.unwrap(), "https://example.com/rpc");
726    }
727
728    #[test]
729    fn test_rpc_selector_get_client_failure() {
730        let configs = vec![RpcConfig {
731            url: "https://example.com/rpc".to_string(),
732            weight: 1,
733            ..Default::default()
734        }];
735
736        let selector = RpcSelector::new_with_defaults(configs).unwrap();
737
738        // Create a failing initializer function
739        let initializer =
740            |_url: &str| -> Result<String> { Err(eyre::eyre!("Initialization error")) };
741
742        let result = selector.get_client(initializer, &std::collections::HashSet::new());
743        assert!(result.is_err());
744        assert!(matches!(
745            result.unwrap_err(),
746            RpcSelectorError::ClientInitializationError(_)
747        ));
748    }
749
750    #[test]
751    fn test_rpc_selector_clone() {
752        let configs = vec![
753            RpcConfig {
754                url: "https://example1.com/rpc".to_string(),
755                weight: 1,
756                ..Default::default()
757            },
758            RpcConfig {
759                url: "https://example2.com/rpc".to_string(),
760                weight: 3,
761                ..Default::default()
762            },
763        ];
764
765        let selector = RpcSelector::new_with_defaults(configs).unwrap();
766        let cloned = selector.clone();
767
768        // Check that the cloned selector has the same configuration
769        assert_eq!(selector.configs.read().len(), cloned.configs.read().len());
770        assert_eq!(selector.configs.read()[0].url, cloned.configs.read()[0].url);
771        assert_eq!(selector.configs.read()[1].url, cloned.configs.read()[1].url);
772
773        // Check that weights distribution is also cloned
774        assert_eq!(
775            selector.weights_dist.is_some(),
776            cloned.weights_dist.is_some()
777        );
778    }
779
780    #[test]
781    #[serial]
782    fn test_mark_current_as_failed_single_provider() {
783        // Clear health store to ensure clean state
784        RpcHealthStore::instance().clear_all();
785
786        // With a single provider, marking as failed multiple times (to reach threshold) will pause it,
787        // but it can still be selected as a last resort
788        let configs = vec![RpcConfig {
789            url: "https://test-single-provider.example.com/rpc".to_string(),
790            weight: 1,
791            ..Default::default()
792        }];
793
794        // Create selector with threshold=1 for testing
795        let selector = RpcSelector::new(configs, 1, 60, 60).unwrap();
796        let _initial_url = selector.select_url().unwrap();
797
798        // Mark as failed once (with threshold=1, this will pause it)
799        selector.mark_current_as_failed();
800
801        // With threshold=1, available_provider_count should be 0
802        assert_eq!(selector.available_provider_count(), 0);
803
804        // But select_url should still work (selecting paused provider as last resort)
805        let next_url = selector.select_url();
806        assert!(next_url.is_ok());
807        assert_eq!(
808            next_url.unwrap(),
809            "https://test-single-provider.example.com/rpc"
810        );
811    }
812
813    #[test]
814    #[serial]
815    fn test_mark_current_as_failed_multiple_providers() {
816        // Clear health store to ensure clean state
817        RpcHealthStore::instance().clear_all();
818
819        // With multiple providers, marking as failed (with threshold=1) will pause them,
820        // but they can still be selected as a last resort
821        let configs = vec![
822            RpcConfig {
823                url: "https://test-multi1.example.com/rpc".to_string(),
824                weight: 5,
825                ..Default::default()
826            },
827            RpcConfig {
828                url: "https://test-multi2.example.com/rpc".to_string(),
829                weight: 5,
830                ..Default::default()
831            },
832            RpcConfig {
833                url: "https://test-multi3.example.com/rpc".to_string(),
834                weight: 5,
835                ..Default::default()
836            },
837        ];
838
839        // Create selector with threshold=1 for testing
840        let selector = RpcSelector::new(configs, 1, 60, 60).unwrap();
841
842        // Get the first URL
843        let url1 = selector.select_url().unwrap().to_string();
844
845        // Mark as failed (with threshold=1, this pauses it)
846        selector.mark_current_as_failed();
847        // Available count should decrease
848        assert_eq!(selector.available_provider_count(), 2);
849
850        // Next selection should prefer non-paused providers
851        let url2 = selector.select_url().unwrap().to_string();
852        // Should be different from the paused one
853        assert_ne!(url1, url2);
854
855        // Mark the second URL as failed too
856        selector.mark_current_as_failed();
857        assert_eq!(selector.available_provider_count(), 1);
858
859        let url3 = selector.select_url().unwrap().to_string();
860        // Should get the third URL (non-paused)
861        assert_ne!(url1, url3);
862        assert_ne!(url2, url3);
863
864        // Mark the third URL as failed too
865        selector.mark_current_as_failed();
866        assert_eq!(selector.available_provider_count(), 0);
867
868        // Now all URLs are paused, but select_url should still work (selecting paused providers as last resort)
869        let url4 = selector.select_url();
870        assert!(url4.is_ok());
871        // Should return one of the paused providers
872        let url4_str = url4.unwrap();
873        assert!(
874            url4_str == "https://test-multi1.example.com/rpc"
875                || url4_str == "https://test-multi2.example.com/rpc"
876                || url4_str == "https://test-multi3.example.com/rpc"
877        );
878    }
879
880    #[test]
881    #[serial]
882    fn test_mark_current_as_failed_weighted() {
883        // Clear health store to ensure clean state
884        RpcHealthStore::instance().clear_all();
885
886        // Test with weighted selection
887        let configs = vec![
888            RpcConfig {
889                url: "https://test-weighted1.example.com/rpc".to_string(),
890                weight: 1, // Low weight
891                ..Default::default()
892            },
893            RpcConfig {
894                url: "https://test-weighted2.example.com/rpc".to_string(),
895                weight: 10, // High weight
896                ..Default::default()
897            },
898        ];
899
900        // Create selector with threshold=1 for testing
901        let selector = RpcSelector::new(configs, 1, 60, 60).unwrap();
902        assert!(selector.weights_dist.is_some()); // Confirm we're using weighted selection
903
904        // Get a URL
905        let url1 = selector.select_url().unwrap().to_string();
906
907        // Mark it as failed (with threshold=1, this pauses it)
908        selector.mark_current_as_failed();
909        assert_eq!(selector.available_provider_count(), 1);
910
911        // Get another URL, it should prefer the non-paused one
912        let url2 = selector.select_url().unwrap().to_string();
913        assert_ne!(url1, url2);
914
915        // Mark this one as failed too
916        selector.mark_current_as_failed();
917        assert_eq!(selector.available_provider_count(), 0);
918
919        // With all providers paused, select_url should still work (selecting paused providers as last resort)
920        let url3 = selector.select_url();
921        assert!(url3.is_ok());
922        let url3_str = url3.unwrap();
923        assert!(
924            url3_str == "https://test-weighted1.example.com/rpc"
925                || url3_str == "https://test-weighted2.example.com/rpc"
926        );
927    }
928
929    #[test]
930    fn test_provider_count() {
931        // Test with no providers
932        let configs: Vec<RpcConfig> = vec![];
933        let result = RpcSelector::new_with_defaults(configs);
934        assert!(result.is_err());
935
936        // Test with a single provider
937        let configs = vec![RpcConfig {
938            url: "https://example.com/rpc".to_string(),
939            weight: 1,
940            ..Default::default()
941        }];
942        let selector = RpcSelector::new_with_defaults(configs).unwrap();
943        assert_eq!(selector.provider_count(), 1);
944
945        // Test with multiple providers
946        let configs = vec![
947            RpcConfig {
948                url: "https://example1.com/rpc".to_string(),
949                weight: 1,
950                ..Default::default()
951            },
952            RpcConfig {
953                url: "https://example2.com/rpc".to_string(),
954                weight: 2,
955                ..Default::default()
956            },
957            RpcConfig {
958                url: "https://example3.com/rpc".to_string(),
959                weight: 3,
960                ..Default::default()
961            },
962        ];
963        let selector = RpcSelector::new_with_defaults(configs).unwrap();
964        assert_eq!(selector.provider_count(), 3);
965    }
966
967    #[test]
968    #[serial]
969    fn test_available_provider_count() {
970        // Clear health store to ensure clean state
971        RpcHealthStore::instance().clear_all();
972
973        let configs = vec![
974            RpcConfig {
975                url: "https://test-available1.example.com/rpc".to_string(),
976                weight: 1,
977                ..Default::default()
978            },
979            RpcConfig {
980                url: "https://test-available2.example.com/rpc".to_string(),
981                weight: 2,
982                ..Default::default()
983            },
984            RpcConfig {
985                url: "https://test-available3.example.com/rpc".to_string(),
986                weight: 3,
987                ..Default::default()
988            },
989        ];
990
991        // Create selector with threshold=1 for testing
992        let selector = RpcSelector::new(configs, 1, 60, 60).unwrap();
993        assert_eq!(selector.provider_count(), 3);
994        assert_eq!(selector.available_provider_count(), 3);
995
996        // Mark one provider as failed (with threshold=1, this pauses it)
997        selector.select_url().unwrap(); // Select a provider first
998        selector.mark_current_as_failed();
999        // Available count should decrease (only non-paused providers)
1000        assert_eq!(selector.available_provider_count(), 2);
1001
1002        // Mark another provider as failed
1003        selector.select_url().unwrap(); // Select another provider
1004        selector.mark_current_as_failed();
1005        assert_eq!(selector.available_provider_count(), 1);
1006    }
1007
1008    #[test]
1009    fn test_get_current_url() {
1010        let configs = vec![
1011            RpcConfig::new("https://example1.com/rpc".to_string()),
1012            RpcConfig::new("https://example2.com/rpc".to_string()),
1013        ];
1014
1015        let selector = RpcSelector::new_with_defaults(configs).unwrap();
1016
1017        // Should return a valid URL
1018        let url = selector.get_current_url();
1019        assert!(url.is_ok());
1020        let url_str = url.unwrap();
1021        assert!(
1022            url_str == "https://example1.com/rpc" || url_str == "https://example2.com/rpc",
1023            "Unexpected URL: {}",
1024            url_str
1025        );
1026    }
1027
1028    #[test]
1029    #[serial]
1030    fn test_concurrent_usage() {
1031        // Clear health store to ensure clean state
1032        RpcHealthStore::instance().clear_all();
1033
1034        // Test RpcSelector with concurrent access from multiple threads
1035        let configs = vec![
1036            RpcConfig::new("https://test-concurrent1.example.com/rpc".to_string()),
1037            RpcConfig::new("https://test-concurrent2.example.com/rpc".to_string()),
1038            RpcConfig::new("https://test-concurrent3.example.com/rpc".to_string()),
1039        ];
1040
1041        // Create selector with threshold=1 for testing
1042        let selector = RpcSelector::new(configs, 1, 60, 60).unwrap();
1043        let selector_arc = Arc::new(selector);
1044
1045        let mut handles = Vec::with_capacity(10);
1046
1047        // Launch 10 threads that select and mark providers
1048        for _ in 0..10 {
1049            let selector_clone = Arc::clone(&selector_arc);
1050            let handle = thread::spawn(move || {
1051                let url = selector_clone.select_url().unwrap().to_string();
1052                if url.contains("test-concurrent1") {
1053                    // Only mark example1 as failed
1054                    selector_clone.mark_current_as_failed();
1055                }
1056                url
1057            });
1058            handles.push(handle);
1059        }
1060
1061        // Collect results
1062        let mut urls = Vec::new();
1063        for handle in handles {
1064            urls.push(handle.join().unwrap());
1065        }
1066
1067        // Check that at least some threads got different URLs
1068        let unique_urls: std::collections::HashSet<String> = urls.into_iter().collect();
1069        assert!(unique_urls.len() > 1, "Expected multiple unique URLs");
1070
1071        // After all threads, example1 should be marked as failed (paused)
1072        // Selections should prefer non-paused providers
1073        let mut found_non_example1 = false;
1074        for _ in 0..10 {
1075            let url = selector_arc.select_url().unwrap().to_string();
1076            if !url.contains("test-concurrent1") {
1077                found_non_example1 = true;
1078            }
1079        }
1080
1081        assert!(found_non_example1, "Should prefer non-paused providers");
1082    }
1083
1084    #[test]
1085    fn test_consecutive_mark_as_failed() {
1086        let configs = vec![
1087            RpcConfig::new("https://example1.com/rpc".to_string()),
1088            RpcConfig::new("https://example2.com/rpc".to_string()),
1089        ];
1090
1091        let selector = RpcSelector::new_with_defaults(configs).unwrap();
1092
1093        // First call to select a provider
1094        selector.select_url().unwrap();
1095
1096        // Mark as failed twice consecutively without selecting in between
1097        selector.mark_current_as_failed();
1098        selector.mark_current_as_failed(); // This should be a no-op since has_current is now 0
1099
1100        // We should still be able to select a provider (since only one was marked failed)
1101        let result = selector.select_url();
1102        assert!(result.is_ok());
1103    }
1104
1105    #[test]
1106    #[serial]
1107    fn test_weighted_to_round_robin_fallback() {
1108        // Clear health store to ensure clean state
1109        RpcHealthStore::instance().clear_all();
1110
1111        let configs = vec![
1112            RpcConfig {
1113                url: "https://test-wrr1.example.com/rpc".to_string(),
1114                weight: 10, // High weight
1115                ..Default::default()
1116            },
1117            RpcConfig {
1118                url: "https://test-wrr2.example.com/rpc".to_string(),
1119                weight: 1, // Low weight
1120                ..Default::default()
1121            },
1122            RpcConfig {
1123                url: "https://test-wrr3.example.com/rpc".to_string(),
1124                weight: 1, // Low weight
1125                ..Default::default()
1126            },
1127        ];
1128
1129        // Create selector with threshold=1 for testing
1130        let selector = RpcSelector::new(configs, 1, 60, 60).unwrap();
1131        assert!(selector.weights_dist.is_some()); // Using weighted selection
1132
1133        // Mock a situation where weighted selection would fail multiple times
1134        // by marking the high-weight provider as failed
1135        let mut selected_first = false;
1136
1137        // Try multiple times - the first provider should be selected more often due to weight
1138        for _ in 0..10 {
1139            let url = selector.select_url().unwrap();
1140            if url.contains("test-wrr1") {
1141                selected_first = true;
1142                // Mark the high-weight provider as failed (pauses it)
1143                selector.mark_current_as_failed();
1144                break;
1145            }
1146        }
1147
1148        assert!(
1149            selected_first,
1150            "High-weight provider should have been selected"
1151        );
1152
1153        // After marking it failed (paused), selections should prefer the other providers (non-paused)
1154        let mut seen_urls = HashSet::new();
1155        for _ in 0..10 {
1156            let url = selector.select_url().unwrap().to_string();
1157            seen_urls.insert(url);
1158        }
1159
1160        // Should have seen at least example2 and example3 (non-paused providers)
1161        assert!(seen_urls.len() >= 2);
1162        assert!(
1163            !seen_urls.iter().any(|url| url.contains("test-wrr1")),
1164            "Paused provider should not be selected (prefer non-paused)"
1165        );
1166    }
1167
1168    #[test]
1169    fn test_zero_weight_providers() {
1170        let configs = vec![
1171            RpcConfig {
1172                url: "https://example1.com/rpc".to_string(),
1173                weight: 0, // Zero weight
1174                ..Default::default()
1175            },
1176            RpcConfig {
1177                url: "https://example2.com/rpc".to_string(),
1178                weight: 5, // Normal weight
1179                ..Default::default()
1180            },
1181        ];
1182
1183        let selector = RpcSelector::new_with_defaults(configs).unwrap();
1184
1185        // With weighted selection, should never select the zero-weight provider
1186        let mut seen_urls = HashSet::new();
1187        for _ in 0..10 {
1188            let url = selector.select_url().unwrap().to_string();
1189            seen_urls.insert(url);
1190        }
1191
1192        assert_eq!(seen_urls.len(), 1);
1193        assert!(
1194            seen_urls.iter().next().unwrap().contains("example2"),
1195            "Only the non-zero weight provider should be selected"
1196        );
1197    }
1198
1199    #[test]
1200    #[serial]
1201    fn test_extreme_weight_differences() {
1202        let configs = vec![
1203            RpcConfig {
1204                url: "https://example1.com/rpc".to_string(),
1205                weight: 100, // Very high weight
1206                ..Default::default()
1207            },
1208            RpcConfig {
1209                url: "https://example2.com/rpc".to_string(),
1210                weight: 1, // Very low weight
1211                ..Default::default()
1212            },
1213        ];
1214
1215        let selector = RpcSelector::new_with_defaults(configs).unwrap();
1216
1217        // High weight provider should be selected much more frequently
1218        let mut count_high = 0;
1219
1220        for _ in 0..100 {
1221            let url = selector.select_url().unwrap().to_string();
1222            if url.contains("example1") {
1223                count_high += 1;
1224            }
1225
1226            // Reset to clear current selection
1227            selector.has_current.store(false, Ordering::Relaxed);
1228        }
1229
1230        // High-weight provider should be selected at least 90% of the time
1231        assert!(
1232            count_high > 90,
1233            "High-weight provider selected only {}/{} times",
1234            count_high,
1235            100
1236        );
1237    }
1238
1239    #[test]
1240    fn test_mark_unselected_as_failed() {
1241        let configs = vec![
1242            RpcConfig::new("https://example1.com/rpc".to_string()),
1243            RpcConfig::new("https://example2.com/rpc".to_string()),
1244        ];
1245
1246        let selector = RpcSelector::new_with_defaults(configs).unwrap();
1247
1248        // Without selecting, mark as failed (should be a no-op)
1249        selector.mark_current_as_failed();
1250
1251        // Should still be able to select both providers
1252        let mut seen_urls = HashSet::new();
1253        for _ in 0..10 {
1254            let url = selector.select_url().unwrap().to_string();
1255            seen_urls.insert(url);
1256
1257            // Reset for next iteration
1258            selector.has_current.store(false, Ordering::Relaxed);
1259        }
1260
1261        assert_eq!(
1262            seen_urls.len(),
1263            2,
1264            "Both providers should still be available"
1265        );
1266    }
1267
1268    #[test]
1269    fn test_rpc_selector_error_serialization() {
1270        let error = RpcSelectorError::NoProviders;
1271        let json = serde_json::to_string(&error).unwrap();
1272        assert!(json.contains("NoProviders"));
1273
1274        let error = RpcSelectorError::ClientInitializationError("test error".to_string());
1275        let json = serde_json::to_string(&error).unwrap();
1276        assert!(json.contains("ClientInitializationError"));
1277        assert!(json.contains("test error"));
1278
1279        let error = RpcSelectorError::WeightedIndexError("index error".to_string());
1280        let json = serde_json::to_string(&error).unwrap();
1281        assert!(json.contains("WeightedIndexError"));
1282        assert!(json.contains("index error"));
1283
1284        let error = RpcSelectorError::AllProvidersFailed;
1285        let json = serde_json::to_string(&error).unwrap();
1286        assert!(json.contains("AllProvidersFailed"));
1287    }
1288
1289    #[cfg(test)]
1290    mod rate_limiting_tests {
1291        use super::*;
1292        use crate::services::provider::rpc_health_store::RpcHealthStore;
1293
1294        /// Test that RpcSelector switches to the second RPC when the first one is rate-limited.
1295        ///
1296        /// This test simulates a scenario where:
1297        /// 1. Two RPC configs are set up with equal weights
1298        /// 2. The first RPC starts returning rate limit errors (429)
1299        /// 3. The selector should switch to the second RPC
1300        /// 4. The first RPC should be marked as failed and excluded from selection
1301        #[test]
1302        #[serial]
1303        fn test_rpc_selector_switches_on_rate_limit() {
1304            RpcHealthStore::instance().clear_all();
1305            let configs = vec![
1306                RpcConfig {
1307                    url: "https://test-rate-limit1.example.com".to_string(),
1308                    weight: 100,
1309                    ..Default::default()
1310                },
1311                RpcConfig {
1312                    url: "https://test-rate-limit2.example.com".to_string(),
1313                    weight: 100,
1314                    ..Default::default()
1315                },
1316            ];
1317
1318            // Create selector with threshold=1 for testing
1319            let selector = RpcSelector::new(configs, 1, 60, 60).unwrap();
1320
1321            // Initially, both providers should be available
1322            assert_eq!(selector.available_provider_count(), 2);
1323
1324            // Select the first provider
1325            let first_url = selector.select_url().unwrap();
1326
1327            // Verify we got a valid URL
1328            assert!(
1329                first_url == "https://test-rate-limit1.example.com"
1330                    || first_url == "https://test-rate-limit2.example.com"
1331            );
1332
1333            // Simulate rate limiting: mark the current provider as failed
1334            // This simulates what happens when a provider returns HTTP 429 after retries are exhausted
1335            selector.mark_current_as_failed();
1336
1337            // Now only one provider should be available (non-paused)
1338            assert_eq!(selector.available_provider_count(), 1);
1339
1340            // The next selection should prefer the non-paused provider
1341            let second_url = selector.select_url().unwrap();
1342
1343            // Verify we got a different URL (the non-paused one)
1344            assert_ne!(first_url, second_url);
1345
1346            // Verify the failed provider is not selected again (prefer non-paused)
1347            let third_url = selector.select_url().unwrap();
1348            assert_eq!(second_url, third_url); // Should keep using the working provider
1349
1350            // Verify the failed provider is excluded from preferred selection
1351            let mut selected_urls = std::collections::HashSet::new();
1352            for _ in 0..10 {
1353                let url = selector.select_url().unwrap();
1354                selected_urls.insert(url.to_string());
1355            }
1356
1357            // Should only select from the non-failed provider (preferred)
1358            assert_eq!(selected_urls.len(), 1);
1359            assert!(!selected_urls.contains(&first_url.to_string()));
1360            assert!(selected_urls.contains(&second_url.to_string()));
1361        }
1362
1363        /// Test that RpcSelector handles rate limiting with weighted selection.
1364        ///
1365        /// This test verifies that even with weighted selection, a rate-limited provider
1366        /// is excluded and the selector falls back to the other provider.
1367        #[test]
1368        #[serial]
1369        fn test_rpc_selector_rate_limit_with_weighted_selection() {
1370            RpcHealthStore::instance().clear_all();
1371            let configs = vec![
1372                RpcConfig {
1373                    url: "https://test-weighted-rl1.example.com".to_string(),
1374                    weight: 80, // Higher weight, should be preferred
1375                    ..Default::default()
1376                },
1377                RpcConfig {
1378                    url: "https://test-weighted-rl2.example.com".to_string(),
1379                    weight: 20, // Lower weight
1380                    ..Default::default()
1381                },
1382            ];
1383
1384            // Create selector with threshold=1 for testing
1385            let selector = RpcSelector::new(configs, 1, 60, 60).unwrap();
1386
1387            // Select multiple times - with weighted selection, rpc1 should be selected more often
1388            let mut rpc1_count = 0;
1389            let mut rpc2_count = 0;
1390
1391            for _ in 0..20 {
1392                let url = selector.select_url().unwrap();
1393                if url == "https://test-weighted-rl1.example.com" {
1394                    rpc1_count += 1;
1395                } else {
1396                    rpc2_count += 1;
1397                }
1398            }
1399
1400            // With weighted selection, rpc1 should be selected more often
1401            assert!(rpc1_count > rpc2_count);
1402
1403            // Now simulate rate limiting on rpc1
1404            // First, select rpc1
1405            let mut selected_rpc1 = false;
1406            for _ in 0..10 {
1407                let url = selector.select_url().unwrap();
1408                if url == "https://test-weighted-rl1.example.com" {
1409                    selector.mark_current_as_failed();
1410                    selected_rpc1 = true;
1411                    break;
1412                }
1413            }
1414            assert!(selected_rpc1, "Should have selected rpc1 at least once");
1415
1416            // After marking rpc1 as failed (paused), selections should prefer rpc2 (non-paused)
1417            for _ in 0..20 {
1418                let url = selector.select_url().unwrap();
1419                assert_eq!(url, "https://test-weighted-rl2.example.com");
1420            }
1421
1422            // Verify only one provider is available (non-paused)
1423            assert_eq!(selector.available_provider_count(), 1);
1424        }
1425
1426        /// Test that a rate-limited provider stays failed.
1427        ///
1428        /// This test verifies that failed providers remain failed,
1429        /// which simulates persistence of health state.
1430        #[test]
1431        #[serial]
1432        fn test_rpc_selector_rate_limit_recovery() {
1433            RpcHealthStore::instance().clear_all();
1434            let configs = vec![
1435                RpcConfig {
1436                    url: "https://test-recovery1.example.com".to_string(),
1437                    weight: 100,
1438                    ..Default::default()
1439                },
1440                RpcConfig {
1441                    url: "https://test-recovery2.example.com".to_string(),
1442                    weight: 100,
1443                    ..Default::default()
1444                },
1445            ];
1446
1447            // Create selector with threshold=1 for testing
1448            let selector = RpcSelector::new(configs, 1, 60, 60).unwrap();
1449
1450            // Select first provider
1451            let first_url = selector.select_url().unwrap();
1452
1453            // Mark it as failed (simulating rate limit, with threshold=1 this pauses it)
1454            selector.mark_current_as_failed();
1455            assert_eq!(selector.available_provider_count(), 1);
1456
1457            // Next selection should prefer the other provider (non-paused)
1458            let second_url = selector.select_url().unwrap();
1459            assert_ne!(first_url, second_url);
1460
1461            // Verify only the working provider is selected (prefer non-paused)
1462            for _ in 0..10 {
1463                let url = selector.select_url().unwrap();
1464                assert_eq!(url, second_url);
1465            }
1466
1467            // Since we persist health, the failed provider stays failed (paused)
1468            assert_eq!(selector.available_provider_count(), 1);
1469        }
1470
1471        /// Test that when both providers are rate-limited, the selector handles it gracefully.
1472        #[test]
1473        #[serial]
1474        fn test_rpc_selector_both_providers_rate_limited() {
1475            RpcHealthStore::instance().clear_all();
1476            let configs = vec![
1477                RpcConfig {
1478                    url: "https://test-both-rl1.example.com".to_string(),
1479                    weight: 100,
1480                    ..Default::default()
1481                },
1482                RpcConfig {
1483                    url: "https://test-both-rl2.example.com".to_string(),
1484                    weight: 100,
1485                    ..Default::default()
1486                },
1487            ];
1488
1489            // Create selector with threshold=1 for testing
1490            let selector = RpcSelector::new(configs, 1, 60, 60).unwrap();
1491
1492            // Select and mark first provider as failed (pauses it)
1493            selector.select_url().unwrap();
1494            selector.mark_current_as_failed();
1495            assert_eq!(selector.available_provider_count(), 1);
1496
1497            // Select and mark second provider as failed (pauses it)
1498            selector.select_url().unwrap();
1499            selector.mark_current_as_failed();
1500            assert_eq!(selector.available_provider_count(), 0);
1501
1502            // Now all providers are paused, but select_url should still work (selecting paused providers as last resort)
1503            let result = selector.select_url();
1504            assert!(result.is_ok());
1505            let url = result.unwrap();
1506            assert!(
1507                url == "https://test-both-rl1.example.com"
1508                    || url == "https://test-both-rl2.example.com"
1509            );
1510        }
1511
1512        /// Test that rate limiting works correctly with round-robin fallback.
1513        ///
1514        /// This test verifies that when weighted selection fails due to rate limiting,
1515        /// the selector correctly falls back to round-robin selection.
1516        #[test]
1517        #[serial]
1518        fn test_rpc_selector_rate_limit_round_robin_fallback() {
1519            RpcHealthStore::instance().clear_all();
1520            let configs = vec![
1521                RpcConfig {
1522                    url: "https://test-rr-fallback1.example.com".to_string(),
1523                    weight: 100,
1524                    ..Default::default()
1525                },
1526                RpcConfig {
1527                    url: "https://test-rr-fallback2.example.com".to_string(),
1528                    weight: 100,
1529                    ..Default::default()
1530                },
1531                RpcConfig {
1532                    url: "https://test-rr-fallback3.example.com".to_string(),
1533                    weight: 100,
1534                    ..Default::default()
1535                },
1536            ];
1537
1538            // Create selector with threshold=1 for testing
1539            let selector = RpcSelector::new(configs, 1, 60, 60).unwrap();
1540
1541            // Mark rpc1 as failed (simulating rate limit)
1542            selector.select_url().unwrap();
1543            let first_url = selector.get_current_url().unwrap();
1544
1545            // If we got rpc1, mark it as failed
1546            if first_url == "https://test-rr-fallback1.example.com" {
1547                selector.mark_current_as_failed();
1548            } else {
1549                // Otherwise, select until we get rpc1, then mark it as failed
1550                loop {
1551                    let url = selector.select_url().unwrap();
1552                    if url == "https://test-rr-fallback1.example.com" {
1553                        selector.mark_current_as_failed();
1554                        break;
1555                    }
1556                }
1557            }
1558
1559            // Now rpc1 should be paused, and selections should prefer rpc2 and rpc3 (non-paused)
1560            let mut selected_urls = std::collections::HashSet::new();
1561            for _ in 0..20 {
1562                let url = selector.select_url().unwrap();
1563                selected_urls.insert(url.to_string());
1564                // rpc1 should not be selected (prefer non-paused)
1565                assert_ne!(url, "https://test-rr-fallback1.example.com");
1566            }
1567
1568            // Should have selected from both rpc2 and rpc3
1569            assert!(selected_urls.contains("https://test-rr-fallback2.example.com"));
1570            assert!(selected_urls.contains("https://test-rr-fallback3.example.com"));
1571            assert_eq!(selected_urls.len(), 2);
1572        }
1573
1574        #[test]
1575        #[serial]
1576        fn test_select_url_excludes_tried_providers() {
1577            RpcHealthStore::instance().clear_all();
1578            let configs = vec![
1579                RpcConfig {
1580                    url: "https://provider1.com".to_string(),
1581                    weight: 1,
1582                    ..Default::default()
1583                },
1584                RpcConfig {
1585                    url: "https://provider2.com".to_string(),
1586                    weight: 1,
1587                    ..Default::default()
1588                },
1589                RpcConfig {
1590                    url: "https://provider3.com".to_string(),
1591                    weight: 1,
1592                    ..Default::default()
1593                },
1594            ];
1595
1596            let selector = RpcSelector::new_with_defaults(configs).unwrap();
1597
1598            // Exclude provider1
1599            let mut excluded = std::collections::HashSet::new();
1600            excluded.insert("https://provider1.com".to_string());
1601
1602            // Should select provider2 or provider3, not provider1
1603            for _ in 0..10 {
1604                let url = selector.get_next_url(&excluded).unwrap();
1605                assert_ne!(url, "https://provider1.com");
1606            }
1607        }
1608
1609        #[test]
1610        #[serial]
1611        fn test_select_url_fallback_to_paused_providers() {
1612            RpcHealthStore::instance().clear_all();
1613            let configs = vec![
1614                RpcConfig {
1615                    url: "https://provider1.com".to_string(),
1616                    weight: 1,
1617                    ..Default::default()
1618                },
1619                RpcConfig {
1620                    url: "https://provider2.com".to_string(),
1621                    weight: 1,
1622                    ..Default::default()
1623                },
1624            ];
1625
1626            let selector = RpcSelector::new_with_defaults(configs).unwrap();
1627            let health_store = RpcHealthStore::instance();
1628            let expiration = chrono::Duration::seconds(60);
1629
1630            // Pause both providers
1631            health_store.mark_failed(
1632                "https://provider1.com",
1633                3,
1634                chrono::Duration::seconds(60),
1635                expiration,
1636            );
1637            health_store.mark_failed(
1638                "https://provider1.com",
1639                3,
1640                chrono::Duration::seconds(60),
1641                expiration,
1642            );
1643            health_store.mark_failed(
1644                "https://provider1.com",
1645                3,
1646                chrono::Duration::seconds(60),
1647                expiration,
1648            );
1649
1650            health_store.mark_failed(
1651                "https://provider2.com",
1652                3,
1653                chrono::Duration::seconds(60),
1654                expiration,
1655            );
1656            health_store.mark_failed(
1657                "https://provider2.com",
1658                3,
1659                chrono::Duration::seconds(60),
1660                expiration,
1661            );
1662            health_store.mark_failed(
1663                "https://provider2.com",
1664                3,
1665                chrono::Duration::seconds(60),
1666                expiration,
1667            );
1668
1669            // Both should be paused
1670            assert!(health_store.is_paused("https://provider1.com", 3, expiration));
1671            assert!(health_store.is_paused("https://provider2.com", 3, expiration));
1672
1673            // Should still be able to select (fallback to paused providers)
1674            let url = selector
1675                .get_next_url(&std::collections::HashSet::new())
1676                .unwrap();
1677            assert!(url == "https://provider1.com" || url == "https://provider2.com");
1678        }
1679
1680        #[test]
1681        #[serial]
1682        fn test_select_url_single_provider_excluded() {
1683            RpcHealthStore::instance().clear_all();
1684            let configs = vec![RpcConfig {
1685                url: "https://single-provider.com".to_string(),
1686                weight: 1,
1687                ..Default::default()
1688            }];
1689
1690            let selector = RpcSelector::new_with_defaults(configs).unwrap();
1691
1692            // Exclude the only provider
1693            let mut excluded = std::collections::HashSet::new();
1694            excluded.insert("https://single-provider.com".to_string());
1695
1696            // Should return error
1697            let result = selector.get_next_url(&excluded);
1698            assert!(result.is_err());
1699            assert!(matches!(
1700                result.unwrap_err(),
1701                RpcSelectorError::AllProvidersFailed
1702            ));
1703        }
1704
1705        #[test]
1706        #[serial]
1707        fn test_select_url_all_providers_excluded() {
1708            RpcHealthStore::instance().clear_all();
1709            let configs = vec![
1710                RpcConfig {
1711                    url: "https://provider1.com".to_string(),
1712                    weight: 1,
1713                    ..Default::default()
1714                },
1715                RpcConfig {
1716                    url: "https://provider2.com".to_string(),
1717                    weight: 1,
1718                    ..Default::default()
1719                },
1720            ];
1721
1722            let selector = RpcSelector::new_with_defaults(configs).unwrap();
1723
1724            // Exclude all providers
1725            let mut excluded = std::collections::HashSet::new();
1726            excluded.insert("https://provider1.com".to_string());
1727            excluded.insert("https://provider2.com".to_string());
1728
1729            // Should return error
1730            let result = selector.get_next_url(&excluded);
1731            assert!(result.is_err());
1732            assert!(matches!(
1733                result.unwrap_err(),
1734                RpcSelectorError::AllProvidersFailed
1735            ));
1736        }
1737
1738        #[test]
1739        #[serial]
1740        fn test_select_url_excluded_providers_with_weighted_selection() {
1741            RpcHealthStore::instance().clear_all();
1742            let configs = vec![
1743                RpcConfig {
1744                    url: "https://provider1.com".to_string(),
1745                    weight: 10,
1746                    ..Default::default()
1747                },
1748                RpcConfig {
1749                    url: "https://provider2.com".to_string(),
1750                    weight: 1,
1751                    ..Default::default()
1752                },
1753                RpcConfig {
1754                    url: "https://provider3.com".to_string(),
1755                    weight: 1,
1756                    ..Default::default()
1757                },
1758            ];
1759
1760            let selector = RpcSelector::new_with_defaults(configs).unwrap();
1761
1762            // Exclude provider1 (highest weight)
1763            let mut excluded = std::collections::HashSet::new();
1764            excluded.insert("https://provider1.com".to_string());
1765
1766            // Should select from provider2 or provider3, never provider1
1767            for _ in 0..20 {
1768                let url = selector.get_next_url(&excluded).unwrap();
1769                assert_ne!(url, "https://provider1.com");
1770            }
1771        }
1772    }
1773}