1use std::sync::Arc;
7use std::time::{Duration, Instant};
8
9use deadpool_redis::Pool;
10use tokio::sync::RwLock;
11
12use crate::jobs::{JobProducerTrait, Queue};
13use crate::models::health::{
14 ComponentStatus, Components, PluginHealth, PoolStatus, QueueHealth, QueueHealthStatus,
15 ReadinessResponse, RedisHealth, RedisHealthStatus, SystemHealth,
16};
17use crate::models::{
18 NetworkRepoModel, NotificationRepoModel, RelayerRepoModel, SignerRepoModel, ThinDataAppState,
19 TransactionRepoModel,
20};
21use crate::repositories::{
22 ApiKeyRepositoryTrait, NetworkRepository, PluginRepositoryTrait, RelayerRepository, Repository,
23 TransactionCounterTrait, TransactionRepository,
24};
25use crate::services::plugins::get_pool_manager;
26use crate::utils::RedisConnections;
27
/// Maximum time to wait for a Redis `PING` round-trip before the target is
/// treated as unreachable.
const PING_TIMEOUT: Duration = Duration::from_millis(3000);

/// File-descriptor usage ratio above which the system is reported as degraded.
const WARNING_FD_RATIO: f64 = 0.7;

/// File-descriptor usage ratio above which the system is reported as unhealthy.
const MAX_FD_RATIO: f64 = 0.8;

/// CLOSE_WAIT socket count above which the system is reported as degraded.
const WARNING_CLOSE_WAIT: usize = 200;

/// CLOSE_WAIT socket count above which the system is reported as unhealthy.
const MAX_CLOSE_WAIT: usize = 500;

/// How long a computed readiness response is served from the in-process cache.
const HEALTH_CACHE_TTL: Duration = Duration::from_secs(10);
51
/// A readiness response paired with the instant it was computed, used to
/// serve repeated health probes without re-running every check.
struct CachedHealth {
    // The last fully-built readiness response.
    response: ReadinessResponse,
    // When `response` was computed; compared against `HEALTH_CACHE_TTL`.
    checked_at: Instant,
}
61
/// Process-wide, lazily-initialized cache slot guarded by an async `RwLock`.
static HEALTH_CACHE: std::sync::OnceLock<RwLock<Option<CachedHealth>>> = std::sync::OnceLock::new();

