openzeppelin_relayer/services/health/
mod.rs

1//! Health check service.
2//!
3//! This module contains the business logic for performing health checks,
4//! including system resource checks, Redis connectivity, queue health, and plugin status.
5
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8
9use deadpool_redis::Pool;
10use tokio::sync::RwLock;
11
12use crate::jobs::{JobProducerTrait, Queue};
13use crate::models::health::{
14    ComponentStatus, Components, PluginHealth, PoolStatus, QueueHealth, QueueHealthStatus,
15    ReadinessResponse, RedisHealth, RedisHealthStatus, SystemHealth,
16};
17use crate::models::{
18    NetworkRepoModel, NotificationRepoModel, RelayerRepoModel, SignerRepoModel, ThinDataAppState,
19    TransactionRepoModel,
20};
21use crate::repositories::{
22    ApiKeyRepositoryTrait, NetworkRepository, PluginRepositoryTrait, RelayerRepository, Repository,
23    TransactionCounterTrait, TransactionRepository,
24};
25use crate::services::plugins::get_pool_manager;
26use crate::utils::RedisConnections;
27
28// ============================================================================
29// Constants
30// ============================================================================
31
/// Timeout for Redis PING operations during health checks (3 seconds).
const PING_TIMEOUT: Duration = Duration::from_millis(3000);

/// Warning file descriptor ratio (70%) - triggers Degraded status.
const WARNING_FD_RATIO: f64 = 0.7;

/// Maximum file descriptor ratio (80%) - triggers Unhealthy status.
const MAX_FD_RATIO: f64 = 0.8;

/// Warning CLOSE_WAIT socket count - triggers Degraded status.
/// Increased from 50 to tolerate Docker/Redis networking artifacts under load.
const WARNING_CLOSE_WAIT: usize = 200;

/// Maximum CLOSE_WAIT socket count - triggers Unhealthy status.
/// Increased from 100 to tolerate Docker/Redis networking artifacts under load.
const MAX_CLOSE_WAIT: usize = 500;

/// Cache TTL - health checks are cached for this duration.
/// Readiness probes arriving within this window reuse the cached response.
const HEALTH_CACHE_TTL: Duration = Duration::from_secs(10);
51
52// ============================================================================
53// Cache
54// ============================================================================
55
/// Cached health check result with timestamp.
struct CachedHealth {
    // The fully-built readiness response that was last computed.
    response: ReadinessResponse,
    // When the response was computed; compared against HEALTH_CACHE_TTL.
    checked_at: Instant,
}
61
/// Global health cache (thread-safe).
static HEALTH_CACHE: std::sync::OnceLock<RwLock<Option<CachedHealth>>> = std::sync::OnceLock::new();

