1use std::collections::HashSet;
13use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
14use std::sync::Arc;
15
16use eyre::Result;
17use parking_lot::RwLock;
18use rand::distr::weighted::WeightedIndex;
19use rand::prelude::*;
20use serde::Serialize;
21use thiserror::Error;
22use tracing::info;
23
24use crate::models::RpcConfig;
25use crate::services::provider::rpc_health_store::RpcHealthStore;
26
27#[derive(Error, Debug, Serialize)]
28pub enum RpcSelectorError {
29 #[error("No providers available")]
30 NoProviders,
31 #[error("Client initialization failed: {0}")]
32 ClientInitializationError(String),
33 #[error("Weighted index error: {0}")]
34 WeightedIndexError(String),
35 #[error("All available providers have failed")]
36 AllProvidersFailed,
37}
38
39#[derive(Debug)]
41pub struct RpcSelector {
42 configs: Arc<RwLock<Vec<RpcConfig>>>,
44 weights_dist: Option<Arc<WeightedIndex<u8>>>,
46 next_index: Arc<AtomicUsize>,
48 current_index: Arc<AtomicUsize>,
50 has_current: Arc<AtomicBool>,
52 failure_threshold: u32,
54 pause_duration_secs: u64,
56 failure_expiration_secs: u64,
58}
59
60impl RpcSelector {
61 pub fn new(
80 configs: Vec<RpcConfig>,
81 failure_threshold: u32,
82 pause_duration_secs: u64,
83 failure_expiration_secs: u64,
84 ) -> Result<Self, RpcSelectorError> {
85 if configs.is_empty() {
86 return Err(RpcSelectorError::NoProviders);
87 }
88
89 let weights_dist = Self::create_weights_distribution(&configs, &HashSet::new());
91
92 let selector = Self {
93 configs: Arc::new(RwLock::new(configs)),
94 weights_dist,
95 next_index: Arc::new(AtomicUsize::new(0)),
96 current_index: Arc::new(AtomicUsize::new(0)),
97 has_current: Arc::new(AtomicBool::new(false)), failure_threshold,
99 pause_duration_secs,
100 failure_expiration_secs,
101 };
102
103 let mut rng = rand::rng();
105 selector.next_index.store(
106 rng.random_range(0..selector.configs.read().len()),
107 Ordering::Relaxed,
108 );
109
110 Ok(selector)
111 }
112
113 pub fn new_with_defaults(configs: Vec<RpcConfig>) -> Result<Self, RpcSelectorError> {
124 Self::new(
125 configs,
126 crate::config::ServerConfig::get_provider_failure_threshold(),
127 crate::config::ServerConfig::get_provider_pause_duration_secs(),
128 crate::config::ServerConfig::get_provider_failure_expiration_secs(),
129 )
130 }
131
132 pub fn provider_count(&self) -> usize {
137 self.configs.read().len()
138 }
139
140 pub fn available_provider_count(&self) -> usize {
145 let health_store = RpcHealthStore::instance();
146 let expiration = chrono::Duration::seconds(self.failure_expiration_secs as i64);
147 self.configs
148 .read()
149 .iter()
150 .filter(|c| !health_store.is_paused(&c.url, self.failure_threshold, expiration))
151 .count()
152 }
153
154 pub fn get_configs(&self) -> Vec<RpcConfig> {
159 self.configs.read().clone()
160 }
161
162 pub fn mark_current_as_failed(&self) {
167 info!("Marking current provider as failed");
168 if self.has_current.load(Ordering::Relaxed) {
170 let current = self.current_index.load(Ordering::Relaxed);
171 let configs = self.configs.read();
172 let config = &configs[current];
173
174 let health_store = RpcHealthStore::instance();
176 use chrono::Duration;
177 health_store.mark_failed(
178 &config.url,
179 self.failure_threshold,
180 Duration::seconds(self.pause_duration_secs as i64),
181 Duration::seconds(self.failure_expiration_secs as i64),
182 );
183
184 self.has_current.store(false, Ordering::Relaxed);
186
187 if configs.len() > 1 {
189 self.next_index.fetch_add(1, Ordering::Relaxed);
190 }
191 }
192 }
193
194 fn create_weights_distribution(
203 configs: &[RpcConfig],
204 excluded_indices: &HashSet<usize>,
205 ) -> Option<Arc<WeightedIndex<u8>>> {
206 let weights: Vec<u8> = configs
208 .iter()
209 .enumerate()
210 .map(|(idx, config)| {
211 if excluded_indices.contains(&idx) {
212 0
213 } else {
214 config.get_weight()
215 }
216 })
217 .collect();
218
219 let available_count = weights.iter().filter(|&&w| w > 0).count();
221 if available_count == 0 {
222 return None;
223 }
224
225 let first_non_zero_weight = weights.iter().find(|&&w| w > 0).copied();
226 if let Some(first_weight) = first_non_zero_weight {
227 let all_equal = weights
229 .iter()
230 .filter(|&&w| w > 0)
231 .all(|&w| w == first_weight);
232
233 if all_equal {
234 return None;
235 }
236 }
237
238 match WeightedIndex::new(&weights) {
240 Ok(dist) => Some(Arc::new(dist)),
241 Err(_) => None,
242 }
243 }
244
245 fn try_weighted_selection(
257 &self,
258 configs: &[RpcConfig],
259 excluded_urls: &std::collections::HashSet<String>,
260 allow_paused: bool,
261 health_store: &RpcHealthStore,
262 expiration: chrono::Duration,
263 ) -> Option<(usize, String)> {
264 let dist = self.weights_dist.as_ref()?;
265 let mut rng = rand::rng();
266
267 const MAX_ATTEMPTS: usize = 10;
268 for _ in 0..MAX_ATTEMPTS {
269 let index = dist.sample(&mut rng);
270 if configs[index].get_weight() == 0 {
272 continue;
273 }
274 if excluded_urls.contains(&configs[index].url) {
276 continue;
277 }
278 if !allow_paused
280 && health_store.is_paused(&configs[index].url, self.failure_threshold, expiration)
281 {
282 continue;
283 }
284 self.current_index.store(index, Ordering::Relaxed);
286 self.has_current.store(true, Ordering::Relaxed);
287 return Some((index, configs[index].url.clone()));
288 }
289 None
290 }
291
292 fn try_round_robin_selection(
305 &self,
306 configs: &[RpcConfig],
307 excluded_urls: &std::collections::HashSet<String>,
308 allow_paused: bool,
309 health_store: &RpcHealthStore,
310 expiration: chrono::Duration,
311 start_index: usize,
312 ) -> Option<(usize, String)> {
313 let len = configs.len();
314 for i in 0..len {
315 let index = (start_index + i) % len;
316 if configs[index].get_weight() == 0 {
318 continue;
319 }
320 if excluded_urls.contains(&configs[index].url) {
322 continue;
323 }
324 if !allow_paused
326 && health_store.is_paused(&configs[index].url, self.failure_threshold, expiration)
327 {
328 continue;
329 }
330 self.next_index.store((index + 1) % len, Ordering::Relaxed);
333 self.current_index.store(index, Ordering::Relaxed);
334 self.has_current.store(true, Ordering::Relaxed);
335 return Some((index, configs[index].url.clone()));
336 }
337 None
338 }
339
340 fn select_url_internal(
349 &self,
350 excluded_urls: &std::collections::HashSet<String>,
351 ) -> Result<String, RpcSelectorError> {
352 let configs = self.configs.read();
353 if configs.is_empty() {
354 return Err(RpcSelectorError::NoProviders);
355 }
356
357 let health_store = RpcHealthStore::instance();
358 let expiration = chrono::Duration::seconds(self.failure_expiration_secs as i64);
359
360 if configs.len() == 1 {
362 if configs[0].get_weight() == 0 {
364 return Err(RpcSelectorError::AllProvidersFailed);
365 }
366 if excluded_urls.contains(&configs[0].url) {
368 return Err(RpcSelectorError::AllProvidersFailed);
369 }
370 self.current_index.store(0, Ordering::Relaxed);
372 self.has_current.store(true, Ordering::Relaxed);
373 return Ok(configs[0].url.clone());
374 }
375
376 if let Some((_, url)) = self.try_weighted_selection(
379 &configs,
380 excluded_urls,
381 false, health_store,
383 expiration,
384 ) {
385 return Ok(url);
386 }
387
388 let start_index = self.next_index.load(Ordering::Relaxed) % configs.len();
390 if let Some((_, url)) = self.try_round_robin_selection(
391 &configs,
392 excluded_urls,
393 false, health_store,
395 expiration,
396 start_index,
397 ) {
398 return Ok(url);
399 }
400
401 tracing::warn!(
404 "No non-paused providers available, falling back to paused providers as last resort"
405 );
406
407 if let Some((_, url)) = self.try_weighted_selection(
409 &configs,
410 excluded_urls,
411 true, health_store,
413 expiration,
414 ) {
415 return Ok(url);
416 }
417
418 if let Some((_, url)) = self.try_round_robin_selection(
420 &configs,
421 excluded_urls,
422 true, health_store,
424 expiration,
425 start_index,
426 ) {
427 return Ok(url);
428 }
429
430 Err(RpcSelectorError::AllProvidersFailed)
432 }
433
434 pub fn get_current_url(&self) -> Result<String, RpcSelectorError> {
439 self.select_url_internal(&std::collections::HashSet::new())
440 }
441
442 pub fn get_next_url(
450 &self,
451 excluded_urls: &std::collections::HashSet<String>,
452 ) -> Result<String, RpcSelectorError> {
453 self.select_url_internal(excluded_urls)
454 }
455
456 #[cfg(test)]
459 pub fn select_url(&self) -> Result<String, RpcSelectorError> {
460 self.select_url_internal(&std::collections::HashSet::new())
461 }
462
463 pub fn get_client<T>(
472 &self,
473 initializer: impl Fn(&str) -> Result<T>,
474 excluded_urls: &std::collections::HashSet<String>,
475 ) -> Result<T, RpcSelectorError> {
476 let url = self.select_url_internal(excluded_urls)?;
477
478 initializer(&url).map_err(|e| RpcSelectorError::ClientInitializationError(e.to_string()))
479 }
480}
481
482impl Clone for RpcSelector {
484 fn clone(&self) -> Self {
485 Self {
486 configs: Arc::new(RwLock::new(self.configs.read().clone())),
487 weights_dist: self.weights_dist.clone(),
488 next_index: Arc::clone(&self.next_index),
489 current_index: Arc::clone(&self.current_index),
490 has_current: Arc::clone(&self.has_current),
491 failure_threshold: self.failure_threshold,
492 pause_duration_secs: self.pause_duration_secs,
493 failure_expiration_secs: self.failure_expiration_secs,
494 }
495 }
496}
497
498#[cfg(test)]
499mod tests {
500 use super::*;
501 use crate::services::provider::rpc_health_store::RpcHealthStore;
502 use serial_test::serial;
503 use std::sync::Arc;
504 use std::thread;
505
506 #[test]
507 fn test_create_weights_distribution_single_config() {
508 let configs = vec![RpcConfig {
509 url: "https://example.com/rpc".to_string(),
510 weight: 1,
511 ..Default::default()
512 }];
513
514 let excluded = HashSet::new();
515 let result = RpcSelector::create_weights_distribution(&configs, &excluded);
516 assert!(result.is_none());
517 }
518
519 #[test]
520 fn test_create_weights_distribution_equal_weights() {
521 let configs = vec![
522 RpcConfig {
523 url: "https://example1.com/rpc".to_string(),
524 weight: 5,
525 ..Default::default()
526 },
527 RpcConfig {
528 url: "https://example2.com/rpc".to_string(),
529 weight: 5,
530 ..Default::default()
531 },
532 RpcConfig {
533 url: "https://example3.com/rpc".to_string(),
534 weight: 5,
535 ..Default::default()
536 },
537 ];
538
539 let excluded = HashSet::new();
540 let result = RpcSelector::create_weights_distribution(&configs, &excluded);
541 assert!(result.is_none());
542 }
543
544 #[test]
545 fn test_create_weights_distribution_different_weights() {
546 let configs = vec![
547 RpcConfig {
548 url: "https://example1.com/rpc".to_string(),
549 weight: 1,
550 ..Default::default()
551 },
552 RpcConfig {
553 url: "https://example2.com/rpc".to_string(),
554 weight: 2,
555 ..Default::default()
556 },
557 RpcConfig {
558 url: "https://example3.com/rpc".to_string(),
559 weight: 3,
560 ..Default::default()
561 },
562 ];
563
564 let excluded = HashSet::new();
565 let result = RpcSelector::create_weights_distribution(&configs, &excluded);
566 assert!(result.is_some());
567 }
568
569 #[test]
570 fn test_create_weights_distribution_with_excluded() {
571 let configs = vec![
572 RpcConfig {
573 url: "https://example1.com/rpc".to_string(),
574 weight: 1,
575 ..Default::default()
576 },
577 RpcConfig {
578 url: "https://example2.com/rpc".to_string(),
579 weight: 2,
580 ..Default::default()
581 },
582 RpcConfig {
583 url: "https://example3.com/rpc".to_string(),
584 weight: 3,
585 ..Default::default()
586 },
587 ];
588
589 let mut excluded = HashSet::new();
591 excluded.insert(0);
592
593 let result = RpcSelector::create_weights_distribution(&configs, &excluded);
594 assert!(result.is_some());
595
596 excluded.insert(1);
598 let result = RpcSelector::create_weights_distribution(&configs, &excluded);
599 assert!(result.is_none());
600 }
601
602 #[test]
603 fn test_rpc_selector_new_empty_configs() {
604 let configs: Vec<RpcConfig> = vec![];
605 let result = RpcSelector::new_with_defaults(configs);
606 assert!(result.is_err());
607 assert!(matches!(result.unwrap_err(), RpcSelectorError::NoProviders));
608 }
609
610 #[test]
611 fn test_rpc_selector_new_single_config() {
612 let configs = vec![RpcConfig {
613 url: "https://example.com/rpc".to_string(),
614 weight: 1,
615 ..Default::default()
616 }];
617
618 let result = RpcSelector::new_with_defaults(configs);
619 assert!(result.is_ok());
620 let selector = result.unwrap();
621 assert!(selector.weights_dist.is_none());
622 }
623
624 #[test]
625 fn test_rpc_selector_new_multiple_equal_weights() {
626 let configs = vec![
627 RpcConfig {
628 url: "https://example1.com/rpc".to_string(),
629 weight: 5,
630 ..Default::default()
631 },
632 RpcConfig {
633 url: "https://example2.com/rpc".to_string(),
634 weight: 5,
635 ..Default::default()
636 },
637 ];
638
639 let result = RpcSelector::new_with_defaults(configs);
640 assert!(result.is_ok());
641 let selector = result.unwrap();
642 assert!(selector.weights_dist.is_none());
643 }
644
645 #[test]
646 fn test_rpc_selector_new_multiple_different_weights() {
647 let configs = vec![
648 RpcConfig {
649 url: "https://example1.com/rpc".to_string(),
650 weight: 1,
651 ..Default::default()
652 },
653 RpcConfig {
654 url: "https://example2.com/rpc".to_string(),
655 weight: 3,
656 ..Default::default()
657 },
658 ];
659
660 let result = RpcSelector::new_with_defaults(configs);
661 assert!(result.is_ok());
662 let selector = result.unwrap();
663 assert!(selector.weights_dist.is_some());
664 }
665
666 #[test]
667 fn test_rpc_selector_select_url_single_provider() {
668 let configs = vec![RpcConfig {
669 url: "https://example.com/rpc".to_string(),
670 weight: 1,
671 ..Default::default()
672 }];
673
674 let selector = RpcSelector::new_with_defaults(configs).unwrap();
675 let result = selector.select_url();
676 assert!(result.is_ok());
677 assert_eq!(result.unwrap(), "https://example.com/rpc");
678 assert!(selector.has_current.load(Ordering::Relaxed));
679 }
680
681 #[test]
682 fn test_rpc_selector_select_url_round_robin() {
683 let configs = vec![
684 RpcConfig {
685 url: "https://example1.com/rpc".to_string(),
686 weight: 1,
687 ..Default::default()
688 },
689 RpcConfig {
690 url: "https://example2.com/rpc".to_string(),
691 weight: 1,
692 ..Default::default()
693 },
694 ];
695
696 let selector = RpcSelector::new_with_defaults(configs).unwrap();
697
698 let first_url = selector.select_url().unwrap();
700 let second_url = selector.select_url().unwrap();
702 let third_url = selector.select_url().unwrap();
704
705 assert_ne!(first_url, second_url);
707 assert_eq!(first_url, third_url);
708 }
709
710 #[test]
711 fn test_rpc_selector_get_client_success() {
712 let configs = vec![RpcConfig {
713 url: "https://example.com/rpc".to_string(),
714 weight: 1,
715 ..Default::default()
716 }];
717
718 let selector = RpcSelector::new_with_defaults(configs).unwrap();
719
720 let initializer = |url: &str| -> Result<String> { Ok(url.to_string()) };
722
723 let result = selector.get_client(initializer, &std::collections::HashSet::new());
724 assert!(result.is_ok());
725 assert_eq!(result.unwrap(), "https://example.com/rpc");
726 }
727
728 #[test]
729 fn test_rpc_selector_get_client_failure() {
730 let configs = vec![RpcConfig {
731 url: "https://example.com/rpc".to_string(),
732 weight: 1,
733 ..Default::default()
734 }];
735
736 let selector = RpcSelector::new_with_defaults(configs).unwrap();
737
738 let initializer =
740 |_url: &str| -> Result<String> { Err(eyre::eyre!("Initialization error")) };
741
742 let result = selector.get_client(initializer, &std::collections::HashSet::new());
743 assert!(result.is_err());
744 assert!(matches!(
745 result.unwrap_err(),
746 RpcSelectorError::ClientInitializationError(_)
747 ));
748 }
749
750 #[test]
751 fn test_rpc_selector_clone() {
752 let configs = vec![
753 RpcConfig {
754 url: "https://example1.com/rpc".to_string(),
755 weight: 1,
756 ..Default::default()
757 },
758 RpcConfig {
759 url: "https://example2.com/rpc".to_string(),
760 weight: 3,
761 ..Default::default()
762 },
763 ];
764
765 let selector = RpcSelector::new_with_defaults(configs).unwrap();
766 let cloned = selector.clone();
767
768 assert_eq!(selector.configs.read().len(), cloned.configs.read().len());
770 assert_eq!(selector.configs.read()[0].url, cloned.configs.read()[0].url);
771 assert_eq!(selector.configs.read()[1].url, cloned.configs.read()[1].url);
772
773 assert_eq!(
775 selector.weights_dist.is_some(),
776 cloned.weights_dist.is_some()
777 );
778 }
779
780 #[test]
781 #[serial]
782 fn test_mark_current_as_failed_single_provider() {
783 RpcHealthStore::instance().clear_all();
785
786 let configs = vec![RpcConfig {
789 url: "https://test-single-provider.example.com/rpc".to_string(),
790 weight: 1,
791 ..Default::default()
792 }];
793
794 let selector = RpcSelector::new(configs, 1, 60, 60).unwrap();
796 let _initial_url = selector.select_url().unwrap();
797
798 selector.mark_current_as_failed();
800
801 assert_eq!(selector.available_provider_count(), 0);
803
804 let next_url = selector.select_url();
806 assert!(next_url.is_ok());
807 assert_eq!(
808 next_url.unwrap(),
809 "https://test-single-provider.example.com/rpc"
810 );
811 }
812
813 #[test]
814 #[serial]
815 fn test_mark_current_as_failed_multiple_providers() {
816 RpcHealthStore::instance().clear_all();
818
819 let configs = vec![
822 RpcConfig {
823 url: "https://test-multi1.example.com/rpc".to_string(),
824 weight: 5,
825 ..Default::default()
826 },
827 RpcConfig {
828 url: "https://test-multi2.example.com/rpc".to_string(),
829 weight: 5,
830 ..Default::default()
831 },
832 RpcConfig {
833 url: "https://test-multi3.example.com/rpc".to_string(),
834 weight: 5,
835 ..Default::default()
836 },
837 ];
838
839 let selector = RpcSelector::new(configs, 1, 60, 60).unwrap();
841
842 let url1 = selector.select_url().unwrap().to_string();
844
845 selector.mark_current_as_failed();
847 assert_eq!(selector.available_provider_count(), 2);
849
850 let url2 = selector.select_url().unwrap().to_string();
852 assert_ne!(url1, url2);
854
855 selector.mark_current_as_failed();
857 assert_eq!(selector.available_provider_count(), 1);
858
859 let url3 = selector.select_url().unwrap().to_string();
860 assert_ne!(url1, url3);
862 assert_ne!(url2, url3);
863
864 selector.mark_current_as_failed();
866 assert_eq!(selector.available_provider_count(), 0);
867
868 let url4 = selector.select_url();
870 assert!(url4.is_ok());
871 let url4_str = url4.unwrap();
873 assert!(
874 url4_str == "https://test-multi1.example.com/rpc"
875 || url4_str == "https://test-multi2.example.com/rpc"
876 || url4_str == "https://test-multi3.example.com/rpc"
877 );
878 }
879
880 #[test]
881 #[serial]
882 fn test_mark_current_as_failed_weighted() {
883 RpcHealthStore::instance().clear_all();
885
886 let configs = vec![
888 RpcConfig {
889 url: "https://test-weighted1.example.com/rpc".to_string(),
890 weight: 1, ..Default::default()
892 },
893 RpcConfig {
894 url: "https://test-weighted2.example.com/rpc".to_string(),
895 weight: 10, ..Default::default()
897 },
898 ];
899
900 let selector = RpcSelector::new(configs, 1, 60, 60).unwrap();
902 assert!(selector.weights_dist.is_some()); let url1 = selector.select_url().unwrap().to_string();
906
907 selector.mark_current_as_failed();
909 assert_eq!(selector.available_provider_count(), 1);
910
911 let url2 = selector.select_url().unwrap().to_string();
913 assert_ne!(url1, url2);
914
915 selector.mark_current_as_failed();
917 assert_eq!(selector.available_provider_count(), 0);
918
919 let url3 = selector.select_url();
921 assert!(url3.is_ok());
922 let url3_str = url3.unwrap();
923 assert!(
924 url3_str == "https://test-weighted1.example.com/rpc"
925 || url3_str == "https://test-weighted2.example.com/rpc"
926 );
927 }
928
929 #[test]
930 fn test_provider_count() {
931 let configs: Vec<RpcConfig> = vec![];
933 let result = RpcSelector::new_with_defaults(configs);
934 assert!(result.is_err());
935
936 let configs = vec![RpcConfig {
938 url: "https://example.com/rpc".to_string(),
939 weight: 1,
940 ..Default::default()
941 }];
942 let selector = RpcSelector::new_with_defaults(configs).unwrap();
943 assert_eq!(selector.provider_count(), 1);
944
945 let configs = vec![
947 RpcConfig {
948 url: "https://example1.com/rpc".to_string(),
949 weight: 1,
950 ..Default::default()
951 },
952 RpcConfig {
953 url: "https://example2.com/rpc".to_string(),
954 weight: 2,
955 ..Default::default()
956 },
957 RpcConfig {
958 url: "https://example3.com/rpc".to_string(),
959 weight: 3,
960 ..Default::default()
961 },
962 ];
963 let selector = RpcSelector::new_with_defaults(configs).unwrap();
964 assert_eq!(selector.provider_count(), 3);
965 }
966
967 #[test]
968 #[serial]
969 fn test_available_provider_count() {
970 RpcHealthStore::instance().clear_all();
972
973 let configs = vec![
974 RpcConfig {
975 url: "https://test-available1.example.com/rpc".to_string(),
976 weight: 1,
977 ..Default::default()
978 },
979 RpcConfig {
980 url: "https://test-available2.example.com/rpc".to_string(),
981 weight: 2,
982 ..Default::default()
983 },
984 RpcConfig {
985 url: "https://test-available3.example.com/rpc".to_string(),
986 weight: 3,
987 ..Default::default()
988 },
989 ];
990
991 let selector = RpcSelector::new(configs, 1, 60, 60).unwrap();
993 assert_eq!(selector.provider_count(), 3);
994 assert_eq!(selector.available_provider_count(), 3);
995
996 selector.select_url().unwrap(); selector.mark_current_as_failed();
999 assert_eq!(selector.available_provider_count(), 2);
1001
1002 selector.select_url().unwrap(); selector.mark_current_as_failed();
1005 assert_eq!(selector.available_provider_count(), 1);
1006 }
1007
1008 #[test]
1009 fn test_get_current_url() {
1010 let configs = vec![
1011 RpcConfig::new("https://example1.com/rpc".to_string()),
1012 RpcConfig::new("https://example2.com/rpc".to_string()),
1013 ];
1014
1015 let selector = RpcSelector::new_with_defaults(configs).unwrap();
1016
1017 let url = selector.get_current_url();
1019 assert!(url.is_ok());
1020 let url_str = url.unwrap();
1021 assert!(
1022 url_str == "https://example1.com/rpc" || url_str == "https://example2.com/rpc",
1023 "Unexpected URL: {}",
1024 url_str
1025 );
1026 }
1027
1028 #[test]
1029 #[serial]
1030 fn test_concurrent_usage() {
1031 RpcHealthStore::instance().clear_all();
1033
1034 let configs = vec![
1036 RpcConfig::new("https://test-concurrent1.example.com/rpc".to_string()),
1037 RpcConfig::new("https://test-concurrent2.example.com/rpc".to_string()),
1038 RpcConfig::new("https://test-concurrent3.example.com/rpc".to_string()),
1039 ];
1040
1041 let selector = RpcSelector::new(configs, 1, 60, 60).unwrap();
1043 let selector_arc = Arc::new(selector);
1044
1045 let mut handles = Vec::with_capacity(10);
1046
1047 for _ in 0..10 {
1049 let selector_clone = Arc::clone(&selector_arc);
1050 let handle = thread::spawn(move || {
1051 let url = selector_clone.select_url().unwrap().to_string();
1052 if url.contains("test-concurrent1") {
1053 selector_clone.mark_current_as_failed();
1055 }
1056 url
1057 });
1058 handles.push(handle);
1059 }
1060
1061 let mut urls = Vec::new();
1063 for handle in handles {
1064 urls.push(handle.join().unwrap());
1065 }
1066
1067 let unique_urls: std::collections::HashSet<String> = urls.into_iter().collect();
1069 assert!(unique_urls.len() > 1, "Expected multiple unique URLs");
1070
1071 let mut found_non_example1 = false;
1074 for _ in 0..10 {
1075 let url = selector_arc.select_url().unwrap().to_string();
1076 if !url.contains("test-concurrent1") {
1077 found_non_example1 = true;
1078 }
1079 }
1080
1081 assert!(found_non_example1, "Should prefer non-paused providers");
1082 }
1083
1084 #[test]
1085 fn test_consecutive_mark_as_failed() {
1086 let configs = vec![
1087 RpcConfig::new("https://example1.com/rpc".to_string()),
1088 RpcConfig::new("https://example2.com/rpc".to_string()),
1089 ];
1090
1091 let selector = RpcSelector::new_with_defaults(configs).unwrap();
1092
1093 selector.select_url().unwrap();
1095
1096 selector.mark_current_as_failed();
1098 selector.mark_current_as_failed(); let result = selector.select_url();
1102 assert!(result.is_ok());
1103 }
1104
1105 #[test]
1106 #[serial]
1107 fn test_weighted_to_round_robin_fallback() {
1108 RpcHealthStore::instance().clear_all();
1110
1111 let configs = vec![
1112 RpcConfig {
1113 url: "https://test-wrr1.example.com/rpc".to_string(),
1114 weight: 10, ..Default::default()
1116 },
1117 RpcConfig {
1118 url: "https://test-wrr2.example.com/rpc".to_string(),
1119 weight: 1, ..Default::default()
1121 },
1122 RpcConfig {
1123 url: "https://test-wrr3.example.com/rpc".to_string(),
1124 weight: 1, ..Default::default()
1126 },
1127 ];
1128
1129 let selector = RpcSelector::new(configs, 1, 60, 60).unwrap();
1131 assert!(selector.weights_dist.is_some()); let mut selected_first = false;
1136
1137 for _ in 0..10 {
1139 let url = selector.select_url().unwrap();
1140 if url.contains("test-wrr1") {
1141 selected_first = true;
1142 selector.mark_current_as_failed();
1144 break;
1145 }
1146 }
1147
1148 assert!(
1149 selected_first,
1150 "High-weight provider should have been selected"
1151 );
1152
1153 let mut seen_urls = HashSet::new();
1155 for _ in 0..10 {
1156 let url = selector.select_url().unwrap().to_string();
1157 seen_urls.insert(url);
1158 }
1159
1160 assert!(seen_urls.len() >= 2);
1162 assert!(
1163 !seen_urls.iter().any(|url| url.contains("test-wrr1")),
1164 "Paused provider should not be selected (prefer non-paused)"
1165 );
1166 }
1167
1168 #[test]
1169 fn test_zero_weight_providers() {
1170 let configs = vec![
1171 RpcConfig {
1172 url: "https://example1.com/rpc".to_string(),
1173 weight: 0, ..Default::default()
1175 },
1176 RpcConfig {
1177 url: "https://example2.com/rpc".to_string(),
1178 weight: 5, ..Default::default()
1180 },
1181 ];
1182
1183 let selector = RpcSelector::new_with_defaults(configs).unwrap();
1184
1185 let mut seen_urls = HashSet::new();
1187 for _ in 0..10 {
1188 let url = selector.select_url().unwrap().to_string();
1189 seen_urls.insert(url);
1190 }
1191
1192 assert_eq!(seen_urls.len(), 1);
1193 assert!(
1194 seen_urls.iter().next().unwrap().contains("example2"),
1195 "Only the non-zero weight provider should be selected"
1196 );
1197 }
1198
1199 #[test]
1200 #[serial]
1201 fn test_extreme_weight_differences() {
1202 let configs = vec![
1203 RpcConfig {
1204 url: "https://example1.com/rpc".to_string(),
1205 weight: 100, ..Default::default()
1207 },
1208 RpcConfig {
1209 url: "https://example2.com/rpc".to_string(),
1210 weight: 1, ..Default::default()
1212 },
1213 ];
1214
1215 let selector = RpcSelector::new_with_defaults(configs).unwrap();
1216
1217 let mut count_high = 0;
1219
1220 for _ in 0..100 {
1221 let url = selector.select_url().unwrap().to_string();
1222 if url.contains("example1") {
1223 count_high += 1;
1224 }
1225
1226 selector.has_current.store(false, Ordering::Relaxed);
1228 }
1229
1230 assert!(
1232 count_high > 90,
1233 "High-weight provider selected only {}/{} times",
1234 count_high,
1235 100
1236 );
1237 }
1238
1239 #[test]
1240 fn test_mark_unselected_as_failed() {
1241 let configs = vec![
1242 RpcConfig::new("https://example1.com/rpc".to_string()),
1243 RpcConfig::new("https://example2.com/rpc".to_string()),
1244 ];
1245
1246 let selector = RpcSelector::new_with_defaults(configs).unwrap();
1247
1248 selector.mark_current_as_failed();
1250
1251 let mut seen_urls = HashSet::new();
1253 for _ in 0..10 {
1254 let url = selector.select_url().unwrap().to_string();
1255 seen_urls.insert(url);
1256
1257 selector.has_current.store(false, Ordering::Relaxed);
1259 }
1260
1261 assert_eq!(
1262 seen_urls.len(),
1263 2,
1264 "Both providers should still be available"
1265 );
1266 }
1267
1268 #[test]
1269 fn test_rpc_selector_error_serialization() {
1270 let error = RpcSelectorError::NoProviders;
1271 let json = serde_json::to_string(&error).unwrap();
1272 assert!(json.contains("NoProviders"));
1273
1274 let error = RpcSelectorError::ClientInitializationError("test error".to_string());
1275 let json = serde_json::to_string(&error).unwrap();
1276 assert!(json.contains("ClientInitializationError"));
1277 assert!(json.contains("test error"));
1278
1279 let error = RpcSelectorError::WeightedIndexError("index error".to_string());
1280 let json = serde_json::to_string(&error).unwrap();
1281 assert!(json.contains("WeightedIndexError"));
1282 assert!(json.contains("index error"));
1283
1284 let error = RpcSelectorError::AllProvidersFailed;
1285 let json = serde_json::to_string(&error).unwrap();
1286 assert!(json.contains("AllProvidersFailed"));
1287 }
1288
1289 #[cfg(test)]
1290 mod rate_limiting_tests {
1291 use super::*;
1292 use crate::services::provider::rpc_health_store::RpcHealthStore;
1293
1294 #[test]
1302 #[serial]
1303 fn test_rpc_selector_switches_on_rate_limit() {
1304 RpcHealthStore::instance().clear_all();
1305 let configs = vec![
1306 RpcConfig {
1307 url: "https://test-rate-limit1.example.com".to_string(),
1308 weight: 100,
1309 ..Default::default()
1310 },
1311 RpcConfig {
1312 url: "https://test-rate-limit2.example.com".to_string(),
1313 weight: 100,
1314 ..Default::default()
1315 },
1316 ];
1317
1318 let selector = RpcSelector::new(configs, 1, 60, 60).unwrap();
1320
1321 assert_eq!(selector.available_provider_count(), 2);
1323
1324 let first_url = selector.select_url().unwrap();
1326
1327 assert!(
1329 first_url == "https://test-rate-limit1.example.com"
1330 || first_url == "https://test-rate-limit2.example.com"
1331 );
1332
1333 selector.mark_current_as_failed();
1336
1337 assert_eq!(selector.available_provider_count(), 1);
1339
1340 let second_url = selector.select_url().unwrap();
1342
1343 assert_ne!(first_url, second_url);
1345
1346 let third_url = selector.select_url().unwrap();
1348 assert_eq!(second_url, third_url); let mut selected_urls = std::collections::HashSet::new();
1352 for _ in 0..10 {
1353 let url = selector.select_url().unwrap();
1354 selected_urls.insert(url.to_string());
1355 }
1356
1357 assert_eq!(selected_urls.len(), 1);
1359 assert!(!selected_urls.contains(&first_url.to_string()));
1360 assert!(selected_urls.contains(&second_url.to_string()));
1361 }
1362
1363 #[test]
1368 #[serial]
1369 fn test_rpc_selector_rate_limit_with_weighted_selection() {
1370 RpcHealthStore::instance().clear_all();
1371 let configs = vec![
1372 RpcConfig {
1373 url: "https://test-weighted-rl1.example.com".to_string(),
1374 weight: 80, ..Default::default()
1376 },
1377 RpcConfig {
1378 url: "https://test-weighted-rl2.example.com".to_string(),
1379 weight: 20, ..Default::default()
1381 },
1382 ];
1383
1384 let selector = RpcSelector::new(configs, 1, 60, 60).unwrap();
1386
1387 let mut rpc1_count = 0;
1389 let mut rpc2_count = 0;
1390
1391 for _ in 0..20 {
1392 let url = selector.select_url().unwrap();
1393 if url == "https://test-weighted-rl1.example.com" {
1394 rpc1_count += 1;
1395 } else {
1396 rpc2_count += 1;
1397 }
1398 }
1399
1400 assert!(rpc1_count > rpc2_count);
1402
1403 let mut selected_rpc1 = false;
1406 for _ in 0..10 {
1407 let url = selector.select_url().unwrap();
1408 if url == "https://test-weighted-rl1.example.com" {
1409 selector.mark_current_as_failed();
1410 selected_rpc1 = true;
1411 break;
1412 }
1413 }
1414 assert!(selected_rpc1, "Should have selected rpc1 at least once");
1415
1416 for _ in 0..20 {
1418 let url = selector.select_url().unwrap();
1419 assert_eq!(url, "https://test-weighted-rl2.example.com");
1420 }
1421
1422 assert_eq!(selector.available_provider_count(), 1);
1424 }
1425
/// Checks that a provider marked as failed is not returned by later selections.
///
/// The selector is built with a failure threshold of 1 and 60-second pause and
/// failure-expiration windows. After the first selected provider is marked failed, every
/// following selection is expected to return the other provider, and the available
/// provider count is expected to stay at 1.
#[test]
#[serial]
fn test_rpc_selector_rate_limit_recovery() {
    RpcHealthStore::instance().clear_all();

    let build_config = |url: &str| RpcConfig {
        url: url.to_string(),
        weight: 100,
        ..Default::default()
    };
    let selector = RpcSelector::new(
        vec![
            build_config("https://test-recovery1.example.com"),
            build_config("https://test-recovery2.example.com"),
        ],
        1,
        60,
        60,
    )
    .unwrap();

    // Fail whichever provider happens to be selected first.
    let failed_url = selector.select_url().unwrap();
    selector.mark_current_as_failed();
    assert_eq!(selector.available_provider_count(), 1);

    let healthy_url = selector.select_url().unwrap();
    assert_ne!(failed_url, healthy_url);

    // Repeated selections must keep returning the remaining provider.
    (0..10).for_each(|_| {
        let url = selector.select_url().unwrap();
        assert_eq!(url, healthy_url);
    });

    assert_eq!(selector.available_provider_count(), 1);
}
1470
/// Checks that `select_url` still yields a URL after both providers were marked failed.
///
/// With a failure threshold of 1, the test fails the current provider twice, expecting
/// the available provider count to drop to 1 and then 0, and finally expects
/// `select_url` to succeed with one of the two configured URLs.
#[test]
#[serial]
fn test_rpc_selector_both_providers_rate_limited() {
    const PROVIDER_URLS: [&str; 2] = [
        "https://test-both-rl1.example.com",
        "https://test-both-rl2.example.com",
    ];

    RpcHealthStore::instance().clear_all();
    let configs: Vec<RpcConfig> = PROVIDER_URLS
        .iter()
        .copied()
        .map(|url| RpcConfig {
            url: url.to_string(),
            weight: 100,
            ..Default::default()
        })
        .collect();

    let selector = RpcSelector::new(configs, 1, 60, 60).unwrap();

    // Each round fails the currently selected provider and checks how many remain.
    for expected_remaining in [1, 0] {
        selector.select_url().unwrap();
        selector.mark_current_as_failed();
        assert_eq!(selector.available_provider_count(), expected_remaining);
    }

    let fallback = selector.select_url();
    assert!(fallback.is_ok());
    let url = fallback.unwrap();
    assert!(PROVIDER_URLS.iter().any(|candidate| url == *candidate));
}
1511
/// Checks selection among three equally weighted providers after one is marked failed.
///
/// The selector is built with a failure threshold of 1 and 60-second pause and
/// failure-expiration windows. Once the first configured provider has been marked as
/// failed, 20 further selections are expected to never return it and to cover both of
/// the remaining providers.
#[test]
#[serial]
fn test_rpc_selector_rate_limit_round_robin_fallback() {
    const FAILED_URL: &str = "https://test-rr-fallback1.example.com";
    // Upper bound on selections spent waiting for `FAILED_URL`, so a selector that never
    // returns it fails the test instead of hanging the run.
    const MAX_SELECTION_ATTEMPTS: usize = 100;

    RpcHealthStore::instance().clear_all();
    let configs = vec![
        RpcConfig {
            url: FAILED_URL.to_string(),
            weight: 100,
            ..Default::default()
        },
        RpcConfig {
            url: "https://test-rr-fallback2.example.com".to_string(),
            weight: 100,
            ..Default::default()
        },
        RpcConfig {
            url: "https://test-rr-fallback3.example.com".to_string(),
            weight: 100,
            ..Default::default()
        },
    ];

    let selector = RpcSelector::new(configs, 1, 60, 60).unwrap();

    selector.select_url().unwrap();
    let first_url = selector.get_current_url().unwrap();

    if first_url == FAILED_URL {
        selector.mark_current_as_failed();
    } else {
        // Advance until the target provider is the current one, then mark it failed.
        let mut reached_target = false;
        for _ in 0..MAX_SELECTION_ATTEMPTS {
            let url = selector.select_url().unwrap();
            if url == FAILED_URL {
                selector.mark_current_as_failed();
                reached_target = true;
                break;
            }
        }
        assert!(
            reached_target,
            "Should have selected the first provider within the attempt limit"
        );
    }

    let mut selected_urls = std::collections::HashSet::new();
    for _ in 0..20 {
        let url = selector.select_url().unwrap();
        selected_urls.insert(url.to_string());
        assert_ne!(url, FAILED_URL);
    }

    assert!(selected_urls.contains("https://test-rr-fallback2.example.com"));
    assert!(selected_urls.contains("https://test-rr-fallback3.example.com"));
    assert_eq!(selected_urls.len(), 2);
}
1573
/// Checks that `get_next_url` never returns a URL contained in the excluded set.
///
/// Three providers with weight 1 are configured, the first one is excluded, and ten
/// consecutive calls are expected to return a different URL.
#[test]
#[serial]
fn test_select_url_excludes_tried_providers() {
    RpcHealthStore::instance().clear_all();

    let provider_urls = [
        "https://provider1.com",
        "https://provider2.com",
        "https://provider3.com",
    ];
    let configs: Vec<RpcConfig> = provider_urls
        .iter()
        .copied()
        .map(|url| RpcConfig {
            url: url.to_string(),
            weight: 1,
            ..Default::default()
        })
        .collect();

    let selector = RpcSelector::new_with_defaults(configs).unwrap();

    // Only the first provider is treated as already tried.
    let excluded = std::collections::HashSet::from(["https://provider1.com".to_string()]);

    (0..10).for_each(|_| {
        let url = selector.get_next_url(&excluded).unwrap();
        assert_ne!(url, "https://provider1.com");
    });
}
1608
/// Checks that `get_next_url` still returns a URL when every provider is paused.
///
/// Each of the two providers is reported as failed three times to the health store with
/// a threshold of 3, after which both are expected to be paused. A call to
/// `get_next_url` with no exclusions is then expected to return one of the two URLs.
#[test]
#[serial]
fn test_select_url_fallback_to_paused_providers() {
    const PROVIDER_URLS: [&str; 2] = ["https://provider1.com", "https://provider2.com"];

    RpcHealthStore::instance().clear_all();
    let configs: Vec<RpcConfig> = PROVIDER_URLS
        .iter()
        .copied()
        .map(|url| RpcConfig {
            url: url.to_string(),
            weight: 1,
            ..Default::default()
        })
        .collect();

    let selector = RpcSelector::new_with_defaults(configs).unwrap();
    let health_store = RpcHealthStore::instance();
    let expiration = chrono::Duration::seconds(60);

    // Record three failures per provider, first provider first.
    for url in PROVIDER_URLS {
        for _ in 0..3 {
            health_store.mark_failed(url, 3, chrono::Duration::seconds(60), expiration);
        }
    }

    for url in PROVIDER_URLS {
        assert!(health_store.is_paused(url, 3, expiration));
    }

    let no_exclusions = std::collections::HashSet::new();
    let url = selector.get_next_url(&no_exclusions).unwrap();
    assert!(PROVIDER_URLS.iter().any(|candidate| url == *candidate));
}
1679
/// Checks that excluding the only configured provider makes `get_next_url` fail.
///
/// The call is expected to return `RpcSelectorError::AllProvidersFailed`.
#[test]
#[serial]
fn test_select_url_single_provider_excluded() {
    let only_url = "https://single-provider.com";

    RpcHealthStore::instance().clear_all();
    let selector = RpcSelector::new_with_defaults(vec![RpcConfig {
        url: only_url.to_string(),
        weight: 1,
        ..Default::default()
    }])
    .unwrap();

    // The exclusion set holds the single configured URL.
    let excluded: std::collections::HashSet<String> =
        std::iter::once(only_url.to_string()).collect();

    let result = selector.get_next_url(&excluded);
    assert!(result.is_err());
    assert!(matches!(
        result,
        Err(RpcSelectorError::AllProvidersFailed)
    ));
}
1704
/// Checks that excluding every configured provider makes `get_next_url` fail.
///
/// Both providers are placed in the excluded set and the call is expected to return
/// `RpcSelectorError::AllProvidersFailed`.
#[test]
#[serial]
fn test_select_url_all_providers_excluded() {
    RpcHealthStore::instance().clear_all();

    let provider_urls = ["https://provider1.com", "https://provider2.com"];
    let configs: Vec<RpcConfig> = provider_urls
        .iter()
        .copied()
        .map(|url| RpcConfig {
            url: url.to_string(),
            weight: 1,
            ..Default::default()
        })
        .collect();

    let selector = RpcSelector::new_with_defaults(configs).unwrap();

    // Exclude exactly the URLs that were configured.
    let excluded: std::collections::HashSet<String> = provider_urls
        .iter()
        .copied()
        .map(|url| url.to_string())
        .collect();

    let outcome = selector.get_next_url(&excluded);
    assert!(outcome.is_err());
    let error = outcome.unwrap_err();
    assert!(matches!(error, RpcSelectorError::AllProvidersFailed));
}
1737
/// Checks that an excluded provider is never returned even when it has the largest weight.
///
/// Providers are configured with weights 10, 1 and 1; the weight-10 provider is excluded
/// and twenty consecutive `get_next_url` calls are expected to return a different URL.
#[test]
#[serial]
fn test_select_url_excluded_providers_with_weighted_selection() {
    RpcHealthStore::instance().clear_all();

    let make_config = |url: &str, weight| RpcConfig {
        url: url.to_string(),
        weight,
        ..Default::default()
    };
    let configs = vec![
        make_config("https://provider1.com", 10),
        make_config("https://provider2.com", 1),
        make_config("https://provider3.com", 1),
    ];

    let selector = RpcSelector::new_with_defaults(configs).unwrap();

    // Exclude the provider that carries the largest weight.
    let excluded: std::collections::HashSet<String> =
        std::iter::once("https://provider1.com".to_string()).collect();

    (0..20).for_each(|_| {
        let url = selector.get_next_url(&excluded).unwrap();
        assert_ne!(url, "https://provider1.com");
    });
}
1772 }
1773}