/// Returns the global cache cell, initializing it (to empty) on first use.
fn get_cache() -> &'static RwLock<Option<CachedHealth>> {
    HEALTH_CACHE.get_or_init(|| RwLock::new(None))
}
68
69async fn get_cached_response() -> Option<ReadinessResponse> {
71 let cache = get_cache().read().await;
72 if let Some(ref cached) = *cache {
73 if cached.checked_at.elapsed() < HEALTH_CACHE_TTL {
74 return Some(cached.response.clone());
75 }
76 }
77 None
78}
79
80async fn cache_response(response: &ReadinessResponse) {
82 let mut cache = get_cache().write().await;
83 *cache = Some(CachedHealth {
84 response: response.clone(),
85 checked_at: Instant::now(),
86 });
87}
88
/// Test helper: empties the global readiness cache so each test starts cold.
#[cfg(test)]
pub async fn clear_cache() {
    // `Option::take` drops any cached entry in place.
    get_cache().write().await.take();
}
95
96async fn ping_pool(pool: &Arc<Pool>, name: &str) -> PoolStatus {
102 let status = pool.status();
103
104 let result = tokio::time::timeout(PING_TIMEOUT, async {
105 let mut conn = pool.get().await?;
106 redis::cmd("PING")
107 .query_async::<String>(&mut conn)
108 .await
109 .map_err(deadpool_redis::PoolError::Backend)
110 })
111 .await;
112
113 match result {
114 Ok(Ok(_)) => PoolStatus {
115 connected: true,
116 available: status.available,
117 max_size: status.max_size,
118 error: None,
119 },
120 Ok(Err(e)) => {
121 tracing::warn!(pool = %name, error = %e, "Redis pool PING failed");
122 PoolStatus {
123 connected: false,
124 available: status.available,
125 max_size: status.max_size,
126 error: Some(e.to_string()),
127 }
128 }
129 Err(_) => {
130 tracing::warn!(pool = %name, "Redis pool PING timed out");
131 PoolStatus {
132 connected: false,
133 available: status.available,
134 max_size: status.max_size,
135 error: Some("PING timed out".to_string()),
136 }
137 }
138 }
139}
140
141async fn check_redis_health(connections: &Arc<RedisConnections>) -> RedisHealthStatus {
146 let (primary_status, reader_status) = tokio::join!(
147 ping_pool(connections.primary(), "primary"),
148 ping_pool(connections.reader(), "reader")
149 );
150
151 let healthy = primary_status.connected;
153
154 let error = if !primary_status.connected {
155 Some(format!(
156 "Redis primary pool: {}",
157 primary_status
158 .error
159 .as_deref()
160 .unwrap_or("connection failed")
161 ))
162 } else {
163 None
164 };
165
166 RedisHealthStatus {
167 healthy,
168 primary_pool: primary_status,
169 reader_pool: reader_status,
170 error,
171 }
172}
173
174fn redis_status_to_health(status: RedisHealthStatus) -> RedisHealth {
176 let component_status = if status.healthy {
177 if status.reader_pool.connected {
178 ComponentStatus::Healthy
179 } else {
180 ComponentStatus::Degraded }
182 } else {
183 ComponentStatus::Unhealthy
184 };
185
186 RedisHealth {
187 status: component_status,
188 primary_pool: status.primary_pool,
189 reader_pool: status.reader_pool,
190 error: status.error,
191 }
192}
193
194async fn check_queue_health(queue: &Queue) -> QueueHealthStatus {
202 let mut conn = queue.relayer_health_check_queue.get_connection().clone();
203
204 let result = tokio::time::timeout(PING_TIMEOUT, async {
205 redis::cmd("PING").query_async::<String>(&mut conn).await
206 })
207 .await;
208
209 match result {
212 Ok(Ok(_)) => QueueHealthStatus {
213 healthy: true,
214 error: None,
215 },
216 Ok(Err(e)) => {
217 tracing::warn!(error = %e, "Queue PING check failed");
219 QueueHealthStatus {
220 healthy: false,
221 error: Some(format!("Queue connection: {e}")),
222 }
223 }
224 Err(_) => {
225 tracing::warn!("Queue PING check timed out");
227 QueueHealthStatus {
228 healthy: false,
229 error: Some("Queue connection: PING check timed out".to_string()),
230 }
231 }
232 }
233}
234
235fn queue_status_to_health(status: QueueHealthStatus) -> QueueHealth {
237 QueueHealth {
238 status: if status.healthy {
239 ComponentStatus::Healthy
240 } else {
241 ComponentStatus::Unhealthy
242 },
243 error: status.error,
244 }
245}
246
247fn create_unavailable_health() -> (RedisHealth, QueueHealth) {
249 let error_msg = "Queue unavailable - cannot check Redis or Queue health";
250 let unhealthy_pool = PoolStatus {
251 connected: false,
252 available: 0,
253 max_size: 0,
254 error: Some(error_msg.to_string()),
255 };
256
257 let redis = RedisHealth {
258 status: ComponentStatus::Unhealthy,
259 primary_pool: unhealthy_pool.clone(),
260 reader_pool: unhealthy_pool,
261 error: Some(error_msg.to_string()),
262 };
263
264 let queue = QueueHealth {
265 status: ComponentStatus::Unhealthy,
266 error: Some("Failed to get queue from job producer".to_string()),
267 };
268
269 (redis, queue)
270}
271
/// Returns the number of file descriptors currently open by this process.
///
/// Linux counts the entries in `/proc/<pid>/fd`; macOS shells out to `lsof`
/// and subtracts the header line; other platforms report 0 (unsupported).
///
/// # Errors
/// Propagates the I/O error from reading `/proc` (Linux) or spawning `lsof`
/// (macOS).
fn get_fd_count() -> Result<usize, std::io::Error> {
    // The pid is computed inside each platform branch: computing it
    // unconditionally left an unused variable (and warning) on targets that
    // are neither Linux nor macOS.
    #[cfg(target_os = "linux")]
    {
        // Each entry under /proc/<pid>/fd is one open descriptor.
        let fd_dir = format!("/proc/{}/fd", std::process::id());
        std::fs::read_dir(fd_dir).map(|entries| entries.count())
    }

    #[cfg(target_os = "macos")]
    {
        use std::process::Command;
        let output = Command::new("lsof")
            .args(["-p", &std::process::id().to_string()])
            .output()?;
        // The first line of lsof output is a column header, not a descriptor.
        let count = String::from_utf8_lossy(&output.stdout)
            .lines()
            .count()
            .saturating_sub(1);
        Ok(count)
    }

    #[cfg(not(any(target_os = "linux", target_os = "macos")))]
    {
        // FD accounting is not implemented on this platform.
        Ok(0)
    }
}
304
/// Returns the soft file-descriptor limit for the current environment.
///
/// Unix-likes ask the shell (`ulimit -n`); any unparseable output falls back
/// to 1024, which is also the value reported on unsupported platforms.
///
/// NOTE(review): `sh -c "ulimit -n"` reports the child shell's limit, which
/// is presumed to match this process's — confirm if exact values matter.
fn get_fd_limit() -> Result<usize, std::io::Error> {
    #[cfg(any(target_os = "linux", target_os = "macos"))]
    {
        use std::process::Command;
        let output = Command::new("sh").args(["-c", "ulimit -n"]).output()?;
        let stdout = String::from_utf8_lossy(&output.stdout);
        Ok(stdout.trim().parse().unwrap_or(1024))
    }

    #[cfg(not(any(target_os = "linux", target_os = "macos")))]
    {
        Ok(1024)
    }
}
323
/// Counts TCP sockets stuck in CLOSE_WAIT via `netstat`; reports 0 when the
/// output is unparseable or the platform is unsupported.
fn get_close_wait_count() -> Result<usize, std::io::Error> {
    #[cfg(any(target_os = "linux", target_os = "macos"))]
    {
        use std::process::Command;
        let output = Command::new("sh")
            .args(["-c", "netstat -an | grep CLOSE_WAIT | wc -l"])
            .output()?;
        let stdout = String::from_utf8_lossy(&output.stdout);
        Ok(stdout.trim().parse().unwrap_or(0))
    }

    #[cfg(not(any(target_os = "linux", target_os = "macos")))]
    {
        Ok(0)
    }
}
344
345fn evaluate_system_metrics(
349 fd_ratio: f64,
350 fd_count: usize,
351 fd_limit: usize,
352 close_wait_count: usize,
353) -> (ComponentStatus, Option<String>) {
354 let mut errors: Vec<String> = Vec::new();
355 let mut is_degraded = false;
356
357 if fd_ratio > MAX_FD_RATIO {
359 let fd_percent = fd_ratio * 100.0;
360 let max_percent = MAX_FD_RATIO * 100.0;
361 errors.push(format!(
362 "File descriptor limit critical: {fd_count}/{fd_limit} ({fd_percent:.1}% > {max_percent:.1}%)"
363 ));
364 } else if fd_ratio > WARNING_FD_RATIO {
365 is_degraded = true;
366 }
367
368 if close_wait_count > MAX_CLOSE_WAIT {
370 errors.push(format!(
371 "Too many CLOSE_WAIT sockets: {close_wait_count} > {MAX_CLOSE_WAIT}"
372 ));
373 } else if close_wait_count > WARNING_CLOSE_WAIT {
374 is_degraded = true;
375 }
376
377 let status = if !errors.is_empty() {
378 ComponentStatus::Unhealthy
379 } else if is_degraded {
380 ComponentStatus::Degraded
381 } else {
382 ComponentStatus::Healthy
383 };
384
385 let error = if errors.is_empty() {
386 None
387 } else {
388 Some(errors.join("; "))
389 };
390
391 (status, error)
392}
393
394pub fn check_system_health() -> SystemHealth {
401 let fd_count = get_fd_count().unwrap_or(0);
402 let fd_limit = get_fd_limit().unwrap_or(1024);
403 let close_wait_count = get_close_wait_count().unwrap_or(0);
404
405 let fd_ratio = if fd_limit > 0 {
406 fd_count as f64 / fd_limit as f64
407 } else {
408 0.0
409 };
410 let fd_usage_percent = (fd_ratio * 100.0) as u32;
411
412 let (status, error) = evaluate_system_metrics(fd_ratio, fd_count, fd_limit, close_wait_count);
413
414 SystemHealth {
415 status,
416 fd_count,
417 fd_limit,
418 fd_usage_percent,
419 close_wait_count,
420 error,
421 }
422}
423
424fn determine_plugin_status(healthy: bool, circuit_state: Option<&str>) -> ComponentStatus {
430 if healthy {
431 match circuit_state {
432 Some("closed") | None => ComponentStatus::Healthy,
433 Some("half_open") | Some("open") => ComponentStatus::Degraded,
434 _ => ComponentStatus::Healthy,
435 }
436 } else {
437 ComponentStatus::Degraded
438 }
439}
440
/// Queries the plugin pool manager for health, if plugins are in use.
///
/// Returns `None` when the pool manager was never initialized (plugins not
/// configured); otherwise maps the manager's status report field-for-field
/// into a `PluginHealth`. A failed health-check call is reported as a
/// `Degraded` component carrying the error, with all metrics unknown —
/// plugin trouble alone never fails readiness.
pub async fn check_plugin_health() -> Option<PluginHealth> {
    let pool_manager = get_pool_manager();

    // No initialized pool manager means plugins are absent; omit the
    // component entirely rather than reporting it unhealthy.
    if !pool_manager.is_initialized().await {
        return None;
    }

    match pool_manager.health_check().await {
        Ok(plugin_status) => {
            let status = determine_plugin_status(
                plugin_status.healthy,
                plugin_status.circuit_state.as_deref(),
            );

            Some(PluginHealth {
                status,
                enabled: true,
                circuit_state: plugin_status.circuit_state,
                // Only surface the status text as an error when unhealthy.
                error: if plugin_status.healthy {
                    None
                } else {
                    Some(plugin_status.status)
                },
                uptime_ms: plugin_status.uptime_ms,
                memory: plugin_status.memory,
                pool_completed: plugin_status.pool_completed,
                pool_queued: plugin_status.pool_queued,
                success_rate: plugin_status.success_rate,
                avg_response_time_ms: plugin_status.avg_response_time_ms,
                recovering: plugin_status.recovering,
                recovery_percent: plugin_status.recovery_percent,
                shared_socket_available_slots: plugin_status.shared_socket_available_slots,
                shared_socket_active_connections: plugin_status.shared_socket_active_connections,
                shared_socket_registered_executions: plugin_status
                    .shared_socket_registered_executions,
                connection_pool_available_slots: plugin_status.connection_pool_available_slots,
                connection_pool_active_connections: plugin_status
                    .connection_pool_active_connections,
            })
        }
        // The health-check call itself failed: degrade (never fail) and
        // report the error with every metric unset.
        Err(e) => Some(PluginHealth {
            status: ComponentStatus::Degraded,
            enabled: true,
            circuit_state: None,
            error: Some(e.to_string()),
            uptime_ms: None,
            memory: None,
            pool_completed: None,
            pool_queued: None,
            success_rate: None,
            avg_response_time_ms: None,
            recovering: None,
            recovery_percent: None,
            shared_socket_available_slots: None,
            shared_socket_active_connections: None,
            shared_socket_registered_executions: None,
            connection_pool_available_slots: None,
            connection_pool_active_connections: None,
        }),
    }
}
503
504fn aggregate_health(
513 system: &SystemHealth,
514 redis: &RedisHealth,
515 queue: &QueueHealth,
516 plugins: &Option<PluginHealth>,
517) -> (ComponentStatus, Option<String>) {
518 let mut reasons: Vec<String> = Vec::new();
519 let mut overall_status = ComponentStatus::Healthy;
520
521 if system.status == ComponentStatus::Unhealthy {
523 overall_status = ComponentStatus::Unhealthy;
524 if let Some(ref err) = system.error {
525 reasons.push(err.clone());
526 }
527 } else if system.status == ComponentStatus::Degraded
528 && overall_status == ComponentStatus::Healthy
529 {
530 overall_status = ComponentStatus::Degraded;
531 }
532
533 if redis.status == ComponentStatus::Unhealthy {
535 overall_status = ComponentStatus::Unhealthy;
536 if let Some(ref err) = redis.error {
537 reasons.push(err.clone());
538 }
539 } else if redis.status == ComponentStatus::Degraded
540 && overall_status == ComponentStatus::Healthy
541 {
542 overall_status = ComponentStatus::Degraded;
543 }
544
545 if queue.status == ComponentStatus::Unhealthy {
547 overall_status = ComponentStatus::Unhealthy;
548 if let Some(ref err) = queue.error {
549 reasons.push(err.clone());
550 }
551 } else if queue.status == ComponentStatus::Degraded
552 && overall_status == ComponentStatus::Healthy
553 {
554 overall_status = ComponentStatus::Degraded;
555 }
556
557 if let Some(ref plugin_health) = plugins {
559 if plugin_health.status == ComponentStatus::Degraded
560 && overall_status == ComponentStatus::Healthy
561 {
562 overall_status = ComponentStatus::Degraded;
563 }
564 }
565
566 let reason = if reasons.is_empty() {
567 None
568 } else {
569 Some(reasons.join("; "))
570 };
571
572 (overall_status, reason)
573}
574
575fn build_response(
577 system: SystemHealth,
578 redis: RedisHealth,
579 queue: QueueHealth,
580 plugins: Option<PluginHealth>,
581) -> ReadinessResponse {
582 let (overall_status, reason) = aggregate_health(&system, &redis, &queue, &plugins);
583 let ready = overall_status != ComponentStatus::Unhealthy;
584
585 ReadinessResponse {
586 ready,
587 status: overall_status,
588 reason,
589 components: Components {
590 system,
591 redis,
592 queue,
593 plugins,
594 },
595 timestamp: chrono::Utc::now().to_rfc3339(),
596 }
597}
598
599pub async fn get_readiness<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
610 data: ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
611) -> ReadinessResponse
612where
613 J: JobProducerTrait + Send + Sync + 'static,
614 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
615 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
616 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
617 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
618 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
619 TCR: TransactionCounterTrait + Send + Sync + 'static,
620 PR: PluginRepositoryTrait + Send + Sync + 'static,
621 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
622{
623 if let Some(cached) = get_cached_response().await {
625 return cached;
626 }
627
628 let queue = match data.job_producer.get_queue().await {
630 Ok(queue) => Some(queue),
631 Err(e) => {
632 tracing::warn!(error = %e, "Failed to get queue from job producer");
633 None
634 }
635 };
636
637 let system = check_system_health();
639 let plugins = check_plugin_health().await;
640
641 let (redis, queue_health) = if let Some(ref q) = queue {
642 let redis_connections = q.redis_connections();
643 let redis_status = check_redis_health(&redis_connections).await;
644 let queue_status = check_queue_health(q).await;
645
646 (
647 redis_status_to_health(redis_status),
648 queue_status_to_health(queue_status),
649 )
650 } else {
651 create_unavailable_health()
652 };
653
654 let response = build_response(system, redis, queue_health, plugins);
656
657 if queue.is_some() {
659 cache_response(&response).await;
660 }
661
662 response
663}
664
#[cfg(test)]
mod tests {
    use super::*;