/// Lazily initialize and return the global health cache.
fn get_cache() -> &'static RwLock<Option<CachedHealth>> {
    HEALTH_CACHE.get_or_init(|| RwLock::new(None))
}
68
69/// Check if cached response is still valid and return it.
70async fn get_cached_response() -> Option<ReadinessResponse> {
71    let cache = get_cache().read().await;
72    if let Some(ref cached) = *cache {
73        if cached.checked_at.elapsed() < HEALTH_CACHE_TTL {
74            return Some(cached.response.clone());
75        }
76    }
77    None
78}
79
80/// Store response in cache.
81async fn cache_response(response: &ReadinessResponse) {
82    let mut cache = get_cache().write().await;
83    *cache = Some(CachedHealth {
84        response: response.clone(),
85        checked_at: Instant::now(),
86    });
87}
88
/// Clear the health cache (useful for testing).
#[cfg(test)]
pub async fn clear_cache() {
    // Drop any cached entry; the next readiness call re-runs all checks.
    let _ = get_cache().write().await.take();
}
95
96// ============================================================================
97// Redis Health Checks
98// ============================================================================
99
100/// Ping a single Redis pool and return its status.
101async fn ping_pool(pool: &Arc<Pool>, name: &str) -> PoolStatus {
102    let status = pool.status();
103
104    let result = tokio::time::timeout(PING_TIMEOUT, async {
105        let mut conn = pool.get().await?;
106        redis::cmd("PING")
107            .query_async::<String>(&mut conn)
108            .await
109            .map_err(deadpool_redis::PoolError::Backend)
110    })
111    .await;
112
113    match result {
114        Ok(Ok(_)) => PoolStatus {
115            connected: true,
116            available: status.available,
117            max_size: status.max_size,
118            error: None,
119        },
120        Ok(Err(e)) => {
121            tracing::warn!(pool = %name, error = %e, "Redis pool PING failed");
122            PoolStatus {
123                connected: false,
124                available: status.available,
125                max_size: status.max_size,
126                error: Some(e.to_string()),
127            }
128        }
129        Err(_) => {
130            tracing::warn!(pool = %name, "Redis pool PING timed out");
131            PoolStatus {
132                connected: false,
133                available: status.available,
134                max_size: status.max_size,
135                error: Some("PING timed out".to_string()),
136            }
137        }
138    }
139}
140
/// Check health of Redis connections (primary and reader pools).
///
/// PINGs both pools concurrently, each bounded by [`PING_TIMEOUT`] (3s).
/// Primary pool failure = Unhealthy, reader pool failure = Degraded.
async fn check_redis_health(connections: &Arc<RedisConnections>) -> RedisHealthStatus {
    // Ping both pools in parallel; each ping carries its own timeout.
    let (primary_status, reader_status) = tokio::join!(
        ping_pool(connections.primary(), "primary"),
        ping_pool(connections.reader(), "reader")
    );

    // Healthy if primary is connected (reader is optional/degraded mode)
    let healthy = primary_status.connected;

    // Only a primary failure produces a top-level error message; reader
    // problems are visible through the `reader_pool` field instead.
    let error = if !primary_status.connected {
        Some(format!(
            "Redis primary pool: {}",
            primary_status
                .error
                .as_deref()
                .unwrap_or("connection failed")
        ))
    } else {
        None
    };

    RedisHealthStatus {
        healthy,
        primary_pool: primary_status,
        reader_pool: reader_status,
        error,
    }
}
173
174/// Convert RedisHealthStatus to RedisHealth with proper ComponentStatus.
175fn redis_status_to_health(status: RedisHealthStatus) -> RedisHealth {
176    let component_status = if status.healthy {
177        if status.reader_pool.connected {
178            ComponentStatus::Healthy
179        } else {
180            ComponentStatus::Degraded // Reader down but primary OK
181        }
182    } else {
183        ComponentStatus::Unhealthy
184    };
185
186    RedisHealth {
187        status: component_status,
188        primary_pool: status.primary_pool,
189        reader_pool: status.reader_pool,
190        error: status.error,
191    }
192}
193
194// ============================================================================
195// Queue Health Checks
196// ============================================================================
197
198/// Check health of Queue's Redis connection.
199///
200/// Sends a PING command to verify the queue's Redis connection is responsive within the timeout.
201async fn check_queue_health(queue: &Queue) -> QueueHealthStatus {
202    let mut conn = queue.relayer_health_check_queue.get_connection().clone();
203
204    let result = tokio::time::timeout(PING_TIMEOUT, async {
205        redis::cmd("PING").query_async::<String>(&mut conn).await
206    })
207    .await;
208
209    // result is Result<Result<String, RedisError>, Elapsed>
210    // Must check both: timeout didn't expire AND PING succeeded
211    match result {
212        Ok(Ok(_)) => QueueHealthStatus {
213            healthy: true,
214            error: None,
215        },
216        Ok(Err(e)) => {
217            // PING call failed (but didn't timeout)
218            tracing::warn!(error = %e, "Queue PING check failed");
219            QueueHealthStatus {
220                healthy: false,
221                error: Some(format!("Queue connection: {e}")),
222            }
223        }
224        Err(_) => {
225            // Timeout expired
226            tracing::warn!("Queue PING check timed out");
227            QueueHealthStatus {
228                healthy: false,
229                error: Some("Queue connection: PING check timed out".to_string()),
230            }
231        }
232    }
233}
234
235/// Convert QueueHealthStatus to QueueHealth.
236fn queue_status_to_health(status: QueueHealthStatus) -> QueueHealth {
237    QueueHealth {
238        status: if status.healthy {
239            ComponentStatus::Healthy
240        } else {
241            ComponentStatus::Unhealthy
242        },
243        error: status.error,
244    }
245}
246
247/// Create unhealthy Redis and Queue health when queue is unavailable.
248fn create_unavailable_health() -> (RedisHealth, QueueHealth) {
249    let error_msg = "Queue unavailable - cannot check Redis or Queue health";
250    let unhealthy_pool = PoolStatus {
251        connected: false,
252        available: 0,
253        max_size: 0,
254        error: Some(error_msg.to_string()),
255    };
256
257    let redis = RedisHealth {
258        status: ComponentStatus::Unhealthy,
259        primary_pool: unhealthy_pool.clone(),
260        reader_pool: unhealthy_pool,
261        error: Some(error_msg.to_string()),
262    };
263
264    let queue = QueueHealth {
265        status: ComponentStatus::Unhealthy,
266        error: Some("Failed to get queue from job producer".to_string()),
267    };
268
269    (redis, queue)
270}
271
272// ============================================================================
273// System Health Checks
274// ============================================================================
275
/// Get file descriptor count for current process.
///
/// Linux counts entries in `/proc/<pid>/fd`; macOS shells out to `lsof`
/// (NOTE(review): lsof lists all open files, not strictly descriptors, and
/// spawns a subprocess — acceptable for a cached health probe).
/// Unsupported platforms report 0.
fn get_fd_count() -> Result<usize, std::io::Error> {
    let pid = std::process::id();

    #[cfg(target_os = "linux")]
    {
        // Each directory entry under /proc/<pid>/fd is one open descriptor.
        let entries = std::fs::read_dir(format!("/proc/{pid}/fd"))?;
        Ok(entries.count())
    }

    #[cfg(target_os = "macos")]
    {
        use std::process::Command;
        let output = Command::new("lsof")
            .args(["-p", &pid.to_string()])
            .output()?;
        // Subtract the header line lsof prints before the file list.
        let lines = String::from_utf8_lossy(&output.stdout).lines().count();
        Ok(lines.saturating_sub(1))
    }

    #[cfg(not(any(target_os = "linux", target_os = "macos")))]
    {
        Ok(0) // Unsupported platform
    }
}
304
/// Get soft file descriptor limit for current process.
///
/// Runs `ulimit -n` through `sh` (ulimit is a shell builtin). Non-numeric
/// output such as "unlimited" falls back to a conservative default of 1024.
fn get_fd_limit() -> Result<usize, std::io::Error> {
    #[cfg(any(target_os = "linux", target_os = "macos"))]
    {
        use std::process::Command;
        let output = Command::new("sh").args(["-c", "ulimit -n"]).output()?;
        let text = String::from_utf8_lossy(&output.stdout);
        Ok(text.trim().parse().unwrap_or(1024))
    }

    #[cfg(not(any(target_os = "linux", target_os = "macos")))]
    {
        Ok(1024) // Default fallback
    }
}
323
/// Get CLOSE_WAIT socket count.
///
/// NOTE(review): `netstat -an` is system-wide, so this counts CLOSE_WAIT
/// sockets for the whole host/container, not just this process — confirm
/// that is the intended scope. If netstat is missing, the pipeline still
/// exits cleanly via `wc -l` and yields 0.
fn get_close_wait_count() -> Result<usize, std::io::Error> {
    #[cfg(any(target_os = "linux", target_os = "macos"))]
    {
        use std::process::Command;
        let output = Command::new("sh")
            .args(["-c", "netstat -an | grep CLOSE_WAIT | wc -l"])
            .output()?;
        let text = String::from_utf8_lossy(&output.stdout);
        // Unparseable output is treated as zero CLOSE_WAIT sockets.
        Ok(text.trim().parse().unwrap_or(0))
    }

    #[cfg(not(any(target_os = "linux", target_os = "macos")))]
    {
        Ok(0) // Unsupported platform
    }
}
344
345/// Evaluate system metrics and return appropriate status with optional error.
346///
347/// Returns (status, optional_error_message).
348fn evaluate_system_metrics(
349    fd_ratio: f64,
350    fd_count: usize,
351    fd_limit: usize,
352    close_wait_count: usize,
353) -> (ComponentStatus, Option<String>) {
354    let mut errors: Vec<String> = Vec::new();
355    let mut is_degraded = false;
356
357    // Check file descriptor usage
358    if fd_ratio > MAX_FD_RATIO {
359        let fd_percent = fd_ratio * 100.0;
360        let max_percent = MAX_FD_RATIO * 100.0;
361        errors.push(format!(
362            "File descriptor limit critical: {fd_count}/{fd_limit} ({fd_percent:.1}% > {max_percent:.1}%)"
363        ));
364    } else if fd_ratio > WARNING_FD_RATIO {
365        is_degraded = true;
366    }
367
368    // Check CLOSE_WAIT sockets
369    if close_wait_count > MAX_CLOSE_WAIT {
370        errors.push(format!(
371            "Too many CLOSE_WAIT sockets: {close_wait_count} > {MAX_CLOSE_WAIT}"
372        ));
373    } else if close_wait_count > WARNING_CLOSE_WAIT {
374        is_degraded = true;
375    }
376
377    let status = if !errors.is_empty() {
378        ComponentStatus::Unhealthy
379    } else if is_degraded {
380        ComponentStatus::Degraded
381    } else {
382        ComponentStatus::Healthy
383    };
384
385    let error = if errors.is_empty() {
386        None
387    } else {
388        Some(errors.join("; "))
389    };
390
391    (status, error)
392}
393
394/// Check system resources and return health status.
395///
396/// Monitors file descriptor usage and CLOSE_WAIT socket count.
397/// - Below 70% FD usage and <200 CLOSE_WAIT: Healthy
398/// - 70-80% FD usage or 200-500 CLOSE_WAIT: Degraded
399/// - Above 80% FD usage or >500 CLOSE_WAIT: Unhealthy
400pub fn check_system_health() -> SystemHealth {
401    let fd_count = get_fd_count().unwrap_or(0);
402    let fd_limit = get_fd_limit().unwrap_or(1024);
403    let close_wait_count = get_close_wait_count().unwrap_or(0);
404
405    let fd_ratio = if fd_limit > 0 {
406        fd_count as f64 / fd_limit as f64
407    } else {
408        0.0
409    };
410    let fd_usage_percent = (fd_ratio * 100.0) as u32;
411
412    let (status, error) = evaluate_system_metrics(fd_ratio, fd_count, fd_limit, close_wait_count);
413
414    SystemHealth {
415        status,
416        fd_count,
417        fd_limit,
418        fd_usage_percent,
419        close_wait_count,
420        error,
421    }
422}
423
424// ============================================================================
425// Plugin Health Checks
426// ============================================================================
427
428/// Determine plugin ComponentStatus from health check result.
429fn determine_plugin_status(healthy: bool, circuit_state: Option<&str>) -> ComponentStatus {
430    if healthy {
431        match circuit_state {
432            Some("closed") | None => ComponentStatus::Healthy,
433            Some("half_open") | Some("open") => ComponentStatus::Degraded,
434            _ => ComponentStatus::Healthy,
435        }
436    } else {
437        ComponentStatus::Degraded
438    }
439}
440
/// Check plugin health using the global pool manager.
///
/// Returns `None` when the plugin pool manager has not been initialized
/// (i.e. plugins are not in use), so callers can omit the component entirely.
pub async fn check_plugin_health() -> Option<PluginHealth> {
    let pool_manager = get_pool_manager();

    // No plugins configured/started — report nothing rather than Unhealthy.
    if !pool_manager.is_initialized().await {
        return None;
    }

    match pool_manager.health_check().await {
        Ok(plugin_status) => {
            // Map the healthy flag + circuit breaker state onto a ComponentStatus.
            let status = determine_plugin_status(
                plugin_status.healthy,
                plugin_status.circuit_state.as_deref(),
            );

            Some(PluginHealth {
                status,
                enabled: true,
                circuit_state: plugin_status.circuit_state,
                // Surface the raw status string only when unhealthy.
                error: if plugin_status.healthy {
                    None
                } else {
                    Some(plugin_status.status)
                },
                uptime_ms: plugin_status.uptime_ms,
                memory: plugin_status.memory,
                pool_completed: plugin_status.pool_completed,
                pool_queued: plugin_status.pool_queued,
                success_rate: plugin_status.success_rate,
                avg_response_time_ms: plugin_status.avg_response_time_ms,
                recovering: plugin_status.recovering,
                recovery_percent: plugin_status.recovery_percent,
                shared_socket_available_slots: plugin_status.shared_socket_available_slots,
                shared_socket_active_connections: plugin_status.shared_socket_active_connections,
                shared_socket_registered_executions: plugin_status
                    .shared_socket_registered_executions,
                connection_pool_available_slots: plugin_status.connection_pool_available_slots,
                connection_pool_active_connections: plugin_status
                    .connection_pool_active_connections,
            })
        }
        // A failing health check reports Degraded (never Unhealthy) so plugin
        // problems cannot fail the overall readiness probe; see aggregate_health.
        Err(e) => Some(PluginHealth {
            status: ComponentStatus::Degraded,
            enabled: true,
            circuit_state: None,
            error: Some(e.to_string()),
            uptime_ms: None,
            memory: None,
            pool_completed: None,
            pool_queued: None,
            success_rate: None,
            avg_response_time_ms: None,
            recovering: None,
            recovery_percent: None,
            shared_socket_available_slots: None,
            shared_socket_active_connections: None,
            shared_socket_registered_executions: None,
            connection_pool_available_slots: None,
            connection_pool_active_connections: None,
        }),
    }
}
503
504// ============================================================================
505// Health Aggregation
506// ============================================================================
507
508/// Aggregate component health statuses into overall status and reasons.
509///
510/// Priority: Unhealthy > Degraded > Healthy
511/// Only Unhealthy components contribute to the reason list.
512fn aggregate_health(
513    system: &SystemHealth,
514    redis: &RedisHealth,
515    queue: &QueueHealth,
516    plugins: &Option<PluginHealth>,
517) -> (ComponentStatus, Option<String>) {
518    let mut reasons: Vec<String> = Vec::new();
519    let mut overall_status = ComponentStatus::Healthy;
520
521    // System check - unhealthy = 503
522    if system.status == ComponentStatus::Unhealthy {
523        overall_status = ComponentStatus::Unhealthy;
524        if let Some(ref err) = system.error {
525            reasons.push(err.clone());
526        }
527    } else if system.status == ComponentStatus::Degraded
528        && overall_status == ComponentStatus::Healthy
529    {
530        overall_status = ComponentStatus::Degraded;
531    }
532
533    // Redis check - unhealthy = 503
534    if redis.status == ComponentStatus::Unhealthy {
535        overall_status = ComponentStatus::Unhealthy;
536        if let Some(ref err) = redis.error {
537            reasons.push(err.clone());
538        }
539    } else if redis.status == ComponentStatus::Degraded
540        && overall_status == ComponentStatus::Healthy
541    {
542        overall_status = ComponentStatus::Degraded;
543    }
544
545    // Queue check - unhealthy = 503
546    if queue.status == ComponentStatus::Unhealthy {
547        overall_status = ComponentStatus::Unhealthy;
548        if let Some(ref err) = queue.error {
549            reasons.push(err.clone());
550        }
551    } else if queue.status == ComponentStatus::Degraded
552        && overall_status == ComponentStatus::Healthy
553    {
554        overall_status = ComponentStatus::Degraded;
555    }
556
557    // Plugin check - degraded only (doesn't cause 503)
558    if let Some(ref plugin_health) = plugins {
559        if plugin_health.status == ComponentStatus::Degraded
560            && overall_status == ComponentStatus::Healthy
561        {
562            overall_status = ComponentStatus::Degraded;
563        }
564    }
565
566    let reason = if reasons.is_empty() {
567        None
568    } else {
569        Some(reasons.join("; "))
570    };
571
572    (overall_status, reason)
573}
574
575/// Build the final ReadinessResponse from components.
576fn build_response(
577    system: SystemHealth,
578    redis: RedisHealth,
579    queue: QueueHealth,
580    plugins: Option<PluginHealth>,
581) -> ReadinessResponse {
582    let (overall_status, reason) = aggregate_health(&system, &redis, &queue, &plugins);
583    let ready = overall_status != ComponentStatus::Unhealthy;
584
585    ReadinessResponse {
586        ready,
587        status: overall_status,
588        reason,
589        components: Components {
590            system,
591            redis,
592            queue,
593            plugins,
594        },
595        timestamp: chrono::Utc::now().to_rfc3339(),
596    }
597}
598
599// ============================================================================
600// Public API
601// ============================================================================
602
/// Get readiness response with caching.
///
/// Checks the cache first (10-second TTL). On cache miss, performs health checks
/// for all components: system resources, Redis pools, queue, and plugins.
///
/// Returns 200 OK if ready (Healthy or Degraded), 503 if Unhealthy.
pub async fn get_readiness<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
    data: ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
) -> ReadinessResponse
where
    J: JobProducerTrait + Send + Sync + 'static,
    RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
    TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
    NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
    NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
    SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
    TCR: TransactionCounterTrait + Send + Sync + 'static,
    PR: PluginRepositoryTrait + Send + Sync + 'static,
    AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
{
    // Check cache first to avoid unnecessary health checks
    if let Some(cached) = get_cached_response().await {
        return cached;
    }

    // Try to get queue for Redis and Queue health checks
    let queue = match data.job_producer.get_queue().await {
        Ok(queue) => Some(queue),
        Err(e) => {
            tracing::warn!(error = %e, "Failed to get queue from job producer");
            None
        }
    };

    // Perform health checks
    let system = check_system_health();
    let plugins = check_plugin_health().await;

    // Redis and queue checks both need the queue handle; without it both are
    // reported as Unhealthy via create_unavailable_health().
    let (redis, queue_health) = if let Some(ref q) = queue {
        let redis_connections = q.redis_connections();
        let redis_status = check_redis_health(&redis_connections).await;
        let queue_status = check_queue_health(q).await;

        (
            redis_status_to_health(redis_status),
            queue_status_to_health(queue_status),
        )
    } else {
        create_unavailable_health()
    };

    // Build response
    let response = build_response(system, redis, queue_health, plugins);

    // Cache only if we have a working queue (cache is less useful in degraded state)
    if queue.is_some() {
        cache_response(&response).await;
    }

    response
}
664
665// ============================================================================
666// Tests
667// ============================================================================
668
669#[cfg(test)]
670mod tests {
671    use super::*;
672
673    // -------------------------------------------------------------------------
674    // Constants Tests
675    // -------------------------------------------------------------------------
676
    #[test]
    fn test_constants() {
        // Pin the tuning constants so accidental changes are caught in review.
        assert_eq!(PING_TIMEOUT, Duration::from_millis(3000));
        assert_eq!(WARNING_FD_RATIO, 0.7);
        assert_eq!(MAX_FD_RATIO, 0.8);
        assert_eq!(WARNING_CLOSE_WAIT, 200);
        assert_eq!(MAX_CLOSE_WAIT, 500);
        assert_eq!(HEALTH_CACHE_TTL, Duration::from_secs(10));
    }
686
687    // -------------------------------------------------------------------------
688    // System Metrics Evaluation Tests
689    // -------------------------------------------------------------------------
690
    // Each case drives evaluate_system_metrics with hand-picked inputs and
    // checks status/error against the documented thresholds
    // (warning: 70% FD / 200 CLOSE_WAIT; max: 80% FD / 500 CLOSE_WAIT).

    #[test]
    fn test_evaluate_system_metrics_healthy() {
        // Low FD usage, low CLOSE_WAIT
        let (status, error) = evaluate_system_metrics(0.5, 500, 1000, 20);
        assert_eq!(status, ComponentStatus::Healthy);
        assert!(error.is_none());
    }

    #[test]
    fn test_evaluate_system_metrics_degraded_fd() {
        // FD at 75% (between warning and max)
        let (status, error) = evaluate_system_metrics(0.75, 750, 1000, 20);
        assert_eq!(status, ComponentStatus::Degraded);
        assert!(error.is_none());
    }

    #[test]
    fn test_evaluate_system_metrics_degraded_close_wait() {
        // CLOSE_WAIT at 300 (between warning 200 and max 500)
        let (status, error) = evaluate_system_metrics(0.5, 500, 1000, 300);
        assert_eq!(status, ComponentStatus::Degraded);
        assert!(error.is_none());
    }

    #[test]
    fn test_evaluate_system_metrics_unhealthy_fd() {
        // FD at 85% (above max)
        let (status, error) = evaluate_system_metrics(0.85, 850, 1000, 20);
        assert_eq!(status, ComponentStatus::Unhealthy);
        assert!(error.is_some());
        assert!(error.unwrap().contains("File descriptor limit critical"));
    }

    #[test]
    fn test_evaluate_system_metrics_unhealthy_close_wait() {
        // CLOSE_WAIT at 600 (above max 500)
        let (status, error) = evaluate_system_metrics(0.5, 500, 1000, 600);
        assert_eq!(status, ComponentStatus::Unhealthy);
        assert!(error.is_some());
        assert!(error.unwrap().contains("CLOSE_WAIT"));
    }

    #[test]
    fn test_evaluate_system_metrics_both_unhealthy() {
        // Both FD and CLOSE_WAIT above max
        let (status, error) = evaluate_system_metrics(0.9, 900, 1000, 600);
        assert_eq!(status, ComponentStatus::Unhealthy);
        assert!(error.is_some());
        // Both failure reasons must appear in the joined error string.
        let err = error.unwrap();
        assert!(err.contains("File descriptor"));
        assert!(err.contains("CLOSE_WAIT"));
    }

    #[test]
    fn test_evaluate_system_metrics_at_warning_threshold() {
        // Exactly at warning threshold (70% FD, 200 CLOSE_WAIT)
        let (status, _) = evaluate_system_metrics(0.7, 700, 1000, 200);
        // At exactly warning threshold, should still be healthy (> comparison)
        assert_eq!(status, ComponentStatus::Healthy);
    }

    #[test]
    fn test_evaluate_system_metrics_just_above_warning() {
        // Just above warning threshold
        let (status, _) = evaluate_system_metrics(0.71, 710, 1000, 201);
        assert_eq!(status, ComponentStatus::Degraded);
    }

    #[test]
    fn test_evaluate_system_metrics_at_max_threshold() {
        // Exactly at max threshold (80% FD, 500 CLOSE_WAIT)
        let (status, _) = evaluate_system_metrics(0.8, 800, 1000, 500);
        // At exactly max, should be degraded (> comparison for unhealthy)
        assert_eq!(status, ComponentStatus::Degraded);
    }

    #[test]
    fn test_evaluate_system_metrics_zero_limit() {
        // Edge case: zero fd_limit should not cause division by zero
        // (the caller computes the ratio; 0.0 is passed here directly).
        let (status, _) = evaluate_system_metrics(0.0, 0, 0, 0);
        assert_eq!(status, ComponentStatus::Healthy);
    }
773
774    // -------------------------------------------------------------------------
775    // Plugin Status Tests
776    // -------------------------------------------------------------------------
777
    // determine_plugin_status maps (healthy, circuit_state) onto a
    // ComponentStatus; unknown circuit states are treated as Healthy.

    #[test]
    fn test_determine_plugin_status_healthy_closed() {
        assert_eq!(
            determine_plugin_status(true, Some("closed")),
            ComponentStatus::Healthy
        );
    }

    #[test]
    fn test_determine_plugin_status_healthy_none() {
        assert_eq!(
            determine_plugin_status(true, None),
            ComponentStatus::Healthy
        );
    }

    #[test]
    fn test_determine_plugin_status_healthy_half_open() {
        assert_eq!(
            determine_plugin_status(true, Some("half_open")),
            ComponentStatus::Degraded
        );
    }

    #[test]
    fn test_determine_plugin_status_healthy_open() {
        assert_eq!(
            determine_plugin_status(true, Some("open")),
            ComponentStatus::Degraded
        );
    }

    #[test]
    fn test_determine_plugin_status_unhealthy() {
        // An unhealthy plugin is Degraded even with a closed circuit.
        assert_eq!(
            determine_plugin_status(false, Some("closed")),
            ComponentStatus::Degraded
        );
    }

    #[test]
    fn test_determine_plugin_status_unknown_state() {
        assert_eq!(
            determine_plugin_status(true, Some("unknown")),
            ComponentStatus::Healthy
        );
    }
825
826    // -------------------------------------------------------------------------
827    // Health Aggregation Tests
828    // -------------------------------------------------------------------------
829
    // Fixture: a SystemHealth value comfortably under every threshold.
    fn create_healthy_system() -> SystemHealth {
        SystemHealth {
            status: ComponentStatus::Healthy,
            fd_count: 100,
            fd_limit: 1000,
            fd_usage_percent: 10,
            close_wait_count: 5,
            error: None,
        }
    }

    // Fixture: a RedisHealth with both primary and reader pools connected.
    fn create_healthy_redis() -> RedisHealth {
        RedisHealth {
            status: ComponentStatus::Healthy,
            primary_pool: PoolStatus {
                connected: true,
                available: 8,
                max_size: 16,
                error: None,
            },
            reader_pool: PoolStatus {
                connected: true,
                available: 8,
                max_size: 16,
                error: None,
            },
            error: None,
        }
    }

    // Fixture: a QueueHealth reporting Healthy with no error.
    fn create_healthy_queue() -> QueueHealth {
        QueueHealth {
            status: ComponentStatus::Healthy,
            error: None,
        }
    }
866
    #[test]
    fn test_aggregate_health_all_healthy() {
        let system = create_healthy_system();
        let redis = create_healthy_redis();
        let queue = create_healthy_queue();

        let (status, reason) = aggregate_health(&system, &redis, &queue, &None);
        assert_eq!(status, ComponentStatus::Healthy);
        assert!(reason.is_none());
    }

    #[test]
    fn test_aggregate_health_system_degraded() {
        let mut system = create_healthy_system();
        system.status = ComponentStatus::Degraded;
        let redis = create_healthy_redis();
        let queue = create_healthy_queue();

        // Degraded components never contribute to the reason list.
        let (status, reason) = aggregate_health(&system, &redis, &queue, &None);
        assert_eq!(status, ComponentStatus::Degraded);
        assert!(reason.is_none());
    }

    #[test]
    fn test_aggregate_health_system_unhealthy() {
        let mut system = create_healthy_system();
        system.status = ComponentStatus::Unhealthy;
        system.error = Some("FD limit exceeded".to_string());
        let redis = create_healthy_redis();
        let queue = create_healthy_queue();

        let (status, reason) = aggregate_health(&system, &redis, &queue, &None);
        assert_eq!(status, ComponentStatus::Unhealthy);
        assert!(reason.is_some());
        assert!(reason.unwrap().contains("FD limit"));
    }

    #[test]
    fn test_aggregate_health_redis_unhealthy() {
        let system = create_healthy_system();
        let mut redis = create_healthy_redis();
        redis.status = ComponentStatus::Unhealthy;
        redis.error = Some("Primary pool down".to_string());
        let queue = create_healthy_queue();

        let (status, reason) = aggregate_health(&system, &redis, &queue, &None);
        assert_eq!(status, ComponentStatus::Unhealthy);
        assert!(reason.unwrap().contains("Primary pool"));
    }

    #[test]
    fn test_aggregate_health_queue_unhealthy() {
        let system = create_healthy_system();
        let redis = create_healthy_redis();
        let mut queue = create_healthy_queue();
        queue.status = ComponentStatus::Unhealthy;
        queue.error = Some("Queue timeout".to_string());

        let (status, reason) = aggregate_health(&system, &redis, &queue, &None);
        assert_eq!(status, ComponentStatus::Unhealthy);
        assert!(reason.unwrap().contains("Queue timeout"));
    }
929
930    #[test]
931    fn test_aggregate_health_multiple_unhealthy() {
932        let mut system = create_healthy_system();
933        system.status = ComponentStatus::Unhealthy;
934        system.error = Some("FD limit".to_string());
935
936        let mut redis = create_healthy_redis();
937        redis.status = ComponentStatus::Unhealthy;
938        redis.error = Some("Redis down".to_string());
939
940        let queue = create_healthy_queue();
941
942        let (status, reason) = aggregate_health(&system, &redis, &queue, &None);
943        assert_eq!(status, ComponentStatus::Unhealthy);
944        let reason_str = reason.unwrap();
945        assert!(reason_str.contains("FD limit"));
946        assert!(reason_str.contains("Redis down"));
947    }
948
949    #[test]
950    fn test_aggregate_health_plugin_degraded_only() {
951        let system = create_healthy_system();
952        let redis = create_healthy_redis();
953        let queue = create_healthy_queue();
954        let plugins = Some(PluginHealth {
955            status: ComponentStatus::Degraded,
956            enabled: true,
957            circuit_state: Some("open".to_string()),
958            error: Some("Circuit open".to_string()),
959            uptime_ms: None,
960            memory: None,
961            pool_completed: None,
962            pool_queued: None,
963            success_rate: None,
964            avg_response_time_ms: None,
965            recovering: None,
966            recovery_percent: None,
967            shared_socket_available_slots: None,
968            shared_socket_active_connections: None,
969            shared_socket_registered_executions: None,
970            connection_pool_available_slots: None,
971            connection_pool_active_connections: None,
972        });
973
974        let (status, reason) = aggregate_health(&system, &redis, &queue, &plugins);
975        // Plugin degraded doesn't cause 503, just degrades overall status
976        assert_eq!(status, ComponentStatus::Degraded);
977        assert!(reason.is_none());
978    }
979
980    #[test]
981    fn test_aggregate_health_unhealthy_overrides_degraded() {
982        let mut system = create_healthy_system();
983        system.status = ComponentStatus::Degraded;
984
985        let mut redis = create_healthy_redis();
986        redis.status = ComponentStatus::Unhealthy;
987        redis.error = Some("Redis down".to_string());
988
989        let queue = create_healthy_queue();
990
991        let (status, _) = aggregate_health(&system, &redis, &queue, &None);
992        assert_eq!(status, ComponentStatus::Unhealthy);
993    }
994
995    // -------------------------------------------------------------------------
996    // Build Response Tests
997    // -------------------------------------------------------------------------
998
999    #[test]
1000    fn test_build_response_ready() {
1001        let response = build_response(
1002            create_healthy_system(),
1003            create_healthy_redis(),
1004            create_healthy_queue(),
1005            None,
1006        );
1007
1008        assert!(response.ready);
1009        assert_eq!(response.status, ComponentStatus::Healthy);
1010        assert!(response.reason.is_none());
1011        assert!(!response.timestamp.is_empty());
1012    }
1013
1014    #[test]
1015    fn test_build_response_not_ready() {
1016        let mut system = create_healthy_system();
1017        system.status = ComponentStatus::Unhealthy;
1018        system.error = Some("Critical error".to_string());
1019
1020        let response = build_response(system, create_healthy_redis(), create_healthy_queue(), None);
1021
1022        assert!(!response.ready);
1023        assert_eq!(response.status, ComponentStatus::Unhealthy);
1024        assert!(response.reason.is_some());
1025    }
1026
1027    #[test]
1028    fn test_build_response_degraded_is_ready() {
1029        let mut system = create_healthy_system();
1030        system.status = ComponentStatus::Degraded;
1031
1032        let response = build_response(system, create_healthy_redis(), create_healthy_queue(), None);
1033
1034        // Degraded is still ready (returns 200, not 503)
1035        assert!(response.ready);
1036        assert_eq!(response.status, ComponentStatus::Degraded);
1037    }
1038
1039    // -------------------------------------------------------------------------
1040    // Redis/Queue Helper Tests
1041    // -------------------------------------------------------------------------
1042
1043    #[test]
1044    fn test_redis_status_to_health_healthy() {
1045        let status = RedisHealthStatus {
1046            healthy: true,
1047            primary_pool: PoolStatus {
1048                connected: true,
1049                available: 8,
1050                max_size: 16,
1051                error: None,
1052            },
1053            reader_pool: PoolStatus {
1054                connected: true,
1055                available: 8,
1056                max_size: 16,
1057                error: None,
1058            },
1059            error: None,
1060        };
1061
1062        let health = redis_status_to_health(status);
1063        assert_eq!(health.status, ComponentStatus::Healthy);
1064    }
1065
1066    #[test]
1067    fn test_redis_status_to_health_degraded() {
1068        let status = RedisHealthStatus {
1069            healthy: true,
1070            primary_pool: PoolStatus {
1071                connected: true,
1072                available: 8,
1073                max_size: 16,
1074                error: None,
1075            },
1076            reader_pool: PoolStatus {
1077                connected: false, // Reader down
1078                available: 0,
1079                max_size: 16,
1080                error: Some("Connection refused".to_string()),
1081            },
1082            error: None,
1083        };
1084
1085        let health = redis_status_to_health(status);
1086        assert_eq!(health.status, ComponentStatus::Degraded);
1087    }
1088
1089    #[test]
1090    fn test_redis_status_to_health_unhealthy() {
1091        let status = RedisHealthStatus {
1092            healthy: false,
1093            primary_pool: PoolStatus {
1094                connected: false,
1095                available: 0,
1096                max_size: 16,
1097                error: Some("PING timeout".to_string()),
1098            },
1099            reader_pool: PoolStatus {
1100                connected: false,
1101                available: 0,
1102                max_size: 16,
1103                error: Some("PING timeout".to_string()),
1104            },
1105            error: Some("Primary pool failed".to_string()),
1106        };
1107
1108        let health = redis_status_to_health(status);
1109        assert_eq!(health.status, ComponentStatus::Unhealthy);
1110    }
1111
1112    #[test]
1113    fn test_queue_status_to_health_healthy() {
1114        let status = QueueHealthStatus {
1115            healthy: true,
1116            error: None,
1117        };
1118
1119        let health = queue_status_to_health(status);
1120        assert_eq!(health.status, ComponentStatus::Healthy);
1121        assert!(health.error.is_none());
1122    }
1123
1124    #[test]
1125    fn test_queue_status_to_health_unhealthy() {
1126        let status = QueueHealthStatus {
1127            healthy: false,
1128            error: Some("Stats timeout".to_string()),
1129        };
1130
1131        let health = queue_status_to_health(status);
1132        assert_eq!(health.status, ComponentStatus::Unhealthy);
1133        assert!(health.error.is_some());
1134    }
1135
1136    #[test]
1137    fn test_create_unavailable_health() {
1138        let (redis, queue) = create_unavailable_health();
1139
1140        assert_eq!(redis.status, ComponentStatus::Unhealthy);
1141        assert!(!redis.primary_pool.connected);
1142        assert!(!redis.reader_pool.connected);
1143        assert!(redis.error.is_some());
1144
1145        assert_eq!(queue.status, ComponentStatus::Unhealthy);
1146        assert!(queue.error.is_some());
1147    }
1148
1149    // -------------------------------------------------------------------------
1150    // Serialization Tests
1151    // -------------------------------------------------------------------------
1152
1153    #[test]
1154    fn test_pool_status_serialization_without_error() {
1155        let status = PoolStatus {
1156            connected: true,
1157            available: 8,
1158            max_size: 16,
1159            error: None,
1160        };
1161
1162        let json = serde_json::to_string(&status).unwrap();
1163        assert!(json.contains("\"connected\":true"));
1164        assert!(json.contains("\"available\":8"));
1165        assert!(json.contains("\"max_size\":16"));
1166        // error should be omitted when None (skip_serializing_if)
1167        assert!(!json.contains("error"));
1168    }
1169
1170    #[test]
1171    fn test_pool_status_serialization_with_error() {
1172        let status = PoolStatus {
1173            connected: false,
1174            available: 0,
1175            max_size: 16,
1176            error: Some("Connection refused".to_string()),
1177        };
1178
1179        let json = serde_json::to_string(&status).unwrap();
1180        assert!(json.contains("\"connected\":false"));
1181        assert!(json.contains("\"error\":\"Connection refused\""));
1182    }
1183
1184    // -------------------------------------------------------------------------
1185    // System Health Integration Test
1186    // -------------------------------------------------------------------------
1187
1188    #[actix_web::test]
1189    async fn test_check_system_health_returns_valid_data() {
1190        let health = check_system_health();
1191
1192        // Should have positive fd_limit
1193        assert!(health.fd_limit > 0);
1194
1195        // fd_usage_percent should be reasonable (0-100 under normal conditions)
1196        assert!(health.fd_usage_percent <= 100);
1197
1198        // close_wait_count should be non-negative (always true for usize)
1199        // Status should be one of the valid enum values
1200        assert!(matches!(
1201            health.status,
1202            ComponentStatus::Healthy | ComponentStatus::Degraded | ComponentStatus::Unhealthy
1203        ));
1204    }
1205
1206    // -------------------------------------------------------------------------
1207    // Nested Result Pattern Tests (documents check_queue_health behavior)
1208    // -------------------------------------------------------------------------
1209
1210    /// Represents a timeout error for testing (mirrors tokio::time::error::Elapsed)
1211    #[derive(Debug)]
1212    struct TimeoutError;
1213
1214    /// Helper that mimics the nested Result pattern from timeout + async operation.
1215    /// This documents the correct handling to prevent regression of the bug where
1216    /// `result.is_ok()` incorrectly marked failed operations as healthy.
1217    fn evaluate_nested_result<T, E: std::fmt::Display>(
1218        result: Result<Result<T, E>, TimeoutError>,
1219    ) -> (bool, Option<String>) {
1220        match result {
1221            Ok(Ok(_)) => (true, None),
1222            Ok(Err(e)) => (false, Some(format!("Operation failed: {e}"))),
1223            Err(_) => (false, Some("Operation timed out".to_string())),
1224        }
1225    }
1226
1227    #[test]
1228    fn test_nested_result_success() {
1229        // Simulates: timeout didn't expire AND inner operation succeeded
1230        let result: Result<Result<(), &str>, TimeoutError> = Ok(Ok(()));
1231        let (healthy, error) = evaluate_nested_result(result);
1232
1233        assert!(healthy);
1234        assert!(error.is_none());
1235    }
1236
1237    #[test]
1238    fn test_nested_result_inner_error() {
1239        // Simulates: timeout didn't expire BUT inner operation failed
1240        // This is the bug case - previously `result.is_ok()` returned true here!
1241        let result: Result<Result<(), &str>, TimeoutError> = Ok(Err("connection refused"));
1242        let (healthy, error) = evaluate_nested_result(result);
1243
1244        assert!(!healthy, "Inner error should mark as unhealthy");
1245        assert!(error.is_some());
1246        assert!(error.unwrap().contains("connection refused"));
1247    }
1248
1249    #[test]
1250    fn test_nested_result_timeout() {
1251        // Simulates: timeout expired
1252        let result: Result<Result<(), &str>, TimeoutError> = Err(TimeoutError);
1253        let (healthy, error) = evaluate_nested_result(result);
1254
1255        assert!(!healthy);
1256        assert!(error.is_some());
1257        assert!(error.unwrap().contains("timed out"));
1258    }
1259
1260    #[test]
1261    fn test_nested_result_is_ok_pitfall() {
1262        // Documents the pitfall: is_ok() only checks outer Result
1263        let inner_error: Result<Result<(), &str>, TimeoutError> = Ok(Err("inner error"));
1264
1265        // This is the WRONG way (the bug)
1266        let wrong_healthy = inner_error.is_ok();
1267        assert!(
1268            wrong_healthy,
1269            "is_ok() returns true even with inner error - this is the bug!"
1270        );
1271
1272        // This is the CORRECT way (the fix)
1273        let correct_healthy = matches!(inner_error, Ok(Ok(_)));
1274        assert!(
1275            !correct_healthy,
1276            "matches! correctly identifies inner error"
1277        );
1278    }
1279
1280    // -------------------------------------------------------------------------
1281    // Cache Tests
1282    // -------------------------------------------------------------------------
1283
1284    #[actix_web::test]
1285    async fn test_cache_operations() {
1286        // Clear any existing cache
1287        clear_cache().await;
1288
1289        // Should be empty initially
1290        let cached = get_cached_response().await;
1291        assert!(cached.is_none());
1292
1293        // Cache a response
1294        let response = build_response(
1295            create_healthy_system(),
1296            create_healthy_redis(),
1297            create_healthy_queue(),
1298            None,
1299        );
1300        cache_response(&response).await;
1301
1302        // Should now be cached
1303        let cached = get_cached_response().await;
1304        assert!(cached.is_some());
1305        assert_eq!(cached.unwrap().ready, response.ready);
1306
1307        // Clean up
1308        clear_cache().await;
1309    }
1310}