    // ---- Threshold constants -------------------------------------------

    #[test]
    fn test_constants() {
        assert_eq!(PING_TIMEOUT, Duration::from_millis(3000));
        assert_eq!(WARNING_FD_RATIO, 0.7);
        assert_eq!(MAX_FD_RATIO, 0.8);
        assert_eq!(WARNING_CLOSE_WAIT, 200);
        assert_eq!(MAX_CLOSE_WAIT, 500);
        assert_eq!(HEALTH_CACHE_TTL, Duration::from_secs(10));
    }

    // ---- evaluate_system_metrics ---------------------------------------

    #[test]
    fn test_evaluate_system_metrics_healthy() {
        let (status, error) = evaluate_system_metrics(0.5, 500, 1000, 20);
        assert_eq!(status, ComponentStatus::Healthy);
        assert!(error.is_none());
    }

    #[test]
    fn test_evaluate_system_metrics_degraded_fd() {
        let (status, error) = evaluate_system_metrics(0.75, 750, 1000, 20);
        assert_eq!(status, ComponentStatus::Degraded);
        // Degraded state carries no error message.
        assert!(error.is_none());
    }

    #[test]
    fn test_evaluate_system_metrics_degraded_close_wait() {
        let (status, error) = evaluate_system_metrics(0.5, 500, 1000, 300);
        assert_eq!(status, ComponentStatus::Degraded);
        assert!(error.is_none());
    }

    #[test]
    fn test_evaluate_system_metrics_unhealthy_fd() {
        let (status, error) = evaluate_system_metrics(0.85, 850, 1000, 20);
        assert_eq!(status, ComponentStatus::Unhealthy);
        assert!(error.is_some());
        assert!(error.unwrap().contains("File descriptor limit critical"));
    }

    #[test]
    fn test_evaluate_system_metrics_unhealthy_close_wait() {
        let (status, error) = evaluate_system_metrics(0.5, 500, 1000, 600);
        assert_eq!(status, ComponentStatus::Unhealthy);
        assert!(error.is_some());
        assert!(error.unwrap().contains("CLOSE_WAIT"));
    }

    #[test]
    fn test_evaluate_system_metrics_both_unhealthy() {
        let (status, error) = evaluate_system_metrics(0.9, 900, 1000, 600);
        assert_eq!(status, ComponentStatus::Unhealthy);
        assert!(error.is_some());
        // Both error messages are joined into one reason string.
        let err = error.unwrap();
        assert!(err.contains("File descriptor"));
        assert!(err.contains("CLOSE_WAIT"));
    }

    #[test]
    fn test_evaluate_system_metrics_at_warning_threshold() {
        // Comparisons are strict: exactly AT the warning threshold is still
        // healthy.
        let (status, _) = evaluate_system_metrics(0.7, 700, 1000, 200);
        assert_eq!(status, ComponentStatus::Healthy);
    }

    #[test]
    fn test_evaluate_system_metrics_just_above_warning() {
        let (status, _) = evaluate_system_metrics(0.71, 710, 1000, 201);
        assert_eq!(status, ComponentStatus::Degraded);
    }

    #[test]
    fn test_evaluate_system_metrics_at_max_threshold() {
        // Exactly AT the critical threshold is only degraded (strict `>`).
        let (status, _) = evaluate_system_metrics(0.8, 800, 1000, 500);
        assert_eq!(status, ComponentStatus::Degraded);
    }

    #[test]
    fn test_evaluate_system_metrics_zero_limit() {
        let (status, _) = evaluate_system_metrics(0.0, 0, 0, 0);
        assert_eq!(status, ComponentStatus::Healthy);
    }

    // ---- determine_plugin_status ---------------------------------------

    #[test]
    fn test_determine_plugin_status_healthy_closed() {
        assert_eq!(
            determine_plugin_status(true, Some("closed")),
            ComponentStatus::Healthy
        );
    }

    #[test]
    fn test_determine_plugin_status_healthy_none() {
        assert_eq!(
            determine_plugin_status(true, None),
            ComponentStatus::Healthy
        );
    }

    #[test]
    fn test_determine_plugin_status_healthy_half_open() {
        assert_eq!(
            determine_plugin_status(true, Some("half_open")),
            ComponentStatus::Degraded
        );
    }

    #[test]
    fn test_determine_plugin_status_healthy_open() {
        assert_eq!(
            determine_plugin_status(true, Some("open")),
            ComponentStatus::Degraded
        );
    }

    #[test]
    fn test_determine_plugin_status_unhealthy() {
        // Unhealthy plugins degrade (not fail) regardless of circuit state.
        assert_eq!(
            determine_plugin_status(false, Some("closed")),
            ComponentStatus::Degraded
        );
    }

    #[test]
    fn test_determine_plugin_status_unknown_state() {
        // Unrecognized circuit states are treated as healthy.
        assert_eq!(
            determine_plugin_status(true, Some("unknown")),
            ComponentStatus::Healthy
        );
    }

    // ---- fixtures for aggregation tests --------------------------------

    // Builds a SystemHealth with comfortable headroom on every metric.
    fn create_healthy_system() -> SystemHealth {
        SystemHealth {
            status: ComponentStatus::Healthy,
            fd_count: 100,
            fd_limit: 1000,
            fd_usage_percent: 10,
            close_wait_count: 5,
            error: None,
        }
    }

    // Builds a RedisHealth with both pools connected.
    fn create_healthy_redis() -> RedisHealth {
        RedisHealth {
            status: ComponentStatus::Healthy,
            primary_pool: PoolStatus {
                connected: true,
                available: 8,
                max_size: 16,
                error: None,
            },
            reader_pool: PoolStatus {
                connected: true,
                available: 8,
                max_size: 16,
                error: None,
            },
            error: None,
        }
    }

    // Builds a QueueHealth reporting a reachable queue backend.
    fn create_healthy_queue() -> QueueHealth {
        QueueHealth {
            status: ComponentStatus::Healthy,
            error: None,
        }
    }

    // ---- aggregate_health ----------------------------------------------

    #[test]
    fn test_aggregate_health_all_healthy() {
        let system = create_healthy_system();
        let redis = create_healthy_redis();
        let queue = create_healthy_queue();

        let (status, reason) = aggregate_health(&system, &redis, &queue, &None);
        assert_eq!(status, ComponentStatus::Healthy);
        assert!(reason.is_none());
    }

    #[test]
    fn test_aggregate_health_system_degraded() {
        let mut system = create_healthy_system();
        system.status = ComponentStatus::Degraded;
        let redis = create_healthy_redis();
        let queue = create_healthy_queue();

        let (status, reason) = aggregate_health(&system, &redis, &queue, &None);
        assert_eq!(status, ComponentStatus::Degraded);
        // Degraded components contribute no reason text.
        assert!(reason.is_none());
    }

    #[test]
    fn test_aggregate_health_system_unhealthy() {
        let mut system = create_healthy_system();
        system.status = ComponentStatus::Unhealthy;
        system.error = Some("FD limit exceeded".to_string());
        let redis = create_healthy_redis();
        let queue = create_healthy_queue();

        let (status, reason) = aggregate_health(&system, &redis, &queue, &None);
        assert_eq!(status, ComponentStatus::Unhealthy);
        assert!(reason.is_some());
        assert!(reason.unwrap().contains("FD limit"));
    }

    #[test]
    fn test_aggregate_health_redis_unhealthy() {
        let system = create_healthy_system();
        let mut redis = create_healthy_redis();
        redis.status = ComponentStatus::Unhealthy;
        redis.error = Some("Primary pool down".to_string());
        let queue = create_healthy_queue();

        let (status, reason) = aggregate_health(&system, &redis, &queue, &None);
        assert_eq!(status, ComponentStatus::Unhealthy);
        assert!(reason.unwrap().contains("Primary pool"));
    }

    #[test]
    fn test_aggregate_health_queue_unhealthy() {
        let system = create_healthy_system();
        let redis = create_healthy_redis();
        let mut queue = create_healthy_queue();
        queue.status = ComponentStatus::Unhealthy;
        queue.error = Some("Queue timeout".to_string());

        let (status, reason) = aggregate_health(&system, &redis, &queue, &None);
        assert_eq!(status, ComponentStatus::Unhealthy);
        assert!(reason.unwrap().contains("Queue timeout"));
    }

    #[test]
    fn test_aggregate_health_multiple_unhealthy() {
        let mut system = create_healthy_system();
        system.status = ComponentStatus::Unhealthy;
        system.error = Some("FD limit".to_string());

        let mut redis = create_healthy_redis();
        redis.status = ComponentStatus::Unhealthy;
        redis.error = Some("Redis down".to_string());

        let queue = create_healthy_queue();

        let (status, reason) = aggregate_health(&system, &redis, &queue, &None);
        assert_eq!(status, ComponentStatus::Unhealthy);
        // All contributing errors appear in the joined reason.
        let reason_str = reason.unwrap();
        assert!(reason_str.contains("FD limit"));
        assert!(reason_str.contains("Redis down"));
    }

    #[test]
    fn test_aggregate_health_plugin_degraded_only() {
        let system = create_healthy_system();
        let redis = create_healthy_redis();
        let queue = create_healthy_queue();
        let plugins = Some(PluginHealth {
            status: ComponentStatus::Degraded,
            enabled: true,
            circuit_state: Some("open".to_string()),
            error: Some("Circuit open".to_string()),
            uptime_ms: None,
            memory: None,
            pool_completed: None,
            pool_queued: None,
            success_rate: None,
            avg_response_time_ms: None,
            recovering: None,
            recovery_percent: None,
            shared_socket_available_slots: None,
            shared_socket_active_connections: None,
            shared_socket_registered_executions: None,
            connection_pool_available_slots: None,
            connection_pool_active_connections: None,
        });

        let (status, reason) = aggregate_health(&system, &redis, &queue, &plugins);
        // Plugins degrade the aggregate but never contribute a reason.
        assert_eq!(status, ComponentStatus::Degraded);
        assert!(reason.is_none());
    }

    #[test]
    fn test_aggregate_health_unhealthy_overrides_degraded() {
        let mut system = create_healthy_system();
        system.status = ComponentStatus::Degraded;

        let mut redis = create_healthy_redis();
        redis.status = ComponentStatus::Unhealthy;
        redis.error = Some("Redis down".to_string());

        let queue = create_healthy_queue();

        let (status, _) = aggregate_health(&system, &redis, &queue, &None);
        assert_eq!(status, ComponentStatus::Unhealthy);
    }

    // ---- build_response -------------------------------------------------

    #[test]
    fn test_build_response_ready() {
        let response = build_response(
            create_healthy_system(),
            create_healthy_redis(),
            create_healthy_queue(),
            None,
        );

        assert!(response.ready);
        assert_eq!(response.status, ComponentStatus::Healthy);
        assert!(response.reason.is_none());
        assert!(!response.timestamp.is_empty());
    }

    #[test]
    fn test_build_response_not_ready() {
        let mut system = create_healthy_system();
        system.status = ComponentStatus::Unhealthy;
        system.error = Some("Critical error".to_string());

        let response = build_response(system, create_healthy_redis(), create_healthy_queue(), None);

        assert!(!response.ready);
        assert_eq!(response.status, ComponentStatus::Unhealthy);
        assert!(response.reason.is_some());
    }

    #[test]
    fn test_build_response_degraded_is_ready() {
        let mut system = create_healthy_system();
        system.status = ComponentStatus::Degraded;

        let response = build_response(system, create_healthy_redis(), create_healthy_queue(), None);

        // A degraded service still reports ready (only Unhealthy blocks).
        assert!(response.ready);
        assert_eq!(response.status, ComponentStatus::Degraded);
    }

    // ---- status-to-health conversions ----------------------------------

    #[test]
    fn test_redis_status_to_health_healthy() {
        let status = RedisHealthStatus {
            healthy: true,
            primary_pool: PoolStatus {
                connected: true,
                available: 8,
                max_size: 16,
                error: None,
            },
            reader_pool: PoolStatus {
                connected: true,
                available: 8,
                max_size: 16,
                error: None,
            },
            error: None,
        };

        let health = redis_status_to_health(status);
        assert_eq!(health.status, ComponentStatus::Healthy);
    }

    #[test]
    fn test_redis_status_to_health_degraded() {
        // Primary up but reader down => degraded.
        let status = RedisHealthStatus {
            healthy: true,
            primary_pool: PoolStatus {
                connected: true,
                available: 8,
                max_size: 16,
                error: None,
            },
            reader_pool: PoolStatus {
                connected: false,
                available: 0,
                max_size: 16,
                error: Some("Connection refused".to_string()),
            },
            error: None,
        };

        let health = redis_status_to_health(status);
        assert_eq!(health.status, ComponentStatus::Degraded);
    }

    #[test]
    fn test_redis_status_to_health_unhealthy() {
        let status = RedisHealthStatus {
            healthy: false,
            primary_pool: PoolStatus {
                connected: false,
                available: 0,
                max_size: 16,
                error: Some("PING timeout".to_string()),
            },
            reader_pool: PoolStatus {
                connected: false,
                available: 0,
                max_size: 16,
                error: Some("PING timeout".to_string()),
            },
            error: Some("Primary pool failed".to_string()),
        };

        let health = redis_status_to_health(status);
        assert_eq!(health.status, ComponentStatus::Unhealthy);
    }

    #[test]
    fn test_queue_status_to_health_healthy() {
        let status = QueueHealthStatus {
            healthy: true,
            error: None,
        };

        let health = queue_status_to_health(status);
        assert_eq!(health.status, ComponentStatus::Healthy);
        assert!(health.error.is_none());
    }

    #[test]
    fn test_queue_status_to_health_unhealthy() {
        let status = QueueHealthStatus {
            healthy: false,
            error: Some("Stats timeout".to_string()),
        };

        let health = queue_status_to_health(status);
        assert_eq!(health.status, ComponentStatus::Unhealthy);
        assert!(health.error.is_some());
    }

    #[test]
    fn test_create_unavailable_health() {
        let (redis, queue) = create_unavailable_health();

        assert_eq!(redis.status, ComponentStatus::Unhealthy);
        assert!(!redis.primary_pool.connected);
        assert!(!redis.reader_pool.connected);
        assert!(redis.error.is_some());

        assert_eq!(queue.status, ComponentStatus::Unhealthy);
        assert!(queue.error.is_some());
    }

    // ---- serialization --------------------------------------------------

    #[test]
    fn test_pool_status_serialization_without_error() {
        let status = PoolStatus {
            connected: true,
            available: 8,
            max_size: 16,
            error: None,
        };

        let json = serde_json::to_string(&status).unwrap();
        assert!(json.contains("\"connected\":true"));
        assert!(json.contains("\"available\":8"));
        assert!(json.contains("\"max_size\":16"));
        // A None error is omitted from the serialized form entirely.
        assert!(!json.contains("error"));
    }

    #[test]
    fn test_pool_status_serialization_with_error() {
        let status = PoolStatus {
            connected: false,
            available: 0,
            max_size: 16,
            error: Some("Connection refused".to_string()),
        };

        let json = serde_json::to_string(&status).unwrap();
        assert!(json.contains("\"connected\":false"));
        assert!(json.contains("\"error\":\"Connection refused\""));
    }

    // ---- live system probe ----------------------------------------------

    #[actix_web::test]
    async fn test_check_system_health_returns_valid_data() {
        let health = check_system_health();

        assert!(health.fd_limit > 0);

        assert!(health.fd_usage_percent <= 100);

        assert!(matches!(
            health.status,
            ComponentStatus::Healthy | ComponentStatus::Degraded | ComponentStatus::Unhealthy
        ));
    }

    // ---- nested-result handling pattern ---------------------------------

    // Stand-in for tokio's timeout Elapsed error in the pattern tests below.
    #[derive(Debug)]
    struct TimeoutError;

    // Mirrors the Ok(Ok)/Ok(Err)/Err(timeout) matching used by the PING
    // checks, so the pattern itself is covered by unit tests.
    fn evaluate_nested_result<T, E: std::fmt::Display>(
        result: Result<Result<T, E>, TimeoutError>,
    ) -> (bool, Option<String>) {
        match result {
            Ok(Ok(_)) => (true, None),
            Ok(Err(e)) => (false, Some(format!("Operation failed: {e}"))),
            Err(_) => (false, Some("Operation timed out".to_string())),
        }
    }

    #[test]
    fn test_nested_result_success() {
        let result: Result<Result<(), &str>, TimeoutError> = Ok(Ok(()));
        let (healthy, error) = evaluate_nested_result(result);

        assert!(healthy);
        assert!(error.is_none());
    }

    #[test]
    fn test_nested_result_inner_error() {
        let result: Result<Result<(), &str>, TimeoutError> = Ok(Err("connection refused"));
        let (healthy, error) = evaluate_nested_result(result);

        assert!(!healthy, "Inner error should mark as unhealthy");
        assert!(error.is_some());
        assert!(error.unwrap().contains("connection refused"));
    }

    #[test]
    fn test_nested_result_timeout() {
        let result: Result<Result<(), &str>, TimeoutError> = Err(TimeoutError);
        let (healthy, error) = evaluate_nested_result(result);

        assert!(!healthy);
        assert!(error.is_some());
        assert!(error.unwrap().contains("timed out"));
    }

    #[test]
    fn test_nested_result_is_ok_pitfall() {
        let inner_error: Result<Result<(), &str>, TimeoutError> = Ok(Err("inner error"));

        // Demonstrates why `is_ok()` must not be used to judge nested
        // results: it ignores the inner Err.
        let wrong_healthy = inner_error.is_ok();
        assert!(
            wrong_healthy,
            "is_ok() returns true even with inner error - this is the bug!"
        );

        let correct_healthy = matches!(inner_error, Ok(Ok(_)));
        assert!(
            !correct_healthy,
            "matches! correctly identifies inner error"
        );
    }

    // ---- cache round-trip ------------------------------------------------

    #[actix_web::test]
    async fn test_cache_operations() {
        clear_cache().await;

        let cached = get_cached_response().await;
        assert!(cached.is_none());

        let response = build_response(
            create_healthy_system(),
            create_healthy_redis(),
            create_healthy_queue(),
            None,
        );
        cache_response(&response).await;

        let cached = get_cached_response().await;
        assert!(cached.is_some());
        assert_eq!(cached.unwrap().ready, response.ready);

        // Leave the global cache empty for other tests.
        clear_cache().await;
    }
}