openzeppelin_relayer/services/plugins/
pool_executor.rs

1//! Pool-based Plugin Executor
2//!
3//! This module provides execution of pre-compiled JavaScript plugins via
4//! a persistent Piscina worker pool, replacing the per-request ts-node approach.
5//!
6//! Communication with the Node.js pool server happens via Unix socket using
7//! a JSON-line protocol.
8
9use std::collections::HashMap;
10use std::process::Stdio;
11use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14use tokio::io::{AsyncBufReadExt, BufReader};
15use tokio::process::{Child, Command};
16use tokio::sync::oneshot;
17use uuid::Uuid;
18
19use super::config::get_config;
20use super::connection::{ConnectionPool, PoolConnection};
21use super::health::{
22    CircuitBreaker, CircuitState, DeadServerIndicator, HealthStatus, ProcessStatus,
23};
24use super::protocol::{PoolError, PoolRequest, PoolResponse};
25use super::shared_socket::get_shared_socket_service;
26use super::{LogEntry, PluginError, PluginHandlerPayload, ScriptResult};
27
/// Request queue entry for throttling
///
/// Holds every argument a queue worker forwards to `execute_plugin_internal`,
/// plus the one-shot channel used to hand the outcome back to the caller
/// that enqueued the request.
struct QueuedRequest {
    /// Identifier of the plugin to execute; also used as a log field by workers.
    plugin_id: String,
    /// Optional code forwarded as `compiled_code` in the execute request.
    compiled_code: Option<String>,
    /// Optional path forwarded as `plugin_path` in the execute request.
    plugin_path: Option<String>,
    /// Plugin parameters, forwarded unchanged.
    params: serde_json::Value,
    /// Optional request headers (each name may map to several values).
    headers: Option<HashMap<String, Vec<String>>>,
    /// Socket path forwarded to the pool server in the execute request.
    socket_path: String,
    /// Optional id of the originating HTTP request, forwarded unchanged.
    http_request_id: Option<String>,
    /// Per-request timeout in seconds. It is sent to the server in milliseconds
    /// and also bounds the response wait; `None` falls back to
    /// `pool_request_timeout_secs` from the config for the wait.
    timeout_secs: Option<u64>,
    /// Optional route, forwarded unchanged.
    route: Option<String>,
    /// Optional plugin config, forwarded unchanged.
    config: Option<serde_json::Value>,
    /// Optional method, forwarded unchanged.
    method: Option<String>,
    /// Optional query, forwarded unchanged.
    query: Option<serde_json::Value>,
    /// Receives the execution result. If the receiver has been dropped, the
    /// worker ignores the send failure.
    response_tx: oneshot::Sender<Result<ScriptResult, PluginError>>,
}
44
/// Parsed health check result fields extracted from pool server JSON response.
///
/// This struct replaces a complex tuple return type to satisfy Clippy's
/// `type_complexity` lint and improve readability.
///
/// Every field except `status` is `None` when the corresponding JSON key is
/// missing or has the wrong type.
#[derive(Debug, Default, PartialEq)]
pub struct ParsedHealthResult {
    /// Value of the top-level `"status"` string, or `"unknown"` when absent.
    pub status: String,
    /// Top-level `"uptime"` as an unsigned integer (presumably milliseconds, per the name — TODO confirm).
    pub uptime_ms: Option<u64>,
    /// `"memory"."heapUsed"` as an unsigned integer.
    pub memory: Option<u64>,
    /// `"pool"."completed"` as an unsigned integer.
    pub pool_completed: Option<u64>,
    /// `"pool"."queued"` as an unsigned integer.
    pub pool_queued: Option<u64>,
    /// `"execution"."successRate"` as a float.
    pub success_rate: Option<f64>,
}
58
/// Manages the pool server process and connections
///
/// Owns:
/// - the Node.js pool-server child process;
/// - a connection pool to its Unix socket;
/// - the bounded request queue drained by background worker tasks;
/// - the flags and counters used for health checking, circuit breaking and
///   post-restart recovery.
pub struct PoolManager {
    /// Unix socket path passed to the pool server and used by the connection pool.
    socket_path: String,
    /// Handle to the spawned pool server process; `None` when no process is tracked.
    process: tokio::sync::Mutex<Option<Child>>,
    /// Set by `ensure_started` once the pool server has been started successfully.
    initialized: Arc<AtomicBool>,
    /// Lock to prevent concurrent restarts (thundering herd)
    restart_lock: tokio::sync::Mutex<()>,
    /// Connection pool for reusing connections
    connection_pool: Arc<ConnectionPool>,
    /// Request queue for throttling/backpressure (multi-consumer channel)
    request_tx: async_channel::Sender<QueuedRequest>,
    /// Actual configured queue size (for error messages)
    max_queue_size: usize,
    /// Flag indicating if health check is needed (set by background task)
    health_check_needed: Arc<AtomicBool>,
    /// Consecutive failure count for health checks. It is incremented before a
    /// restart attempt and reset to 0 after the restart succeeds.
    consecutive_failures: Arc<AtomicU32>,
    /// Circuit breaker for automatic degradation under GC pressure
    circuit_breaker: Arc<CircuitBreaker>,
    /// Last successful restart time (for backoff calculation)
    last_restart_time_ms: Arc<AtomicU64>,
    /// Is currently in recovery mode (gradual ramp-up)
    recovery_mode: Arc<AtomicBool>,
    /// Requests allowed during recovery (gradual increase). While
    /// `recovery_mode` is set, the recovery task raises this by 10 every 500 ms,
    /// up to 100 (presumably a percentage — TODO confirm).
    recovery_allowance: Arc<AtomicU32>,
    /// Shutdown signal for background tasks (queue workers, health check, etc.)
    shutdown_signal: Arc<tokio::sync::Notify>,
}
87
88impl Default for PoolManager {
89    fn default() -> Self {
90        Self::new()
91    }
92}
93
94impl PoolManager {
    /// Base heap size in MB for the pool server process.
    /// This provides the minimum memory needed for the Node.js runtime and core pool infrastructure.
    /// The computed heap is handed to Node.js as `--max-old-space-size`.
    const BASE_HEAP_MB: usize = 512;

    /// Concurrency divisor for heap calculation.
    /// Heap grows once for every full group of this many concurrent requests.
    /// Integer division is used, so a partial group adds nothing.
    const CONCURRENCY_DIVISOR: usize = 10;

    /// Heap increment in MB per CONCURRENCY_DIVISOR concurrent requests.
    /// Formula: BASE_HEAP_MB + ((max_concurrency / CONCURRENCY_DIVISOR) * HEAP_INCREMENT_PER_DIVISOR_MB)
    /// This accounts for additional memory needed per concurrent plugin execution context.
    const HEAP_INCREMENT_PER_DIVISOR_MB: usize = 32;

    /// Maximum heap size in MB (hard cap) for the pool server process.
    /// Prevents excessive memory allocation that could cause system instability.
    /// Set to 8GB (8192 MB) as a reasonable upper bound for Node.js processes.
    const MAX_HEAP_MB: usize = 8192;
112
113    /// Calculate heap size based on concurrency level.
114    ///
115    /// Formula: BASE_HEAP_MB + ((max_concurrency / CONCURRENCY_DIVISOR) * HEAP_INCREMENT_PER_DIVISOR_MB)
116    /// Result is capped at MAX_HEAP_MB.
117    ///
118    /// This scales memory allocation with expected load while maintaining a reasonable minimum.
119    pub fn calculate_heap_size(max_concurrency: usize) -> usize {
120        let calculated = Self::BASE_HEAP_MB
121            + ((max_concurrency / Self::CONCURRENCY_DIVISOR) * Self::HEAP_INCREMENT_PER_DIVISOR_MB);
122        calculated.min(Self::MAX_HEAP_MB)
123    }
124
125    /// Format a result value from the pool response into a string.
126    ///
127    /// If the value is already a string, returns it directly.
128    /// Otherwise, serializes it to JSON.
129    pub fn format_return_value(value: Option<serde_json::Value>) -> String {
130        value
131            .map(|v| {
132                if v.is_string() {
133                    v.as_str().unwrap_or("").to_string()
134                } else {
135                    serde_json::to_string(&v).unwrap_or_default()
136                }
137            })
138            .unwrap_or_default()
139    }
140
141    /// Parse a successful pool response into a ScriptResult.
142    ///
143    /// Converts logs from PoolLogEntry to LogEntry and extracts the return value.
144    pub fn parse_success_response(response: PoolResponse) -> ScriptResult {
145        let logs: Vec<LogEntry> = response
146            .logs
147            .map(|logs| logs.into_iter().map(|l| l.into()).collect())
148            .unwrap_or_default();
149
150        ScriptResult {
151            logs,
152            error: String::new(),
153            return_value: Self::format_return_value(response.result),
154            trace: Vec::new(),
155        }
156    }
157
158    /// Parse a failed pool response into a PluginError.
159    ///
160    /// Extracts error details and converts logs for inclusion in the error payload.
161    pub fn parse_error_response(response: PoolResponse) -> PluginError {
162        let logs: Vec<LogEntry> = response
163            .logs
164            .map(|logs| logs.into_iter().map(|l| l.into()).collect())
165            .unwrap_or_default();
166
167        let error = response.error.unwrap_or(PoolError {
168            message: "Unknown error".to_string(),
169            code: None,
170            status: None,
171            details: None,
172        });
173
174        PluginError::HandlerError(Box::new(PluginHandlerPayload {
175            message: error.message,
176            status: error.status.unwrap_or(500),
177            code: error.code,
178            details: error.details,
179            logs: Some(logs),
180            traces: None,
181        }))
182    }
183
184    /// Parse a pool response into either a success result or an error.
185    ///
186    /// This is the main entry point for response parsing, dispatching to
187    /// either parse_success_response or parse_error_response based on the success flag.
188    pub fn parse_pool_response(response: PoolResponse) -> Result<ScriptResult, PluginError> {
189        if response.success {
190            Ok(Self::parse_success_response(response))
191        } else {
192            Err(Self::parse_error_response(response))
193        }
194    }
195
196    /// Parse health check result JSON into individual fields.
197    ///
198    /// Extracts status, uptime, memory usage, pool stats, and success rate
199    /// from the nested JSON structure returned by the pool server.
200    pub fn parse_health_result(result: &serde_json::Value) -> ParsedHealthResult {
201        ParsedHealthResult {
202            status: result
203                .get("status")
204                .and_then(|v| v.as_str())
205                .unwrap_or("unknown")
206                .to_string(),
207            uptime_ms: result.get("uptime").and_then(|v| v.as_u64()),
208            memory: result
209                .get("memory")
210                .and_then(|v| v.get("heapUsed"))
211                .and_then(|v| v.as_u64()),
212            pool_completed: result
213                .get("pool")
214                .and_then(|v| v.get("completed"))
215                .and_then(|v| v.as_u64()),
216            pool_queued: result
217                .get("pool")
218                .and_then(|v| v.get("queued"))
219                .and_then(|v| v.as_u64()),
220            success_rate: result
221                .get("execution")
222                .and_then(|v| v.get("successRate"))
223                .and_then(|v| v.as_f64()),
224        }
225    }
226
227    /// Create a new PoolManager with default socket path
228    pub fn new() -> Self {
229        Self::init(format!("/tmp/relayer-plugin-pool-{}.sock", Uuid::new_v4()))
230    }
231
232    /// Create a new PoolManager with custom socket path
233    pub fn with_socket_path(socket_path: String) -> Self {
234        Self::init(socket_path)
235    }
236
237    /// Common initialization logic
238    fn init(socket_path: String) -> Self {
239        let config = get_config();
240        let max_connections = config.pool_max_connections;
241        let max_queue_size = config.pool_max_queue_size;
242
243        let (tx, rx) = async_channel::bounded(max_queue_size);
244
245        let connection_pool = Arc::new(ConnectionPool::new(socket_path.clone(), max_connections));
246        let connection_pool_clone = connection_pool.clone();
247
248        let shutdown_signal = Arc::new(tokio::sync::Notify::new());
249
250        Self::spawn_queue_workers(
251            rx,
252            connection_pool_clone,
253            config.pool_workers,
254            shutdown_signal.clone(),
255        );
256
257        let health_check_needed = Arc::new(AtomicBool::new(false));
258        let consecutive_failures = Arc::new(AtomicU32::new(0));
259        let circuit_breaker = Arc::new(CircuitBreaker::new());
260        let last_restart_time_ms = Arc::new(AtomicU64::new(0));
261        let recovery_mode = Arc::new(AtomicBool::new(false));
262        let recovery_allowance = Arc::new(AtomicU32::new(0));
263
264        Self::spawn_health_check_task(
265            health_check_needed.clone(),
266            config.health_check_interval_secs,
267            shutdown_signal.clone(),
268        );
269
270        Self::spawn_recovery_task(
271            recovery_mode.clone(),
272            recovery_allowance.clone(),
273            shutdown_signal.clone(),
274        );
275
276        Self {
277            connection_pool,
278            socket_path,
279            process: tokio::sync::Mutex::new(None),
280            initialized: Arc::new(AtomicBool::new(false)),
281            restart_lock: tokio::sync::Mutex::new(()),
282            request_tx: tx,
283            max_queue_size,
284            health_check_needed,
285            consecutive_failures,
286            circuit_breaker,
287            last_restart_time_ms,
288            recovery_mode,
289            recovery_allowance,
290            shutdown_signal,
291        }
292    }
293
294    /// Spawn background task to gradually increase recovery allowance
295    fn spawn_recovery_task(
296        recovery_mode: Arc<AtomicBool>,
297        recovery_allowance: Arc<AtomicU32>,
298        shutdown_signal: Arc<tokio::sync::Notify>,
299    ) {
300        tokio::spawn(async move {
301            let mut interval = tokio::time::interval(Duration::from_millis(500));
302            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
303
304            loop {
305                tokio::select! {
306                    biased;
307
308                    _ = shutdown_signal.notified() => {
309                        tracing::debug!("Recovery task received shutdown signal");
310                        break;
311                    }
312
313                    _ = interval.tick() => {
314                        if recovery_mode.load(Ordering::Relaxed) {
315                            let current = recovery_allowance.load(Ordering::Relaxed);
316                            if current < 100 {
317                                let new_allowance = (current + 10).min(100);
318                                recovery_allowance.store(new_allowance, Ordering::Relaxed);
319                                tracing::debug!(
320                                    allowance = new_allowance,
321                                    "Recovery mode: increasing request allowance"
322                                );
323                            } else {
324                                recovery_mode.store(false, Ordering::Relaxed);
325                                tracing::info!("Recovery mode complete - full capacity restored");
326                            }
327                        }
328                    }
329                }
330            }
331        });
332    }
333
334    /// Spawn background task to set health check flag periodically
335    fn spawn_health_check_task(
336        health_check_needed: Arc<AtomicBool>,
337        interval_secs: u64,
338        shutdown_signal: Arc<tokio::sync::Notify>,
339    ) {
340        tokio::spawn(async move {
341            let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
342            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
343
344            loop {
345                tokio::select! {
346                    biased;
347
348                    _ = shutdown_signal.notified() => {
349                        tracing::debug!("Health check task received shutdown signal");
350                        break;
351                    }
352
353                    _ = interval.tick() => {
354                        health_check_needed.store(true, Ordering::Relaxed);
355                    }
356                }
357            }
358        });
359    }
360
    /// Spawn multiple worker tasks to process queued requests concurrently
    ///
    /// Worker count is `configured_workers` when non-zero. Otherwise it is the
    /// host's available parallelism clamped to 4..=32, or 8 if that cannot be
    /// determined.
    ///
    /// All workers share clones of the same multi-consumer receiver, so each
    /// queued request is taken by one worker. The worker executes it through
    /// `execute_plugin_internal`, logs the outcome, and sends the result back on
    /// the request's one-shot channel.
    fn spawn_queue_workers(
        rx: async_channel::Receiver<QueuedRequest>,
        connection_pool: Arc<ConnectionPool>,
        configured_workers: usize,
        shutdown_signal: Arc<tokio::sync::Notify>,
    ) {
        // 0 means "auto": size the worker set from the host's parallelism.
        let num_workers = if configured_workers > 0 {
            configured_workers
        } else {
            std::thread::available_parallelism()
                .map(|n| n.get().clamp(4, 32))
                .unwrap_or(8)
        };

        tracing::info!(num_workers = num_workers, "Starting request queue workers");

        for worker_id in 0..num_workers {
            let rx_clone = rx.clone();
            let pool_clone = connection_pool.clone();
            let shutdown = shutdown_signal.clone();

            tokio::spawn(async move {
                loop {
                    tokio::select! {
                        // Poll branches in order so a pending shutdown wins over new work.
                        biased;

                        // NOTE(review): a fresh `notified()` future is created on every loop
                        // iteration, so a `notify_waiters()` fired while this worker is busy
                        // executing a request would not be observed — confirm how shutdown
                        // is signalled.
                        _ = shutdown.notified() => {
                            tracing::debug!(worker_id = worker_id, "Request queue worker received shutdown signal");
                            break;
                        }

                        request_result = rx_clone.recv() => {
                            // An error from recv() means the channel is closed; stop this worker.
                            let request = match request_result {
                                Ok(r) => r,
                                Err(_) => break,
                            };

                            let start = std::time::Instant::now();
                            // Kept for logging because `request.plugin_id` is moved into the call.
                            let plugin_id = request.plugin_id.clone();

                            let result = Self::execute_plugin_internal(
                                &pool_clone,
                                request.plugin_id,
                                request.compiled_code,
                                request.plugin_path,
                                request.params,
                                request.headers,
                                request.socket_path,
                                request.http_request_id,
                                request.timeout_secs,
                                request.route,
                                request.config,
                                request.method,
                                request.query,
                            )
                            .await;

                            let elapsed = start.elapsed();
                            if let Err(ref e) = result {
                                // Failures whose Debug text mentions shutdown are treated as expected
                                // cancellations and logged at debug level.
                                // NOTE(review): this is a string heuristic; it also matches any plugin
                                // error whose message happens to contain "shutdown".
                                let error_str = format!("{e:?}");
                                if error_str.contains("shutdown") || error_str.contains("Shutdown") {
                                    tracing::debug!(
                                        worker_id = worker_id,
                                        plugin_id = %plugin_id,
                                        "Plugin execution cancelled during shutdown"
                                    );
                                } else {
                                    tracing::warn!(
                                        worker_id = worker_id,
                                        plugin_id = %plugin_id,
                                        elapsed_ms = elapsed.as_millis() as u64,
                                        error = ?e,
                                        "Plugin execution failed"
                                    );
                                }
                            } else if elapsed.as_secs() > 1 {
                                // NOTE(review): `as_secs() > 1` only fires for executions of 2s or more.
                                tracing::debug!(
                                    worker_id = worker_id,
                                    plugin_id = %plugin_id,
                                    elapsed_ms = elapsed.as_millis() as u64,
                                    "Slow plugin execution"
                                );
                            }

                            // The caller may have stopped waiting; a failed send is deliberately ignored.
                            let _ = request.response_tx.send(result);
                        }
                    }
                }

                tracing::debug!(worker_id = worker_id, "Request queue worker exited");
            });
        }
    }
455
456    /// Spawn a rate-limited stderr reader to prevent log flooding
457    fn spawn_rate_limited_stderr_reader(stderr: tokio::process::ChildStderr) {
458        tokio::spawn(async move {
459            let reader = BufReader::new(stderr);
460            let mut lines = reader.lines();
461
462            let mut last_log_time = std::time::Instant::now();
463            let mut suppressed_count = 0u64;
464            let min_interval = Duration::from_millis(100);
465
466            while let Ok(Some(line)) = lines.next_line().await {
467                let now = std::time::Instant::now();
468                let elapsed = now.duration_since(last_log_time);
469
470                if elapsed >= min_interval {
471                    if suppressed_count > 0 {
472                        tracing::warn!(
473                            target: "pool_server",
474                            suppressed = suppressed_count,
475                            "... ({} lines suppressed due to rate limiting)",
476                            suppressed_count
477                        );
478                        suppressed_count = 0;
479                    }
480                    tracing::error!(target: "pool_server", "{}", line);
481                    last_log_time = now;
482                } else {
483                    suppressed_count += 1;
484                    if suppressed_count % 100 == 0 {
485                        tracing::warn!(
486                            target: "pool_server",
487                            suppressed = suppressed_count,
488                            "Pool server producing excessive stderr output"
489                        );
490                    }
491                }
492            }
493
494            if suppressed_count > 0 {
495                tracing::warn!(
496                    target: "pool_server",
497                    suppressed = suppressed_count,
498                    "Pool server stderr closed ({} final lines suppressed)",
499                    suppressed_count
500                );
501            }
502        });
503    }
504
505    /// Execute plugin with optional pre-acquired permit (unified fast/slow path)
506    #[allow(clippy::too_many_arguments)]
507    async fn execute_with_permit(
508        connection_pool: &Arc<ConnectionPool>,
509        permit: Option<tokio::sync::OwnedSemaphorePermit>,
510        plugin_id: String,
511        compiled_code: Option<String>,
512        plugin_path: Option<String>,
513        params: serde_json::Value,
514        headers: Option<HashMap<String, Vec<String>>>,
515        socket_path: String,
516        http_request_id: Option<String>,
517        timeout_secs: Option<u64>,
518        route: Option<String>,
519        config: Option<serde_json::Value>,
520        method: Option<String>,
521        query: Option<serde_json::Value>,
522    ) -> Result<ScriptResult, PluginError> {
523        let mut conn = connection_pool.acquire_with_permit(permit).await?;
524
525        let request = PoolRequest::Execute(Box::new(super::protocol::ExecuteRequest {
526            task_id: Uuid::new_v4().to_string(),
527            plugin_id: plugin_id.clone(),
528            compiled_code,
529            plugin_path,
530            params,
531            headers,
532            socket_path,
533            http_request_id,
534            timeout: timeout_secs.map(|s| s * 1000),
535            route,
536            config,
537            method,
538            query,
539        }));
540
541        let timeout = timeout_secs.unwrap_or(get_config().pool_request_timeout_secs);
542        let response = conn.send_request_with_timeout(&request, timeout).await?;
543
544        // Use extracted parsing function for cleaner code and testability
545        Self::parse_pool_response(response)
546    }
547
    /// Internal execution method (wrapper for execute_with_permit)
    ///
    /// Used by the queue workers. It calls `execute_with_permit` with no
    /// pre-acquired permit and forwards every other argument unchanged, so its
    /// result and errors are exactly those of `execute_with_permit`.
    #[allow(clippy::too_many_arguments)]
    async fn execute_plugin_internal(
        connection_pool: &Arc<ConnectionPool>,
        plugin_id: String,
        compiled_code: Option<String>,
        plugin_path: Option<String>,
        params: serde_json::Value,
        headers: Option<HashMap<String, Vec<String>>>,
        socket_path: String,
        http_request_id: Option<String>,
        timeout_secs: Option<u64>,
        route: Option<String>,
        config: Option<serde_json::Value>,
        method: Option<String>,
        query: Option<serde_json::Value>,
    ) -> Result<ScriptResult, PluginError> {
        Self::execute_with_permit(
            connection_pool,
            // No permit: the connection pool acquires one itself.
            None,
            plugin_id,
            compiled_code,
            plugin_path,
            params,
            headers,
            socket_path,
            http_request_id,
            timeout_secs,
            route,
            config,
            method,
            query,
        )
        .await
    }
583
    /// Check if the pool manager has been initialized.
    ///
    /// Returns the `initialized` flag, which `ensure_started` sets after the pool
    /// server has been started. This is useful for health checks to determine
    /// if the plugin pool is expected to be running.
    ///
    /// The function is `async` but never awaits; it only performs an atomic load.
    pub async fn is_initialized(&self) -> bool {
        self.initialized.load(Ordering::Acquire)
    }
591
592    /// Start the pool server if not already running
593    pub async fn ensure_started(&self) -> Result<(), PluginError> {
594        if self.initialized.load(Ordering::Acquire) {
595            return Ok(());
596        }
597
598        let _startup_guard = self.restart_lock.lock().await;
599
600        if self.initialized.load(Ordering::Acquire) {
601            return Ok(());
602        }
603
604        self.start_pool_server().await?;
605        self.initialized.store(true, Ordering::Release);
606        Ok(())
607    }
608
609    /// Ensure pool is started and healthy, with auto-recovery on failure
610    async fn ensure_started_and_healthy(&self) -> Result<(), PluginError> {
611        self.ensure_started().await?;
612
613        if !self.health_check_needed.load(Ordering::Relaxed) {
614            return Ok(());
615        }
616
617        if self
618            .health_check_needed
619            .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
620            .is_err()
621        {
622            return Ok(());
623        }
624
625        self.check_and_restart_if_needed().await
626    }
627
628    /// Check process status and restart if needed
629    async fn check_and_restart_if_needed(&self) -> Result<(), PluginError> {
630        // Check process status without holding restart lock
631        let process_status = {
632            let mut process_guard = self.process.lock().await;
633            if let Some(child) = process_guard.as_mut() {
634                match child.try_wait() {
635                    Ok(Some(exit_status)) => {
636                        tracing::warn!(
637                            exit_status = ?exit_status,
638                            "Pool server process has exited"
639                        );
640                        *process_guard = None;
641                        ProcessStatus::Exited
642                    }
643                    Ok(None) => ProcessStatus::Running,
644                    Err(e) => {
645                        tracing::warn!(
646                            error = %e,
647                            "Failed to check pool server process status, assuming dead"
648                        );
649                        *process_guard = None;
650                        ProcessStatus::Unknown
651                    }
652                }
653            } else {
654                ProcessStatus::NoProcess
655            }
656        };
657
658        // Determine if restart is needed
659        let needs_restart = match process_status {
660            ProcessStatus::Running => {
661                let socket_exists = std::path::Path::new(&self.socket_path).exists();
662                if !socket_exists {
663                    tracing::warn!(
664                        socket_path = %self.socket_path,
665                        "Pool server socket file missing, needs restart"
666                    );
667                    true
668                } else {
669                    false
670                }
671            }
672            ProcessStatus::Exited | ProcessStatus::Unknown | ProcessStatus::NoProcess => {
673                tracing::warn!("Pool server not running, needs restart");
674                true
675            }
676        };
677
678        // Only acquire restart lock if restart is actually needed
679        if needs_restart {
680            let _restart_guard = self.restart_lock.lock().await;
681            self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
682            self.restart_internal().await?;
683            self.consecutive_failures.store(0, Ordering::Relaxed);
684        }
685
686        Ok(())
687    }
688
689    /// Clean up socket file with retry logic
690    async fn cleanup_socket_file(socket_path: &str) {
691        let max_cleanup_attempts = 5;
692        let mut attempts = 0;
693
694        while attempts < max_cleanup_attempts {
695            match std::fs::remove_file(socket_path) {
696                Ok(_) => break,
697                Err(e) if e.kind() == std::io::ErrorKind::NotFound => break,
698                Err(e) => {
699                    attempts += 1;
700                    if attempts >= max_cleanup_attempts {
701                        tracing::warn!(
702                            socket_path = %socket_path,
703                            error = %e,
704                            "Failed to remove socket file after {} attempts, proceeding anyway",
705                            max_cleanup_attempts
706                        );
707                        break;
708                    }
709                    let delay_ms = 10 * (1 << attempts.min(3));
710                    tokio::time::sleep(Duration::from_millis(delay_ms)).await;
711                }
712            }
713        }
714
715        tokio::time::sleep(Duration::from_millis(50)).await;
716    }
717
718    /// Spawn the pool server process with proper configuration
719    async fn spawn_pool_server_process(
720        socket_path: &str,
721        context: &str,
722    ) -> Result<Child, PluginError> {
723        let pool_server_path = std::env::current_dir()
724            .map(|cwd| cwd.join("plugins/lib/pool-server.ts").display().to_string())
725            .unwrap_or_else(|_| "plugins/lib/pool-server.ts".to_string());
726
727        let config = get_config();
728
729        // Use extracted function for heap calculation
730        let pool_server_heap_mb = Self::calculate_heap_size(config.max_concurrency);
731
732        // Log warning if heap was capped (for observability)
733        let uncapped_heap = Self::BASE_HEAP_MB
734            + ((config.max_concurrency / Self::CONCURRENCY_DIVISOR)
735                * Self::HEAP_INCREMENT_PER_DIVISOR_MB);
736        if uncapped_heap > Self::MAX_HEAP_MB {
737            tracing::warn!(
738                calculated_heap_mb = uncapped_heap,
739                capped_heap_mb = pool_server_heap_mb,
740                max_concurrency = config.max_concurrency,
741                "Pool server heap calculation exceeded 8GB cap"
742            );
743        }
744
745        tracing::info!(
746            socket_path = %socket_path,
747            heap_mb = pool_server_heap_mb,
748            max_concurrency = config.max_concurrency,
749            context = context,
750            "Spawning plugin pool server"
751        );
752
753        let node_options = format!("--max-old-space-size={pool_server_heap_mb} --expose-gc");
754
755        let mut child = Command::new("ts-node")
756            .arg("--transpile-only")
757            .arg(&pool_server_path)
758            .arg(socket_path)
759            .env("NODE_OPTIONS", node_options)
760            .env("PLUGIN_MAX_CONCURRENCY", config.max_concurrency.to_string())
761            .env(
762                "PLUGIN_POOL_MIN_THREADS",
763                config.nodejs_pool_min_threads.to_string(),
764            )
765            .env(
766                "PLUGIN_POOL_MAX_THREADS",
767                config.nodejs_pool_max_threads.to_string(),
768            )
769            .env(
770                "PLUGIN_POOL_CONCURRENT_TASKS",
771                config.nodejs_pool_concurrent_tasks.to_string(),
772            )
773            .env(
774                "PLUGIN_POOL_IDLE_TIMEOUT",
775                config.nodejs_pool_idle_timeout_ms.to_string(),
776            )
777            .env(
778                "PLUGIN_WORKER_HEAP_MB",
779                config.nodejs_worker_heap_mb.to_string(),
780            )
781            .env(
782                "PLUGIN_POOL_SOCKET_BACKLOG",
783                config.pool_socket_backlog.to_string(),
784            )
785            .stdin(Stdio::null())
786            .stdout(Stdio::piped())
787            .stderr(Stdio::piped())
788            .spawn()
789            .map_err(|e| {
790                PluginError::PluginExecutionError(format!("Failed to {context} pool server: {e}"))
791            })?;
792
793        if let Some(stderr) = child.stderr.take() {
794            Self::spawn_rate_limited_stderr_reader(stderr);
795        }
796
797        if let Some(stdout) = child.stdout.take() {
798            let reader = BufReader::new(stdout);
799            let mut lines = reader.lines();
800
801            let timeout_result = tokio::time::timeout(Duration::from_secs(10), async {
802                while let Ok(Some(line)) = lines.next_line().await {
803                    if line.contains("POOL_SERVER_READY") {
804                        return Ok(());
805                    }
806                }
807                Err(PluginError::PluginExecutionError(
808                    "Pool server did not send ready signal".to_string(),
809                ))
810            })
811            .await;
812
813            match timeout_result {
814                Ok(Ok(())) => {
815                    tracing::info!(context = context, "Plugin pool server ready");
816                }
817                Ok(Err(e)) => return Err(e),
818                Err(_) => {
819                    return Err(PluginError::PluginExecutionError(format!(
820                        "Timeout waiting for pool server to {context}"
821                    )))
822                }
823            }
824        }
825
826        Ok(child)
827    }
828
829    async fn start_pool_server(&self) -> Result<(), PluginError> {
830        let mut process_guard = self.process.lock().await;
831
832        if process_guard.is_some() {
833            return Ok(());
834        }
835
836        Self::cleanup_socket_file(&self.socket_path).await;
837
838        let child = Self::spawn_pool_server_process(&self.socket_path, "start").await?;
839
840        *process_guard = Some(child);
841        Ok(())
842    }
843
844    /// Execute a plugin via the pool
845    #[allow(clippy::too_many_arguments)]
846    pub async fn execute_plugin(
847        &self,
848        plugin_id: String,
849        compiled_code: Option<String>,
850        plugin_path: Option<String>,
851        params: serde_json::Value,
852        headers: Option<HashMap<String, Vec<String>>>,
853        socket_path: String,
854        http_request_id: Option<String>,
855        timeout_secs: Option<u64>,
856        route: Option<String>,
857        config: Option<serde_json::Value>,
858        method: Option<String>,
859        query: Option<serde_json::Value>,
860    ) -> Result<ScriptResult, PluginError> {
861        let rid = http_request_id.as_deref().unwrap_or("unknown");
862        let effective_timeout =
863            timeout_secs.unwrap_or_else(|| get_config().pool_request_timeout_secs);
864        tracing::debug!(
865            plugin_id = %plugin_id,
866            http_request_id = %rid,
867            timeout_secs = effective_timeout,
868            "Pool execute request received"
869        );
870        let recovery_allowance = if self.recovery_mode.load(Ordering::Relaxed) {
871            Some(self.recovery_allowance.load(Ordering::Relaxed))
872        } else {
873            None
874        };
875
876        if !self
877            .circuit_breaker
878            .should_allow_request(recovery_allowance)
879        {
880            let state = self.circuit_breaker.state();
881            tracing::warn!(
882                plugin_id = %plugin_id,
883                circuit_state = ?state,
884                recovery_allowance = ?recovery_allowance,
885                "Request rejected by circuit breaker"
886            );
887            return Err(PluginError::PluginExecutionError(
888                "Plugin system temporarily unavailable due to high load. Please retry shortly."
889                    .to_string(),
890            ));
891        }
892
893        let start_time = Instant::now();
894
895        self.ensure_started_and_healthy().await?;
896        tracing::debug!(
897            plugin_id = %plugin_id,
898            http_request_id = %rid,
899            "Pool execute start (healthy/started)"
900        );
901
902        let circuit_breaker = self.circuit_breaker.clone();
903        match self.connection_pool.semaphore.clone().try_acquire_owned() {
904            Ok(permit) => {
905                tracing::debug!(
906                    plugin_id = %plugin_id,
907                    http_request_id = %rid,
908                    "Pool execute acquired connection permit (fast path)"
909                );
910                let result = Self::execute_with_permit(
911                    &self.connection_pool,
912                    Some(permit),
913                    plugin_id,
914                    compiled_code,
915                    plugin_path,
916                    params,
917                    headers,
918                    socket_path,
919                    http_request_id,
920                    timeout_secs,
921                    route,
922                    config,
923                    method,
924                    query,
925                )
926                .await;
927
928                let elapsed_ms = start_time.elapsed().as_millis() as u32;
929                match &result {
930                    Ok(_) => circuit_breaker.record_success(elapsed_ms),
931                    Err(e) => {
932                        // Only count infrastructure errors for circuit breaker, not business errors
933                        // Business errors (RPC failures, plugin logic errors) mean the pool is healthy
934                        if Self::is_dead_server_error(e) {
935                            circuit_breaker.record_failure();
936                            tracing::warn!(
937                                error = %e,
938                                "Detected dead pool server error, triggering health check for restart"
939                            );
940                            self.health_check_needed.store(true, Ordering::Relaxed);
941                        } else {
942                            // Plugin executed but returned error - infrastructure is healthy
943                            circuit_breaker.record_success(elapsed_ms);
944                        }
945                    }
946                }
947
948                tracing::debug!(
949                    elapsed_ms = elapsed_ms,
950                    result_ok = result.is_ok(),
951                    "Pool execute finished (fast path)"
952                );
953                result
954            }
955            Err(_) => {
956                tracing::debug!(
957                    plugin_id = %plugin_id,
958                    http_request_id = %rid,
959                    "Pool execute queueing (no permits)"
960                );
961                let (response_tx, response_rx) = oneshot::channel();
962
963                let queued_request = QueuedRequest {
964                    plugin_id,
965                    compiled_code,
966                    plugin_path,
967                    params,
968                    headers,
969                    socket_path,
970                    http_request_id,
971                    timeout_secs,
972                    route,
973                    config,
974                    method,
975                    query,
976                    response_tx,
977                };
978
979                let result = match self.request_tx.try_send(queued_request) {
980                    Ok(()) => {
981                        let queue_len = self.request_tx.len();
982                        if queue_len > self.max_queue_size / 2 {
983                            tracing::warn!(
984                                queue_len = queue_len,
985                                max_queue_size = self.max_queue_size,
986                                "Plugin queue is over 50% capacity"
987                            );
988                        }
989                        // Add timeout to response_rx to prevent hung requests if worker crashes
990                        let response_timeout = timeout_secs
991                            .map(Duration::from_secs)
992                            .unwrap_or(Duration::from_secs(get_config().pool_request_timeout_secs))
993                            + Duration::from_secs(5); // Add 5s buffer for queue processing
994
995                        match tokio::time::timeout(response_timeout, response_rx).await {
996                            Ok(Ok(result)) => result,
997                            Ok(Err(_)) => Err(PluginError::PluginExecutionError(
998                                "Request queue processor closed".to_string(),
999                            )),
1000                            Err(_) => Err(PluginError::PluginExecutionError(format!(
1001                                "Request timed out after {}s waiting for worker response",
1002                                response_timeout.as_secs()
1003                            ))),
1004                        }
1005                    }
1006                    Err(async_channel::TrySendError::Full(req)) => {
1007                        let queue_timeout_ms = get_config().pool_queue_send_timeout_ms;
1008                        let queue_timeout = Duration::from_millis(queue_timeout_ms);
1009                        match tokio::time::timeout(queue_timeout, self.request_tx.send(req)).await {
1010                            Ok(Ok(())) => {
1011                                let queue_len = self.request_tx.len();
1012                                tracing::debug!(
1013                                    queue_len = queue_len,
1014                                    "Request queued after waiting for queue space"
1015                                );
1016                                // Add timeout to response_rx to prevent hung requests if worker crashes
1017                                let response_timeout =
1018                                    timeout_secs.map(Duration::from_secs).unwrap_or(
1019                                        Duration::from_secs(get_config().pool_request_timeout_secs),
1020                                    ) + Duration::from_secs(5); // Add 5s buffer for queue processing
1021
1022                                match tokio::time::timeout(response_timeout, response_rx).await {
1023                                    Ok(Ok(result)) => result,
1024                                    Ok(Err(_)) => Err(PluginError::PluginExecutionError(
1025                                        "Request queue processor closed".to_string(),
1026                                    )),
1027                                    Err(_) => Err(PluginError::PluginExecutionError(format!(
1028                                        "Request timed out after {}s waiting for worker response",
1029                                        response_timeout.as_secs()
1030                                    ))),
1031                                }
1032                            }
1033                            Ok(Err(async_channel::SendError(_))) => {
1034                                Err(PluginError::PluginExecutionError(
1035                                    "Plugin execution queue is closed".to_string(),
1036                                ))
1037                            }
1038                            Err(_) => {
1039                                let queue_len = self.request_tx.len();
1040                                tracing::error!(
1041                                    queue_len = queue_len,
1042                                    max_queue_size = self.max_queue_size,
1043                                    timeout_ms = queue_timeout.as_millis(),
1044                                    "Plugin execution queue is FULL - timeout waiting for space"
1045                                );
1046                                Err(PluginError::PluginExecutionError(format!(
1047                                    "Plugin execution queue is full (max: {}) and timeout waiting for space. \
1048                                    Consider increasing PLUGIN_POOL_MAX_QUEUE_SIZE or PLUGIN_POOL_MAX_CONNECTIONS.",
1049                                    self.max_queue_size
1050                                )))
1051                            }
1052                        }
1053                    }
1054                    Err(async_channel::TrySendError::Closed(_)) => {
1055                        Err(PluginError::PluginExecutionError(
1056                            "Plugin execution queue is closed".to_string(),
1057                        ))
1058                    }
1059                };
1060
1061                let elapsed_ms = start_time.elapsed().as_millis() as u32;
1062                match &result {
1063                    Ok(_) => circuit_breaker.record_success(elapsed_ms),
1064                    Err(e) => {
1065                        // Only count infrastructure errors for circuit breaker, not business errors
1066                        if Self::is_dead_server_error(e) {
1067                            circuit_breaker.record_failure();
1068                            tracing::warn!(
1069                                error = %e,
1070                                "Detected dead pool server error (queued path), triggering health check for restart"
1071                            );
1072                            self.health_check_needed.store(true, Ordering::Relaxed);
1073                        } else {
1074                            // Plugin executed but returned error - infrastructure is healthy
1075                            circuit_breaker.record_success(elapsed_ms);
1076                        }
1077                    }
1078                }
1079
1080                tracing::debug!(
1081                    elapsed_ms = elapsed_ms,
1082                    result_ok = result.is_ok(),
1083                    "Pool execute finished (queued path)"
1084                );
1085                result
1086            }
1087        }
1088    }
1089
1090    /// Check if an error indicates the pool server is dead and needs restart
1091    pub fn is_dead_server_error(err: &PluginError) -> bool {
1092        let error_str = err.to_string();
1093        let lower = error_str.to_lowercase();
1094
1095        if lower.contains("handler timed out")
1096            || (lower.contains("plugin") && lower.contains("timed out"))
1097        {
1098            return false;
1099        }
1100
1101        DeadServerIndicator::from_error_str(&error_str).is_some()
1102    }
1103
1104    /// Precompile a plugin
1105    pub async fn precompile_plugin(
1106        &self,
1107        plugin_id: String,
1108        plugin_path: Option<String>,
1109        source_code: Option<String>,
1110    ) -> Result<String, PluginError> {
1111        self.ensure_started().await?;
1112
1113        let mut conn = self.connection_pool.acquire().await?;
1114
1115        let request = PoolRequest::Precompile {
1116            task_id: Uuid::new_v4().to_string(),
1117            plugin_id: plugin_id.clone(),
1118            plugin_path,
1119            source_code,
1120        };
1121
1122        let response = conn
1123            .send_request_with_timeout(&request, get_config().pool_request_timeout_secs)
1124            .await?;
1125
1126        if response.success {
1127            response
1128                .result
1129                .and_then(|v| {
1130                    v.get("code")
1131                        .and_then(|c| c.as_str())
1132                        .map(|s| s.to_string())
1133                })
1134                .ok_or_else(|| {
1135                    PluginError::PluginExecutionError("No compiled code in response".to_string())
1136                })
1137        } else {
1138            let error = response.error.unwrap_or(PoolError {
1139                message: "Compilation failed".to_string(),
1140                code: None,
1141                status: None,
1142                details: None,
1143            });
1144            Err(PluginError::PluginExecutionError(error.message))
1145        }
1146    }
1147
1148    /// Cache compiled code in the pool
1149    pub async fn cache_compiled_code(
1150        &self,
1151        plugin_id: String,
1152        compiled_code: String,
1153    ) -> Result<(), PluginError> {
1154        self.ensure_started().await?;
1155
1156        let mut conn = self.connection_pool.acquire().await?;
1157
1158        let request = PoolRequest::Cache {
1159            task_id: Uuid::new_v4().to_string(),
1160            plugin_id: plugin_id.clone(),
1161            compiled_code,
1162        };
1163
1164        let response = conn
1165            .send_request_with_timeout(&request, get_config().pool_request_timeout_secs)
1166            .await?;
1167
1168        if response.success {
1169            Ok(())
1170        } else {
1171            let error = response.error.unwrap_or(PoolError {
1172                message: "Cache failed".to_string(),
1173                code: None,
1174                status: None,
1175                details: None,
1176            });
1177            Err(PluginError::PluginError(error.message))
1178        }
1179    }
1180
1181    /// Invalidate a cached plugin
1182    pub async fn invalidate_plugin(&self, plugin_id: String) -> Result<(), PluginError> {
1183        if !self.initialized.load(Ordering::Acquire) {
1184            return Ok(());
1185        }
1186
1187        let mut conn = self.connection_pool.acquire().await?;
1188
1189        let request = PoolRequest::Invalidate {
1190            task_id: Uuid::new_v4().to_string(),
1191            plugin_id,
1192        };
1193
1194        let _ = conn
1195            .send_request_with_timeout(&request, get_config().pool_request_timeout_secs)
1196            .await?;
1197        Ok(())
1198    }
1199
1200    /// Health check - verify the pool server is responding
1201    /// Collect socket connection statistics
1202    async fn collect_socket_stats(
1203        &self,
1204    ) -> (
1205        Option<usize>,
1206        Option<usize>,
1207        Option<usize>,
1208        Option<usize>,
1209        Option<usize>,
1210    ) {
1211        // Collect shared socket stats
1212        let (shared_available, shared_active, shared_executions) = match get_shared_socket_service()
1213        {
1214            Ok(service) => {
1215                let available = service.available_connection_slots();
1216                let active = service.active_connection_count();
1217                let executions = service.registered_executions_count().await;
1218                (Some(available), Some(active), Some(executions))
1219            }
1220            Err(_) => (None, None, None),
1221        };
1222
1223        // Collect connection pool stats (for pool server connections)
1224        let pool_available = self.connection_pool.semaphore.available_permits();
1225        let pool_max = get_config().pool_max_connections;
1226        let pool_active = pool_max.saturating_sub(pool_available);
1227
1228        (
1229            shared_available,
1230            shared_active,
1231            shared_executions,
1232            Some(pool_available),
1233            Some(pool_active),
1234        )
1235    }
1236
1237    pub async fn health_check(&self) -> Result<HealthStatus, PluginError> {
1238        let circuit_info = || {
1239            let state = match self.circuit_breaker.state() {
1240                CircuitState::Closed => "closed",
1241                CircuitState::HalfOpen => "half_open",
1242                CircuitState::Open => "open",
1243            };
1244            (
1245                Some(state.to_string()),
1246                Some(self.circuit_breaker.avg_response_time()),
1247                Some(self.recovery_mode.load(Ordering::Relaxed)),
1248                Some(self.recovery_allowance.load(Ordering::Relaxed)),
1249            )
1250        };
1251
1252        let socket_stats = self.collect_socket_stats().await;
1253
1254        if !self.initialized.load(Ordering::Acquire) {
1255            let (circuit_state, avg_rt, recovering, recovery_pct) = circuit_info();
1256            let (shared_available, shared_active, shared_executions, pool_available, pool_active) =
1257                socket_stats;
1258            return Ok(HealthStatus {
1259                healthy: false,
1260                status: "not_initialized".to_string(),
1261                uptime_ms: None,
1262                memory: None,
1263                pool_completed: None,
1264                pool_queued: None,
1265                success_rate: None,
1266                circuit_state,
1267                avg_response_time_ms: avg_rt,
1268                recovering,
1269                recovery_percent: recovery_pct,
1270                shared_socket_available_slots: shared_available,
1271                shared_socket_active_connections: shared_active,
1272                shared_socket_registered_executions: shared_executions,
1273                connection_pool_available_slots: pool_available,
1274                connection_pool_active_connections: pool_active,
1275            });
1276        }
1277
1278        if !std::path::Path::new(&self.socket_path).exists() {
1279            let (circuit_state, avg_rt, recovering, recovery_pct) = circuit_info();
1280            let (shared_available, shared_active, shared_executions, pool_available, pool_active) =
1281                socket_stats;
1282            return Ok(HealthStatus {
1283                healthy: false,
1284                status: "socket_missing".to_string(),
1285                uptime_ms: None,
1286                memory: None,
1287                pool_completed: None,
1288                pool_queued: None,
1289                success_rate: None,
1290                circuit_state,
1291                avg_response_time_ms: avg_rt,
1292                recovering,
1293                recovery_percent: recovery_pct,
1294                shared_socket_available_slots: shared_available,
1295                shared_socket_active_connections: shared_active,
1296                shared_socket_registered_executions: shared_executions,
1297                connection_pool_available_slots: pool_available,
1298                connection_pool_active_connections: pool_active,
1299            });
1300        }
1301
1302        let mut conn =
1303            match tokio::time::timeout(Duration::from_millis(100), self.connection_pool.acquire())
1304                .await
1305            {
1306                Ok(Ok(c)) => c,
1307                Ok(Err(e)) => {
1308                    let err_str = e.to_string();
1309                    let is_pool_exhausted =
1310                        err_str.contains("semaphore") || err_str.contains("Connection refused");
1311
1312                    // Try to check process status without blocking on lock
1313                    let process_status = match self.process.try_lock() {
1314                        Ok(guard) => {
1315                            if let Some(child) = guard.as_ref() {
1316                                format!("process_pid_{}", child.id().unwrap_or(0))
1317                            } else {
1318                                "no_process".to_string()
1319                            }
1320                        }
1321                        Err(_) => "process_lock_busy".to_string(),
1322                    };
1323
1324                    let (circuit_state, avg_rt, recovering, recovery_pct) = circuit_info();
1325                    let (
1326                        shared_available,
1327                        shared_active,
1328                        shared_executions,
1329                        pool_available,
1330                        pool_active,
1331                    ) = socket_stats;
1332                    return Ok(HealthStatus {
1333                        healthy: is_pool_exhausted,
1334                        status: if is_pool_exhausted {
1335                            format!("pool_exhausted: {e} ({process_status})")
1336                        } else {
1337                            format!("connection_failed: {e} ({process_status})")
1338                        },
1339                        uptime_ms: None,
1340                        memory: None,
1341                        pool_completed: None,
1342                        pool_queued: None,
1343                        success_rate: None,
1344                        circuit_state,
1345                        avg_response_time_ms: avg_rt,
1346                        recovering,
1347                        recovery_percent: recovery_pct,
1348                        shared_socket_available_slots: shared_available,
1349                        shared_socket_active_connections: shared_active,
1350                        shared_socket_registered_executions: shared_executions,
1351                        connection_pool_available_slots: pool_available,
1352                        connection_pool_active_connections: pool_active,
1353                    });
1354                }
1355                Err(_) => {
1356                    let (circuit_state, avg_rt, recovering, recovery_pct) = circuit_info();
1357                    let (
1358                        shared_available,
1359                        shared_active,
1360                        shared_executions,
1361                        pool_available,
1362                        pool_active,
1363                    ) = socket_stats;
1364                    return Ok(HealthStatus {
1365                        healthy: true,
1366                        status: "pool_busy".to_string(),
1367                        uptime_ms: None,
1368                        memory: None,
1369                        pool_completed: None,
1370                        pool_queued: None,
1371                        success_rate: None,
1372                        circuit_state,
1373                        avg_response_time_ms: avg_rt,
1374                        recovering,
1375                        recovery_percent: recovery_pct,
1376                        shared_socket_available_slots: shared_available,
1377                        shared_socket_active_connections: shared_active,
1378                        shared_socket_registered_executions: shared_executions,
1379                        connection_pool_available_slots: pool_available,
1380                        connection_pool_active_connections: pool_active,
1381                    });
1382                }
1383            };
1384
1385        let request = PoolRequest::Health {
1386            task_id: Uuid::new_v4().to_string(),
1387        };
1388
1389        let (circuit_state, avg_rt, recovering, recovery_pct) = circuit_info();
1390
1391        match conn.send_request_with_timeout(&request, 5).await {
1392            Ok(response) => {
1393                if response.success {
1394                    let result = response.result.unwrap_or_default();
1395                    // Use extracted parsing function for testability
1396                    let parsed = Self::parse_health_result(&result);
1397
1398                    {
1399                        let (
1400                            shared_available,
1401                            shared_active,
1402                            shared_executions,
1403                            pool_available,
1404                            pool_active,
1405                        ) = socket_stats;
1406                        Ok(HealthStatus {
1407                            healthy: true,
1408                            status: parsed.status,
1409                            uptime_ms: parsed.uptime_ms,
1410                            memory: parsed.memory,
1411                            pool_completed: parsed.pool_completed,
1412                            pool_queued: parsed.pool_queued,
1413                            success_rate: parsed.success_rate,
1414                            circuit_state,
1415                            avg_response_time_ms: avg_rt,
1416                            recovering,
1417                            recovery_percent: recovery_pct,
1418                            shared_socket_available_slots: shared_available,
1419                            shared_socket_active_connections: shared_active,
1420                            shared_socket_registered_executions: shared_executions,
1421                            connection_pool_available_slots: pool_available,
1422                            connection_pool_active_connections: pool_active,
1423                        })
1424                    }
1425                } else {
1426                    let (
1427                        shared_available,
1428                        shared_active,
1429                        shared_executions,
1430                        pool_available,
1431                        pool_active,
1432                    ) = socket_stats;
1433                    Ok(HealthStatus {
1434                        healthy: false,
1435                        status: response
1436                            .error
1437                            .map(|e| e.message)
1438                            .unwrap_or_else(|| "unknown_error".to_string()),
1439                        uptime_ms: None,
1440                        memory: None,
1441                        pool_completed: None,
1442                        pool_queued: None,
1443                        success_rate: None,
1444                        circuit_state,
1445                        avg_response_time_ms: avg_rt,
1446                        recovering,
1447                        recovery_percent: recovery_pct,
1448                        shared_socket_available_slots: shared_available,
1449                        shared_socket_active_connections: shared_active,
1450                        shared_socket_registered_executions: shared_executions,
1451                        connection_pool_available_slots: pool_available,
1452                        connection_pool_active_connections: pool_active,
1453                    })
1454                }
1455            }
1456            Err(e) => {
1457                let (
1458                    shared_available,
1459                    shared_active,
1460                    shared_executions,
1461                    pool_available,
1462                    pool_active,
1463                ) = socket_stats;
1464                Ok(HealthStatus {
1465                    healthy: false,
1466                    status: format!("request_failed: {e}"),
1467                    uptime_ms: None,
1468                    memory: None,
1469                    pool_completed: None,
1470                    pool_queued: None,
1471                    success_rate: None,
1472                    circuit_state,
1473                    avg_response_time_ms: avg_rt,
1474                    recovering,
1475                    recovery_percent: recovery_pct,
1476                    shared_socket_available_slots: shared_available,
1477                    shared_socket_active_connections: shared_active,
1478                    shared_socket_registered_executions: shared_executions,
1479                    connection_pool_available_slots: pool_available,
1480                    connection_pool_active_connections: pool_active,
1481                })
1482            }
1483        }
1484    }
1485
1486    /// Check health and restart if unhealthy
1487    pub async fn ensure_healthy(&self) -> Result<bool, PluginError> {
1488        let health = self.health_check().await?;
1489
1490        if health.healthy {
1491            return Ok(true);
1492        }
1493
1494        match self.restart_lock.try_lock() {
1495            Ok(_guard) => {
1496                let health_recheck = self.health_check().await?;
1497                if health_recheck.healthy {
1498                    return Ok(true);
1499                }
1500
1501                tracing::warn!(status = %health.status, "Pool server unhealthy, attempting restart");
1502                self.restart_internal().await?;
1503            }
1504            Err(_) => {
1505                tracing::debug!("Waiting for another task to complete pool server restart");
1506                let _guard = self.restart_lock.lock().await;
1507            }
1508        }
1509
1510        let health_after = self.health_check().await?;
1511        Ok(health_after.healthy)
1512    }
1513
1514    /// Force restart the pool server (public API - acquires lock)
1515    pub async fn restart(&self) -> Result<(), PluginError> {
1516        let _guard = self.restart_lock.lock().await;
1517        self.restart_internal().await
1518    }
1519
1520    /// Internal restart without lock (must be called with restart_lock held)
1521    async fn restart_internal(&self) -> Result<(), PluginError> {
1522        tracing::info!("Restarting plugin pool server");
1523
1524        {
1525            let mut process_guard = self.process.lock().await;
1526            if let Some(mut child) = process_guard.take() {
1527                let _ = child.kill().await;
1528                tokio::time::sleep(Duration::from_millis(100)).await;
1529            }
1530        }
1531
1532        Self::cleanup_socket_file(&self.socket_path).await;
1533
1534        self.initialized.store(false, Ordering::Release);
1535
1536        let mut process_guard = self.process.lock().await;
1537        if process_guard.is_some() {
1538            return Ok(());
1539        }
1540
1541        let child = Self::spawn_pool_server_process(&self.socket_path, "restart").await?;
1542        *process_guard = Some(child);
1543
1544        self.initialized.store(true, Ordering::Release);
1545
1546        self.recovery_allowance.store(10, Ordering::Relaxed);
1547        self.recovery_mode.store(true, Ordering::Relaxed);
1548
1549        self.circuit_breaker.force_close();
1550
1551        let now = std::time::SystemTime::now()
1552            .duration_since(std::time::UNIX_EPOCH)
1553            .unwrap_or_default()
1554            .as_millis() as u64;
1555        self.last_restart_time_ms.store(now, Ordering::Relaxed);
1556
1557        tracing::info!("Recovery mode enabled - requests will gradually increase from 10%");
1558
1559        Ok(())
1560    }
1561
1562    /// Get current circuit breaker state for monitoring
1563    pub fn circuit_state(&self) -> CircuitState {
1564        self.circuit_breaker.state()
1565    }
1566
1567    /// Get average response time in ms (for monitoring)
1568    pub fn avg_response_time_ms(&self) -> u32 {
1569        self.circuit_breaker.avg_response_time()
1570    }
1571
1572    /// Check if currently in recovery mode
1573    pub fn is_recovering(&self) -> bool {
1574        self.recovery_mode.load(Ordering::Relaxed)
1575    }
1576
1577    /// Get current recovery allowance percentage (0-100)
1578    pub fn recovery_allowance_percent(&self) -> u32 {
1579        self.recovery_allowance.load(Ordering::Relaxed)
1580    }
1581
1582    /// Shutdown the pool server gracefully
1583    pub async fn shutdown(&self) -> Result<(), PluginError> {
1584        if !self.initialized.load(Ordering::Acquire) {
1585            return Ok(());
1586        }
1587
1588        tracing::info!("Initiating graceful shutdown of plugin pool server");
1589
1590        self.shutdown_signal.notify_waiters();
1591
1592        let shutdown_timeout = std::time::Duration::from_secs(35);
1593        let shutdown_result = self.send_shutdown_request(shutdown_timeout).await;
1594
1595        match &shutdown_result {
1596            Ok(response) => {
1597                tracing::info!(
1598                    response = ?response,
1599                    "Pool server acknowledged shutdown, waiting for graceful exit"
1600                );
1601            }
1602            Err(e) => {
1603                tracing::warn!(
1604                    error = %e,
1605                    "Failed to send shutdown request, will force kill"
1606                );
1607            }
1608        }
1609
1610        let mut process_guard = self.process.lock().await;
1611        if let Some(ref mut child) = *process_guard {
1612            let graceful_wait = std::time::Duration::from_secs(35);
1613            let start = std::time::Instant::now();
1614
1615            loop {
1616                match child.try_wait() {
1617                    Ok(Some(status)) => {
1618                        tracing::info!(
1619                            exit_status = ?status,
1620                            elapsed_ms = start.elapsed().as_millis(),
1621                            "Pool server exited gracefully"
1622                        );
1623                        break;
1624                    }
1625                    Ok(None) => {
1626                        if start.elapsed() >= graceful_wait {
1627                            tracing::warn!(
1628                                "Pool server did not exit within graceful timeout, force killing"
1629                            );
1630                            let _ = child.kill().await;
1631                            break;
1632                        }
1633                        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1634                    }
1635                    Err(e) => {
1636                        tracing::warn!(error = %e, "Error checking pool server status");
1637                        let _ = child.kill().await;
1638                        break;
1639                    }
1640                }
1641            }
1642        }
1643        *process_guard = None;
1644
1645        let _ = std::fs::remove_file(&self.socket_path);
1646
1647        self.initialized.store(false, Ordering::Release);
1648        tracing::info!("Plugin pool server shutdown complete");
1649        Ok(())
1650    }
1651
1652    /// Send shutdown request to the pool server
1653    async fn send_shutdown_request(
1654        &self,
1655        timeout: std::time::Duration,
1656    ) -> Result<PoolResponse, PluginError> {
1657        let request = PoolRequest::Shutdown {
1658            task_id: Uuid::new_v4().to_string(),
1659        };
1660
1661        // Use the pool's connection ID counter to ensure unique IDs
1662        // even for shutdown connections that bypass the pool
1663        let connection_id = self.connection_pool.next_connection_id();
1664        let mut conn = match PoolConnection::new(&self.socket_path, connection_id).await {
1665            Ok(c) => c,
1666            Err(e) => {
1667                return Err(PluginError::PluginExecutionError(format!(
1668                    "Failed to connect for shutdown: {e}"
1669                )));
1670            }
1671        };
1672
1673        conn.send_request_with_timeout(&request, timeout.as_secs())
1674            .await
1675    }
1676}
1677
1678impl Drop for PoolManager {
1679    fn drop(&mut self) {
1680        let _ = std::fs::remove_file(&self.socket_path);
1681    }
1682}
1683
1684/// Global pool manager instance
1685static POOL_MANAGER: std::sync::OnceLock<Arc<PoolManager>> = std::sync::OnceLock::new();
1686
1687/// Get or create the global pool manager
1688pub fn get_pool_manager() -> Arc<PoolManager> {
1689    POOL_MANAGER
1690        .get_or_init(|| Arc::new(PoolManager::new()))
1691        .clone()
1692}
1693
1694#[cfg(test)]
1695mod tests {
1696    use super::*;
1697    use crate::services::plugins::script_executor::LogLevel;
1698
1699    #[test]
1700    fn test_is_dead_server_error_detects_dead_server() {
1701        let err = PluginError::PluginExecutionError("Connection refused".to_string());
1702        assert!(PoolManager::is_dead_server_error(&err));
1703
1704        let err = PluginError::PluginExecutionError("Broken pipe".to_string());
1705        assert!(PoolManager::is_dead_server_error(&err));
1706    }
1707
1708    #[test]
1709    fn test_is_dead_server_error_excludes_plugin_timeouts() {
1710        let err = PluginError::PluginExecutionError("Plugin timed out after 30s".to_string());
1711        assert!(!PoolManager::is_dead_server_error(&err));
1712
1713        let err = PluginError::PluginExecutionError("Handler timed out".to_string());
1714        assert!(!PoolManager::is_dead_server_error(&err));
1715    }
1716
1717    #[test]
1718    fn test_is_dead_server_error_normal_errors() {
1719        let err =
1720            PluginError::PluginExecutionError("TypeError: undefined is not a function".to_string());
1721        assert!(!PoolManager::is_dead_server_error(&err));
1722
1723        let err = PluginError::PluginExecutionError("Plugin returned invalid JSON".to_string());
1724        assert!(!PoolManager::is_dead_server_error(&err));
1725    }
1726
1727    #[test]
1728    fn test_is_dead_server_error_detects_all_dead_server_indicators() {
1729        // Test common DeadServerIndicator patterns
1730        let dead_server_errors = vec![
1731            "EOF while parsing JSON response",
1732            "Broken pipe when writing to socket",
1733            "Connection refused: server not running",
1734            "Connection reset by peer",
1735            "Socket not connected",
1736            "Failed to connect to pool server",
1737            "Socket file missing: /tmp/test.sock",
1738            "No such file or directory",
1739        ];
1740
1741        for error_msg in dead_server_errors {
1742            let err = PluginError::PluginExecutionError(error_msg.to_string());
1743            assert!(
1744                PoolManager::is_dead_server_error(&err),
1745                "Expected '{}' to be detected as dead server error",
1746                error_msg
1747            );
1748        }
1749    }
1750
1751    #[test]
1752    fn test_dead_server_indicator_patterns() {
1753        // Test the DeadServerIndicator pattern matching directly
1754        use super::super::health::DeadServerIndicator;
1755
1756        // These should all match
1757        assert!(DeadServerIndicator::from_error_str("eof while parsing").is_some());
1758        assert!(DeadServerIndicator::from_error_str("broken pipe").is_some());
1759        assert!(DeadServerIndicator::from_error_str("connection refused").is_some());
1760        assert!(DeadServerIndicator::from_error_str("connection reset").is_some());
1761        assert!(DeadServerIndicator::from_error_str("not connected").is_some());
1762        assert!(DeadServerIndicator::from_error_str("failed to connect").is_some());
1763        assert!(DeadServerIndicator::from_error_str("socket file missing").is_some());
1764        assert!(DeadServerIndicator::from_error_str("no such file").is_some());
1765        assert!(DeadServerIndicator::from_error_str("connection timed out").is_some());
1766        assert!(DeadServerIndicator::from_error_str("connect timed out").is_some());
1767
1768        // These should NOT match
1769        assert!(DeadServerIndicator::from_error_str("handler timed out").is_none());
1770        assert!(DeadServerIndicator::from_error_str("validation error").is_none());
1771        assert!(DeadServerIndicator::from_error_str("TypeError: undefined").is_none());
1772    }
1773
1774    #[test]
1775    fn test_is_dead_server_error_excludes_plugin_timeouts_with_connection() {
1776        // Plugin timeout should NOT be detected even if it mentions connection
1777        let plugin_timeout =
1778            PluginError::PluginExecutionError("plugin connection timed out".to_string());
1779        // This contains both "plugin" and "timed out" so it's excluded
1780        assert!(!PoolManager::is_dead_server_error(&plugin_timeout));
1781    }
1782
1783    #[test]
1784    fn test_is_dead_server_error_case_insensitive() {
1785        // Test case insensitivity
1786        let err = PluginError::PluginExecutionError("CONNECTION REFUSED".to_string());
1787        assert!(PoolManager::is_dead_server_error(&err));
1788
1789        let err = PluginError::PluginExecutionError("BROKEN PIPE".to_string());
1790        assert!(PoolManager::is_dead_server_error(&err));
1791
1792        let err = PluginError::PluginExecutionError("Connection Reset By Peer".to_string());
1793        assert!(PoolManager::is_dead_server_error(&err));
1794    }
1795
1796    #[test]
1797    fn test_is_dead_server_error_handler_timeout_variations() {
1798        // All variations of plugin/handler timeouts should NOT trigger restart
1799        let timeout_errors = vec![
1800            "Handler timed out",
1801            "handler timed out after 30000ms",
1802            "Plugin handler timed out",
1803            "plugin timed out",
1804            "Plugin execution timed out after 60s",
1805        ];
1806
1807        for error_msg in timeout_errors {
1808            let err = PluginError::PluginExecutionError(error_msg.to_string());
1809            assert!(
1810                !PoolManager::is_dead_server_error(&err),
1811                "Expected '{}' to NOT be detected as dead server error",
1812                error_msg
1813            );
1814        }
1815    }
1816
1817    #[test]
1818    fn test_is_dead_server_error_business_errors_not_detected() {
1819        // Business logic errors should not trigger restart
1820        let business_errors = vec![
1821            "ReferenceError: x is not defined",
1822            "SyntaxError: Unexpected token",
1823            "TypeError: Cannot read property 'foo' of undefined",
1824            "Plugin returned status 400: Bad Request",
1825            "Validation error: missing required field",
1826            "Authorization failed",
1827            "Rate limit exceeded",
1828            "Plugin threw an error: Invalid input",
1829        ];
1830
1831        for error_msg in business_errors {
1832            let err = PluginError::PluginExecutionError(error_msg.to_string());
1833            assert!(
1834                !PoolManager::is_dead_server_error(&err),
1835                "Expected '{}' to NOT be detected as dead server error",
1836                error_msg
1837            );
1838        }
1839    }
1840
1841    #[test]
1842    fn test_is_dead_server_error_with_handler_error_type() {
1843        // HandlerError type should also be checked
1844        let handler_payload = PluginHandlerPayload {
1845            message: "Connection refused".to_string(),
1846            status: 500,
1847            code: None,
1848            details: None,
1849            logs: None,
1850            traces: None,
1851        };
1852        let err = PluginError::HandlerError(Box::new(handler_payload));
1853        // The error message contains "Connection refused" but it's wrapped differently
1854        // This tests that we check the string representation
1855        assert!(PoolManager::is_dead_server_error(&err));
1856    }
1857
1858    // ============================================
1859    // Heap calculation tests
1860    // ============================================
1861
1862    #[test]
1863    fn test_heap_calculation_base_case() {
1864        // With default concurrency, should get base heap
1865        let base = PoolManager::BASE_HEAP_MB;
1866        let divisor = PoolManager::CONCURRENCY_DIVISOR;
1867        let increment = PoolManager::HEAP_INCREMENT_PER_DIVISOR_MB;
1868
1869        // For 100 concurrent requests:
1870        // 512 + (100 / 10) * 32 = 512 + 320 = 832 MB
1871        let concurrency = 100;
1872        let expected = base + ((concurrency / divisor) * increment);
1873        assert_eq!(expected, 832);
1874    }
1875
1876    #[test]
1877    fn test_heap_calculation_minimum() {
1878        // With very low concurrency, should still get base heap
1879        let base = PoolManager::BASE_HEAP_MB;
1880        let divisor = PoolManager::CONCURRENCY_DIVISOR;
1881        let increment = PoolManager::HEAP_INCREMENT_PER_DIVISOR_MB;
1882
1883        // For 5 concurrent requests:
1884        // 512 + (5 / 10) * 32 = 512 + 0 = 512 MB (integer division)
1885        let concurrency = 5;
1886        let expected = base + ((concurrency / divisor) * increment);
1887        assert_eq!(expected, 512);
1888    }
1889
1890    #[test]
1891    fn test_heap_calculation_high_concurrency() {
1892        // With high concurrency, should scale appropriately
1893        let base = PoolManager::BASE_HEAP_MB;
1894        let divisor = PoolManager::CONCURRENCY_DIVISOR;
1895        let increment = PoolManager::HEAP_INCREMENT_PER_DIVISOR_MB;
1896
1897        // For 500 concurrent requests:
1898        // 512 + (500 / 10) * 32 = 512 + 1600 = 2112 MB
1899        let concurrency = 500;
1900        let expected = base + ((concurrency / divisor) * increment);
1901        assert_eq!(expected, 2112);
1902    }
1903
1904    #[test]
1905    fn test_heap_calculation_max_cap() {
1906        // Verify max heap cap is respected
1907        let max_heap = PoolManager::MAX_HEAP_MB;
1908        assert_eq!(max_heap, 8192);
1909
1910        // For extreme concurrency that would exceed cap:
1911        // e.g., 3000 concurrent: 512 + (3000 / 10) * 32 = 512 + 9600 = 10112 MB
1912        // Should be capped to 8192 MB
1913        let base = PoolManager::BASE_HEAP_MB;
1914        let divisor = PoolManager::CONCURRENCY_DIVISOR;
1915        let increment = PoolManager::HEAP_INCREMENT_PER_DIVISOR_MB;
1916
1917        let concurrency = 3000;
1918        let calculated = base + ((concurrency / divisor) * increment);
1919        let capped = calculated.min(max_heap);
1920
1921        assert_eq!(calculated, 10112);
1922        assert_eq!(capped, 8192);
1923    }
1924
1925    // ============================================
1926    // Constants verification tests
1927    // ============================================
1928
1929    #[test]
1930    fn test_pool_manager_constants() {
1931        // Verify important constants have reasonable values
1932        assert_eq!(PoolManager::BASE_HEAP_MB, 512);
1933        assert_eq!(PoolManager::CONCURRENCY_DIVISOR, 10);
1934        assert_eq!(PoolManager::HEAP_INCREMENT_PER_DIVISOR_MB, 32);
1935        assert_eq!(PoolManager::MAX_HEAP_MB, 8192);
1936    }
1937
1938    // ============================================
1939    // Extracted function tests: calculate_heap_size
1940    // ============================================
1941
1942    #[test]
1943    fn test_calculate_heap_size_low_concurrency() {
1944        // Low concurrency should give base heap
1945        assert_eq!(PoolManager::calculate_heap_size(5), 512);
1946        assert_eq!(PoolManager::calculate_heap_size(9), 512);
1947    }
1948
1949    #[test]
1950    fn test_calculate_heap_size_medium_concurrency() {
1951        // 10 concurrent: 512 + (10/10)*32 = 544
1952        assert_eq!(PoolManager::calculate_heap_size(10), 544);
1953        // 50 concurrent: 512 + (50/10)*32 = 672
1954        assert_eq!(PoolManager::calculate_heap_size(50), 672);
1955        // 100 concurrent: 512 + (100/10)*32 = 832
1956        assert_eq!(PoolManager::calculate_heap_size(100), 832);
1957    }
1958
1959    #[test]
1960    fn test_calculate_heap_size_high_concurrency() {
1961        // 500 concurrent: 512 + (500/10)*32 = 2112
1962        assert_eq!(PoolManager::calculate_heap_size(500), 2112);
1963        // 1000 concurrent: 512 + (1000/10)*32 = 3712
1964        assert_eq!(PoolManager::calculate_heap_size(1000), 3712);
1965    }
1966
1967    #[test]
1968    fn test_calculate_heap_size_capped_at_max() {
1969        // 3000 concurrent would be 10112, but capped at 8192
1970        assert_eq!(PoolManager::calculate_heap_size(3000), 8192);
1971        // Even higher should still be capped
1972        assert_eq!(PoolManager::calculate_heap_size(10000), 8192);
1973    }
1974
1975    #[test]
1976    fn test_calculate_heap_size_zero_concurrency() {
1977        // Zero concurrency gives base heap
1978        assert_eq!(PoolManager::calculate_heap_size(0), 512);
1979    }
1980
1981    // ============================================
1982    // Extracted function tests: format_return_value
1983    // ============================================
1984
1985    #[test]
1986    fn test_format_return_value_none() {
1987        assert_eq!(PoolManager::format_return_value(None), "");
1988    }
1989
1990    #[test]
1991    fn test_format_return_value_string() {
1992        let value = Some(serde_json::json!("hello world"));
1993        assert_eq!(PoolManager::format_return_value(value), "hello world");
1994    }
1995
1996    #[test]
1997    fn test_format_return_value_empty_string() {
1998        let value = Some(serde_json::json!(""));
1999        assert_eq!(PoolManager::format_return_value(value), "");
2000    }
2001
2002    #[test]
2003    fn test_format_return_value_object() {
2004        let value = Some(serde_json::json!({"key": "value", "num": 42}));
2005        let result = PoolManager::format_return_value(value);
2006        // JSON object gets serialized
2007        assert!(result.contains("key"));
2008        assert!(result.contains("value"));
2009        assert!(result.contains("42"));
2010    }
2011
2012    #[test]
2013    fn test_format_return_value_array() {
2014        let value = Some(serde_json::json!([1, 2, 3]));
2015        assert_eq!(PoolManager::format_return_value(value), "[1,2,3]");
2016    }
2017
2018    #[test]
2019    fn test_format_return_value_number() {
2020        let value = Some(serde_json::json!(42));
2021        assert_eq!(PoolManager::format_return_value(value), "42");
2022    }
2023
2024    #[test]
2025    fn test_format_return_value_boolean() {
2026        assert_eq!(
2027            PoolManager::format_return_value(Some(serde_json::json!(true))),
2028            "true"
2029        );
2030        assert_eq!(
2031            PoolManager::format_return_value(Some(serde_json::json!(false))),
2032            "false"
2033        );
2034    }
2035
2036    #[test]
2037    fn test_format_return_value_null() {
2038        let value = Some(serde_json::json!(null));
2039        assert_eq!(PoolManager::format_return_value(value), "null");
2040    }
2041
2042    // ============================================
2043    // Extracted function tests: parse_pool_response
2044    // ============================================
2045
2046    #[test]
2047    fn test_parse_pool_response_success_with_string_result() {
2048        use super::super::protocol::{PoolLogEntry, PoolResponse};
2049
2050        let response = PoolResponse {
2051            task_id: "test-123".to_string(),
2052            success: true,
2053            result: Some(serde_json::json!("success result")),
2054            error: None,
2055            logs: Some(vec![PoolLogEntry {
2056                level: "info".to_string(),
2057                message: "test log".to_string(),
2058            }]),
2059        };
2060
2061        let result = PoolManager::parse_pool_response(response).unwrap();
2062        assert_eq!(result.return_value, "success result");
2063        assert!(result.error.is_empty());
2064        assert_eq!(result.logs.len(), 1);
2065        assert_eq!(result.logs[0].level, LogLevel::Info);
2066        assert_eq!(result.logs[0].message, "test log");
2067    }
2068
2069    #[test]
2070    fn test_parse_pool_response_success_with_object_result() {
2071        use super::super::protocol::PoolResponse;
2072
2073        let response = PoolResponse {
2074            task_id: "test-456".to_string(),
2075            success: true,
2076            result: Some(serde_json::json!({"data": "value"})),
2077            error: None,
2078            logs: None,
2079        };
2080
2081        let result = PoolManager::parse_pool_response(response).unwrap();
2082        assert!(result.return_value.contains("data"));
2083        assert!(result.return_value.contains("value"));
2084        assert!(result.logs.is_empty());
2085    }
2086
2087    #[test]
2088    fn test_parse_pool_response_success_no_result() {
2089        use super::super::protocol::PoolResponse;
2090
2091        let response = PoolResponse {
2092            task_id: "test-789".to_string(),
2093            success: true,
2094            result: None,
2095            error: None,
2096            logs: None,
2097        };
2098
2099        let result = PoolManager::parse_pool_response(response).unwrap();
2100        assert_eq!(result.return_value, "");
2101        assert!(result.error.is_empty());
2102    }
2103
2104    #[test]
2105    fn test_parse_pool_response_failure_with_error() {
2106        use super::super::protocol::{PoolError, PoolResponse};
2107
2108        let response = PoolResponse {
2109            task_id: "test-error".to_string(),
2110            success: false,
2111            result: None,
2112            error: Some(PoolError {
2113                message: "Something went wrong".to_string(),
2114                code: Some("ERR_001".to_string()),
2115                status: Some(400),
2116                details: Some(serde_json::json!({"field": "name"})),
2117            }),
2118            logs: None,
2119        };
2120
2121        let err = PoolManager::parse_pool_response(response).unwrap_err();
2122        match err {
2123            PluginError::HandlerError(payload) => {
2124                assert_eq!(payload.message, "Something went wrong");
2125                assert_eq!(payload.status, 400);
2126                assert_eq!(payload.code, Some("ERR_001".to_string()));
2127            }
2128            _ => panic!("Expected HandlerError"),
2129        }
2130    }
2131
2132    #[test]
2133    fn test_parse_pool_response_failure_no_error_details() {
2134        use super::super::protocol::PoolResponse;
2135
2136        let response = PoolResponse {
2137            task_id: "test-unknown".to_string(),
2138            success: false,
2139            result: None,
2140            error: None,
2141            logs: None,
2142        };
2143
2144        let err = PoolManager::parse_pool_response(response).unwrap_err();
2145        match err {
2146            PluginError::HandlerError(payload) => {
2147                assert_eq!(payload.message, "Unknown error");
2148                assert_eq!(payload.status, 500);
2149            }
2150            _ => panic!("Expected HandlerError"),
2151        }
2152    }
2153
2154    #[test]
2155    fn test_parse_pool_response_failure_preserves_logs() {
2156        use super::super::protocol::{PoolError, PoolLogEntry, PoolResponse};
2157
2158        let response = PoolResponse {
2159            task_id: "test-logs".to_string(),
2160            success: false,
2161            result: None,
2162            error: Some(PoolError {
2163                message: "Error with logs".to_string(),
2164                code: None,
2165                status: None,
2166                details: None,
2167            }),
2168            logs: Some(vec![
2169                PoolLogEntry {
2170                    level: "debug".to_string(),
2171                    message: "debug message".to_string(),
2172                },
2173                PoolLogEntry {
2174                    level: "error".to_string(),
2175                    message: "error message".to_string(),
2176                },
2177            ]),
2178        };
2179
2180        let err = PoolManager::parse_pool_response(response).unwrap_err();
2181        match err {
2182            PluginError::HandlerError(payload) => {
2183                let logs = payload.logs.unwrap();
2184                assert_eq!(logs.len(), 2);
2185                assert_eq!(logs[0].level, LogLevel::Debug);
2186                assert_eq!(logs[1].level, LogLevel::Error);
2187            }
2188            _ => panic!("Expected HandlerError"),
2189        }
2190    }
2191
2192    // ============================================
2193    // Extracted function tests: parse_success_response
2194    // ============================================
2195
2196    #[test]
2197    fn test_parse_success_response_complete() {
2198        use super::super::protocol::{PoolLogEntry, PoolResponse};
2199
2200        let response = PoolResponse {
2201            task_id: "task-1".to_string(),
2202            success: true,
2203            result: Some(serde_json::json!("completed")),
2204            error: None,
2205            logs: Some(vec![
2206                PoolLogEntry {
2207                    level: "log".to_string(),
2208                    message: "starting".to_string(),
2209                },
2210                PoolLogEntry {
2211                    level: "result".to_string(),
2212                    message: "finished".to_string(),
2213                },
2214            ]),
2215        };
2216
2217        let result = PoolManager::parse_success_response(response);
2218        assert_eq!(result.return_value, "completed");
2219        assert!(result.error.is_empty());
2220        assert_eq!(result.logs.len(), 2);
2221        assert_eq!(result.logs[0].level, LogLevel::Log);
2222        assert_eq!(result.logs[1].level, LogLevel::Result);
2223    }
2224
2225    // ============================================
2226    // Extracted function tests: parse_error_response
2227    // ============================================
2228
2229    #[test]
2230    fn test_parse_error_response_with_all_fields() {
2231        use super::super::protocol::{PoolError, PoolLogEntry, PoolResponse};
2232
2233        let response = PoolResponse {
2234            task_id: "err-task".to_string(),
2235            success: false,
2236            result: None,
2237            error: Some(PoolError {
2238                message: "Validation failed".to_string(),
2239                code: Some("VALIDATION_ERROR".to_string()),
2240                status: Some(422),
2241                details: Some(serde_json::json!({"fields": ["email"]})),
2242            }),
2243            logs: Some(vec![PoolLogEntry {
2244                level: "warn".to_string(),
2245                message: "validation warning".to_string(),
2246            }]),
2247        };
2248
2249        let err = PoolManager::parse_error_response(response);
2250        match err {
2251            PluginError::HandlerError(payload) => {
2252                assert_eq!(payload.message, "Validation failed");
2253                assert_eq!(payload.status, 422);
2254                assert_eq!(payload.code, Some("VALIDATION_ERROR".to_string()));
2255                assert!(payload.details.is_some());
2256                let logs = payload.logs.unwrap();
2257                assert_eq!(logs.len(), 1);
2258                assert_eq!(logs[0].level, LogLevel::Warn);
2259            }
2260            _ => panic!("Expected HandlerError"),
2261        }
2262    }
2263
2264    // ============================================
2265    // Extracted function tests: parse_health_result
2266    // ============================================
2267
2268    #[test]
2269    fn test_parse_health_result_complete() {
2270        let json = serde_json::json!({
2271            "status": "healthy",
2272            "uptime": 123456,
2273            "memory": {
2274                "heapUsed": 50000000,
2275                "heapTotal": 100000000
2276            },
2277            "pool": {
2278                "completed": 1000,
2279                "queued": 5
2280            },
2281            "execution": {
2282                "successRate": 0.99
2283            }
2284        });
2285
2286        let result = PoolManager::parse_health_result(&json);
2287
2288        assert_eq!(result.status, "healthy");
2289        assert_eq!(result.uptime_ms, Some(123456));
2290        assert_eq!(result.memory, Some(50000000));
2291        assert_eq!(result.pool_completed, Some(1000));
2292        assert_eq!(result.pool_queued, Some(5));
2293        assert!((result.success_rate.unwrap() - 0.99).abs() < 0.001);
2294    }
2295
2296    #[test]
2297    fn test_parse_health_result_minimal() {
2298        let json = serde_json::json!({});
2299
2300        let result = PoolManager::parse_health_result(&json);
2301
2302        assert_eq!(result.status, "unknown");
2303        assert_eq!(result.uptime_ms, None);
2304        assert_eq!(result.memory, None);
2305        assert_eq!(result.pool_completed, None);
2306        assert_eq!(result.pool_queued, None);
2307        assert_eq!(result.success_rate, None);
2308    }
2309
2310    #[test]
2311    fn test_parse_health_result_partial() {
2312        let json = serde_json::json!({
2313            "status": "degraded",
2314            "uptime": 5000,
2315            "memory": {
2316                "heapTotal": 100000000
2317                // heapUsed missing
2318            }
2319        });
2320
2321        let result = PoolManager::parse_health_result(&json);
2322
2323        assert_eq!(result.status, "degraded");
2324        assert_eq!(result.uptime_ms, Some(5000));
2325        assert_eq!(result.memory, None); // heapUsed was missing
2326        assert_eq!(result.pool_completed, None);
2327        assert_eq!(result.pool_queued, None);
2328        assert_eq!(result.success_rate, None);
2329    }
2330
2331    #[test]
2332    fn test_parse_health_result_wrong_types() {
2333        let json = serde_json::json!({
2334            "status": 123,  // Should be string, will use "unknown"
2335            "uptime": "not a number",  // Should be u64, will be None
2336            "memory": "invalid"  // Should be object, will give None
2337        });
2338
2339        let result = PoolManager::parse_health_result(&json);
2340
2341        assert_eq!(result.status, "unknown"); // Falls back when not a string
2342        assert_eq!(result.uptime_ms, None);
2343        assert_eq!(result.memory, None);
2344        assert_eq!(result.pool_completed, None);
2345        assert_eq!(result.pool_queued, None);
2346        assert_eq!(result.success_rate, None);
2347    }
2348
2349    #[test]
2350    fn test_parse_health_result_nested_values() {
2351        let json = serde_json::json!({
2352            "pool": {
2353                "completed": 0,
2354                "queued": 0
2355            },
2356            "execution": {
2357                "successRate": 1.0
2358            }
2359        });
2360
2361        let result = PoolManager::parse_health_result(&json);
2362
2363        assert_eq!(result.status, "unknown");
2364        assert_eq!(result.uptime_ms, None);
2365        assert_eq!(result.memory, None);
2366        assert_eq!(result.pool_completed, Some(0));
2367        assert_eq!(result.pool_queued, Some(0));
2368        assert!((result.success_rate.unwrap() - 1.0).abs() < 0.001);
2369    }
2370
2371    // ============================================
2372    // PoolManager creation tests
2373    // ============================================
2374
2375    #[tokio::test]
2376    async fn test_pool_manager_new_creates_unique_socket_path() {
2377        // Two PoolManagers should have different socket paths
2378        let manager1 = PoolManager::new();
2379        let manager2 = PoolManager::new();
2380
2381        assert_ne!(manager1.socket_path, manager2.socket_path);
2382        assert!(manager1
2383            .socket_path
2384            .starts_with("/tmp/relayer-plugin-pool-"));
2385        assert!(manager2
2386            .socket_path
2387            .starts_with("/tmp/relayer-plugin-pool-"));
2388    }
2389
2390    #[tokio::test]
2391    async fn test_pool_manager_with_custom_socket_path() {
2392        let custom_path = "/tmp/custom-test-pool.sock".to_string();
2393        let manager = PoolManager::with_socket_path(custom_path.clone());
2394
2395        assert_eq!(manager.socket_path, custom_path);
2396    }
2397
2398    #[tokio::test]
2399    async fn test_pool_manager_default_trait() {
2400        // Verify Default trait creates a valid manager
2401        let manager = PoolManager::default();
2402        assert!(manager.socket_path.starts_with("/tmp/relayer-plugin-pool-"));
2403    }
2404
2405    // ============================================
2406    // Circuit breaker state tests
2407    // ============================================
2408
2409    #[tokio::test]
2410    async fn test_circuit_state_initial() {
2411        let manager = PoolManager::new();
2412
2413        // Initial state should be Closed
2414        assert_eq!(manager.circuit_state(), CircuitState::Closed);
2415    }
2416
2417    #[tokio::test]
2418    async fn test_avg_response_time_initial() {
2419        let manager = PoolManager::new();
2420
2421        // Initial response time should be 0
2422        assert_eq!(manager.avg_response_time_ms(), 0);
2423    }
2424
2425    // ============================================
2426    // Recovery mode tests
2427    // ============================================
2428
2429    #[tokio::test]
2430    async fn test_recovery_mode_initial() {
2431        let manager = PoolManager::new();
2432
2433        // Should not be in recovery mode initially
2434        assert!(!manager.is_recovering());
2435        assert_eq!(manager.recovery_allowance_percent(), 0);
2436    }
2437
2438    // ============================================
2439    // ScriptResult construction tests
2440    // ============================================
2441
2442    #[test]
2443    fn test_script_result_success_construction() {
2444        let result = ScriptResult {
2445            logs: vec![LogEntry {
2446                level: LogLevel::Info,
2447                message: "Test log".to_string(),
2448            }],
2449            error: String::new(),
2450            return_value: r#"{"success": true}"#.to_string(),
2451            trace: vec![],
2452        };
2453
2454        assert!(result.error.is_empty());
2455        assert_eq!(result.logs.len(), 1);
2456        assert_eq!(result.logs[0].level, LogLevel::Info);
2457    }
2458
2459    #[test]
2460    fn test_script_result_with_multiple_logs() {
2461        let result = ScriptResult {
2462            logs: vec![
2463                LogEntry {
2464                    level: LogLevel::Log,
2465                    message: "Starting execution".to_string(),
2466                },
2467                LogEntry {
2468                    level: LogLevel::Debug,
2469                    message: "Processing data".to_string(),
2470                },
2471                LogEntry {
2472                    level: LogLevel::Warn,
2473                    message: "Deprecated API used".to_string(),
2474                },
2475                LogEntry {
2476                    level: LogLevel::Error,
2477                    message: "Non-fatal error".to_string(),
2478                },
2479            ],
2480            error: String::new(),
2481            return_value: "done".to_string(),
2482            trace: vec![],
2483        };
2484
2485        assert_eq!(result.logs.len(), 4);
2486        assert_eq!(result.logs[0].level, LogLevel::Log);
2487        assert_eq!(result.logs[1].level, LogLevel::Debug);
2488        assert_eq!(result.logs[2].level, LogLevel::Warn);
2489        assert_eq!(result.logs[3].level, LogLevel::Error);
2490    }
2491
2492    // ============================================
2493    // QueuedRequest structure tests
2494    // ============================================
2495
2496    #[test]
2497    fn test_queued_request_required_fields() {
2498        let (tx, _rx) = oneshot::channel();
2499
2500        let request = QueuedRequest {
2501            plugin_id: "test-plugin".to_string(),
2502            compiled_code: Some("module.exports.handler = () => {}".to_string()),
2503            plugin_path: None,
2504            params: serde_json::json!({"key": "value"}),
2505            headers: None,
2506            socket_path: "/tmp/test.sock".to_string(),
2507            http_request_id: Some("req-123".to_string()),
2508            timeout_secs: Some(30),
2509            route: Some("/api/test".to_string()),
2510            config: Some(serde_json::json!({"setting": true})),
2511            method: Some("POST".to_string()),
2512            query: Some(serde_json::json!({"page": "1"})),
2513            response_tx: tx,
2514        };
2515
2516        assert_eq!(request.plugin_id, "test-plugin");
2517        assert!(request.compiled_code.is_some());
2518        assert!(request.plugin_path.is_none());
2519        assert_eq!(request.timeout_secs, Some(30));
2520    }
2521
2522    #[test]
2523    fn test_queued_request_minimal() {
2524        let (tx, _rx) = oneshot::channel();
2525
2526        let request = QueuedRequest {
2527            plugin_id: "minimal".to_string(),
2528            compiled_code: None,
2529            plugin_path: Some("/path/to/plugin.ts".to_string()),
2530            params: serde_json::json!(null),
2531            headers: None,
2532            socket_path: "/tmp/min.sock".to_string(),
2533            http_request_id: None,
2534            timeout_secs: None,
2535            route: None,
2536            config: None,
2537            method: None,
2538            query: None,
2539            response_tx: tx,
2540        };
2541
2542        assert_eq!(request.plugin_id, "minimal");
2543        assert!(request.compiled_code.is_none());
2544        assert!(request.plugin_path.is_some());
2545    }
2546
2547    // ============================================
2548    // Error type tests
2549    // ============================================
2550
2551    #[test]
2552    fn test_plugin_error_socket_error() {
2553        let err = PluginError::SocketError("Connection failed".to_string());
2554        let display = format!("{}", err);
2555        assert!(display.contains("Socket error"));
2556        assert!(display.contains("Connection failed"));
2557    }
2558
2559    #[test]
2560    fn test_plugin_error_plugin_execution_error() {
2561        let err = PluginError::PluginExecutionError("Execution failed".to_string());
2562        let display = format!("{}", err);
2563        assert!(display.contains("Execution failed"));
2564    }
2565
2566    #[test]
2567    fn test_plugin_error_handler_error() {
2568        let payload = PluginHandlerPayload {
2569            message: "Handler error".to_string(),
2570            status: 400,
2571            code: Some("BAD_REQUEST".to_string()),
2572            details: Some(serde_json::json!({"field": "name"})),
2573            logs: None,
2574            traces: None,
2575        };
2576        let err = PluginError::HandlerError(Box::new(payload));
2577
2578        // Check that it can be displayed
2579        let display = format!("{:?}", err);
2580        assert!(display.contains("HandlerError"));
2581    }
2582
2583    // ============================================
2584    // Handler payload tests
2585    // ============================================
2586
2587    #[test]
2588    fn test_plugin_handler_payload_full() {
2589        let payload = PluginHandlerPayload {
2590            message: "Validation failed".to_string(),
2591            status: 422,
2592            code: Some("VALIDATION_ERROR".to_string()),
2593            details: Some(serde_json::json!({
2594                "errors": [
2595                    {"field": "email", "message": "Invalid format"}
2596                ]
2597            })),
2598            logs: Some(vec![LogEntry {
2599                level: LogLevel::Error,
2600                message: "Validation failed for email".to_string(),
2601            }]),
2602            traces: Some(vec![serde_json::json!({"stack": "Error at line 10"})]),
2603        };
2604
2605        assert_eq!(payload.status, 422);
2606        assert_eq!(payload.code, Some("VALIDATION_ERROR".to_string()));
2607        assert!(payload.logs.is_some());
2608        assert!(payload.traces.is_some());
2609    }
2610
2611    #[test]
2612    fn test_plugin_handler_payload_minimal() {
2613        let payload = PluginHandlerPayload {
2614            message: "Error".to_string(),
2615            status: 500,
2616            code: None,
2617            details: None,
2618            logs: None,
2619            traces: None,
2620        };
2621
2622        assert_eq!(payload.status, 500);
2623        assert!(payload.code.is_none());
2624        assert!(payload.details.is_none());
2625    }
2626
2627    // ============================================
2628    // Async tests (tokio runtime)
2629    // ============================================
2630
2631    #[tokio::test]
2632    async fn test_pool_manager_not_initialized_health_check() {
2633        let manager = PoolManager::with_socket_path("/tmp/test-health.sock".to_string());
2634
2635        // Health check on uninitialized manager should return not_initialized
2636        let health = manager.health_check().await.unwrap();
2637
2638        assert!(!health.healthy);
2639        assert_eq!(health.status, "not_initialized");
2640        assert!(health.uptime_ms.is_none());
2641        assert!(health.memory.is_none());
2642    }
2643
2644    #[tokio::test]
2645    async fn test_pool_manager_circuit_info_in_health_status() {
2646        let manager = PoolManager::with_socket_path("/tmp/test-circuit.sock".to_string());
2647
2648        let health = manager.health_check().await.unwrap();
2649
2650        // Circuit state info should be present even when not initialized
2651        assert!(health.circuit_state.is_some());
2652        assert_eq!(health.circuit_state, Some("closed".to_string()));
2653        assert!(health.avg_response_time_ms.is_some());
2654        assert!(health.recovering.is_some());
2655        assert!(health.recovery_percent.is_some());
2656    }
2657
2658    #[tokio::test]
2659    async fn test_invalidate_plugin_when_not_initialized() {
2660        let manager = PoolManager::with_socket_path("/tmp/test-invalidate.sock".to_string());
2661
2662        // Invalidating when not initialized should be a no-op
2663        let result = manager.invalidate_plugin("test-plugin".to_string()).await;
2664
2665        // Should succeed (no-op)
2666        assert!(result.is_ok());
2667    }
2668
2669    #[tokio::test]
2670    async fn test_shutdown_when_not_initialized() {
2671        let manager = PoolManager::with_socket_path("/tmp/test-shutdown.sock".to_string());
2672
2673        // Shutdown when not initialized should be a no-op
2674        let result = manager.shutdown().await;
2675
2676        // Should succeed (no-op)
2677        assert!(result.is_ok());
2678    }
2679
2680    // ============================================
2681    // Additional ParsedHealthResult tests
2682    // ============================================
2683
2684    #[test]
2685    fn test_parsed_health_result_default() {
2686        let result = ParsedHealthResult::default();
2687        assert_eq!(result.status, "");
2688        assert_eq!(result.uptime_ms, None);
2689        assert_eq!(result.memory, None);
2690        assert_eq!(result.pool_completed, None);
2691        assert_eq!(result.pool_queued, None);
2692        assert_eq!(result.success_rate, None);
2693    }
2694
2695    #[test]
2696    fn test_parsed_health_result_equality() {
2697        let result1 = ParsedHealthResult {
2698            status: "ok".to_string(),
2699            uptime_ms: Some(1000),
2700            memory: Some(500000),
2701            pool_completed: Some(50),
2702            pool_queued: Some(2),
2703            success_rate: Some(1.0),
2704        };
2705        let result2 = ParsedHealthResult {
2706            status: "ok".to_string(),
2707            uptime_ms: Some(1000),
2708            memory: Some(500000),
2709            pool_completed: Some(50),
2710            pool_queued: Some(2),
2711            success_rate: Some(1.0),
2712        };
2713        assert_eq!(result1, result2);
2714    }
2715
2716    #[test]
2717    fn test_format_return_value_nested_object() {
2718        let value = Some(serde_json::json!({
2719            "user": { "name": "John", "age": 30 }
2720        }));
2721        let result = PoolManager::format_return_value(value);
2722        assert!(result.contains("John"));
2723        assert!(result.contains("30"));
2724    }
2725
2726    #[test]
2727    fn test_format_return_value_empty_collections() {
2728        let value = Some(serde_json::json!({}));
2729        assert_eq!(PoolManager::format_return_value(value), "{}");
2730        let value = Some(serde_json::json!([]));
2731        assert_eq!(PoolManager::format_return_value(value), "[]");
2732    }
2733
2734    #[test]
2735    fn test_parse_health_result_zero_values() {
2736        let json = serde_json::json!({
2737            "status": "starting",
2738            "uptime": 0,
2739            "memory": { "heapUsed": 0 },
2740            "pool": { "completed": 0, "queued": 0 },
2741            "execution": { "successRate": 0.0 }
2742        });
2743        let result = PoolManager::parse_health_result(&json);
2744        assert_eq!(result.status, "starting");
2745        assert_eq!(result.uptime_ms, Some(0));
2746        assert_eq!(result.memory, Some(0));
2747        assert_eq!(result.pool_completed, Some(0));
2748        assert_eq!(result.pool_queued, Some(0));
2749        assert_eq!(result.success_rate, Some(0.0));
2750    }
2751
2752    #[test]
2753    fn test_calculate_heap_size_precise_calculations() {
2754        assert_eq!(PoolManager::calculate_heap_size(0), 512);
2755        assert_eq!(PoolManager::calculate_heap_size(1), 512);
2756        assert_eq!(PoolManager::calculate_heap_size(10), 544);
2757        assert_eq!(PoolManager::calculate_heap_size(20), 576);
2758        assert_eq!(PoolManager::calculate_heap_size(100), 832);
2759        assert_eq!(PoolManager::calculate_heap_size(200), 1152);
2760    }
2761
2762    #[tokio::test]
2763    async fn test_pool_manager_health_check_flag_initial() {
2764        let manager = PoolManager::new();
2765        assert!(!manager.health_check_needed.load(Ordering::Relaxed));
2766    }
2767
2768    #[tokio::test]
2769    async fn test_pool_manager_consecutive_failures_initial() {
2770        let manager = PoolManager::new();
2771        assert_eq!(manager.consecutive_failures.load(Ordering::Relaxed), 0);
2772    }
2773
2774    #[tokio::test]
2775    async fn test_recovery_allowance_bounds() {
2776        let manager = PoolManager::new();
2777        manager.recovery_allowance.store(0, Ordering::Relaxed);
2778        assert_eq!(manager.recovery_allowance_percent(), 0);
2779        manager.recovery_allowance.store(50, Ordering::Relaxed);
2780        assert_eq!(manager.recovery_allowance_percent(), 50);
2781        manager.recovery_allowance.store(100, Ordering::Relaxed);
2782        assert_eq!(manager.recovery_allowance_percent(), 100);
2783    }
2784
2785    #[tokio::test]
2786    async fn test_is_initialized_changes_with_state() {
2787        let manager = PoolManager::with_socket_path("/tmp/init-test-123.sock".to_string());
2788        assert!(!manager.is_initialized().await);
2789        manager.initialized.store(true, Ordering::Release);
2790        assert!(manager.is_initialized().await);
2791        manager.initialized.store(false, Ordering::Release);
2792        assert!(!manager.is_initialized().await);
2793    }
2794
2795    // ============================================
2796    // Additional edge case tests for coverage
2797    // ============================================
2798
2799    #[test]
2800    fn test_is_dead_server_error_with_script_timeout() {
2801        // ScriptTimeout should NOT be a dead server error
2802        let err = PluginError::ScriptTimeout(30);
2803        assert!(!PoolManager::is_dead_server_error(&err));
2804    }
2805
2806    #[test]
2807    fn test_is_dead_server_error_with_plugin_error() {
2808        let err = PluginError::PluginError("some plugin error".to_string());
2809        assert!(!PoolManager::is_dead_server_error(&err));
2810    }
2811
2812    #[test]
2813    fn test_is_dead_server_error_with_connection_timeout_in_plugin_error() {
2814        // Note: When "connection timed out" is wrapped in PluginExecutionError,
2815        // the Display output includes "Plugin" which triggers the exclusion
2816        // for (plugin + timed out). This is expected behavior to prevent
2817        // plugin execution timeouts from triggering restarts.
2818        let err = PluginError::PluginExecutionError("connection timed out".to_string());
2819        // The error string becomes something like "Plugin execution error: connection timed out"
2820        // which contains "plugin" AND "timed out", so it's excluded
2821        assert!(!PoolManager::is_dead_server_error(&err));
2822
2823        // SocketError doesn't add "Plugin" to the display, so connection issues there
2824        // would be detected correctly
2825        let err = PluginError::SocketError("connect timed out".to_string());
2826        assert!(PoolManager::is_dead_server_error(&err));
2827    }
2828
2829    #[test]
2830    fn test_parse_pool_response_success_with_logs_various_levels() {
2831        use super::super::protocol::{PoolLogEntry, PoolResponse};
2832
2833        let response = PoolResponse {
2834            task_id: "test-levels".to_string(),
2835            success: true,
2836            result: Some(serde_json::json!("ok")),
2837            error: None,
2838            logs: Some(vec![
2839                PoolLogEntry {
2840                    level: "log".to_string(),
2841                    message: "log level".to_string(),
2842                },
2843                PoolLogEntry {
2844                    level: "debug".to_string(),
2845                    message: "debug level".to_string(),
2846                },
2847                PoolLogEntry {
2848                    level: "info".to_string(),
2849                    message: "info level".to_string(),
2850                },
2851                PoolLogEntry {
2852                    level: "warn".to_string(),
2853                    message: "warn level".to_string(),
2854                },
2855                PoolLogEntry {
2856                    level: "error".to_string(),
2857                    message: "error level".to_string(),
2858                },
2859                PoolLogEntry {
2860                    level: "result".to_string(),
2861                    message: "result level".to_string(),
2862                },
2863            ]),
2864        };
2865
2866        let result = PoolManager::parse_pool_response(response).unwrap();
2867        assert_eq!(result.logs.len(), 6);
2868        assert_eq!(result.logs[0].level, LogLevel::Log);
2869        assert_eq!(result.logs[1].level, LogLevel::Debug);
2870        assert_eq!(result.logs[2].level, LogLevel::Info);
2871        assert_eq!(result.logs[3].level, LogLevel::Warn);
2872        assert_eq!(result.logs[4].level, LogLevel::Error);
2873        assert_eq!(result.logs[5].level, LogLevel::Result);
2874    }
2875
2876    #[test]
2877    fn test_parse_error_response_defaults() {
2878        use super::super::protocol::PoolResponse;
2879
2880        // Response with no error field at all
2881        let response = PoolResponse {
2882            task_id: "no-error".to_string(),
2883            success: false,
2884            result: None,
2885            error: None,
2886            logs: None,
2887        };
2888
2889        let err = PoolManager::parse_error_response(response);
2890        match err {
2891            PluginError::HandlerError(payload) => {
2892                assert_eq!(payload.message, "Unknown error");
2893                assert_eq!(payload.status, 500);
2894                assert!(payload.code.is_none());
2895                assert!(payload.details.is_none());
2896            }
2897            _ => panic!("Expected HandlerError"),
2898        }
2899    }
2900
2901    #[test]
2902    fn test_format_return_value_float() {
2903        let value = Some(serde_json::json!(3.14159));
2904        let result = PoolManager::format_return_value(value);
2905        assert!(result.contains("3.14159"));
2906    }
2907
2908    #[test]
2909    fn test_format_return_value_large_array() {
2910        let value = Some(serde_json::json!([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]));
2911        let result = PoolManager::format_return_value(value);
2912        assert_eq!(result, "[1,2,3,4,5,6,7,8,9,10]");
2913    }
2914
2915    #[test]
2916    fn test_format_return_value_string_with_special_chars() {
2917        let value = Some(serde_json::json!("hello\nworld\ttab"));
2918        assert_eq!(PoolManager::format_return_value(value), "hello\nworld\ttab");
2919    }
2920
2921    #[test]
2922    fn test_format_return_value_unicode() {
2923        let value = Some(serde_json::json!("こんにちは世界 🌍"));
2924        assert_eq!(PoolManager::format_return_value(value), "こんにちは世界 🌍");
2925    }
2926
2927    #[test]
2928    fn test_parse_health_result_large_values() {
2929        let json = serde_json::json!({
2930            "status": "healthy",
2931            "uptime": 999999999999_u64,
2932            "memory": { "heapUsed": 9999999999_u64 },
2933            "pool": { "completed": 999999999_u64, "queued": 999999_u64 },
2934            "execution": { "successRate": 0.999999 }
2935        });
2936
2937        let result = PoolManager::parse_health_result(&json);
2938        assert_eq!(result.status, "healthy");
2939        assert_eq!(result.uptime_ms, Some(999999999999));
2940        assert_eq!(result.memory, Some(9999999999));
2941        assert_eq!(result.pool_completed, Some(999999999));
2942        assert_eq!(result.pool_queued, Some(999999));
2943        assert!((result.success_rate.unwrap() - 0.999999).abs() < 0.0000001);
2944    }
2945
2946    #[test]
2947    fn test_parse_health_result_negative_values_treated_as_none() {
2948        // JSON doesn't have unsigned, so negative values won't parse as u64
2949        let json = serde_json::json!({
2950            "status": "error",
2951            "uptime": -1,
2952            "memory": { "heapUsed": -100 }
2953        });
2954
2955        let result = PoolManager::parse_health_result(&json);
2956        assert_eq!(result.status, "error");
2957        assert_eq!(result.uptime_ms, None); // -1 can't be u64
2958        assert_eq!(result.memory, None);
2959    }
2960
2961    #[test]
2962    fn test_parsed_health_result_debug() {
2963        let result = ParsedHealthResult {
2964            status: "test".to_string(),
2965            uptime_ms: Some(100),
2966            memory: Some(200),
2967            pool_completed: Some(50),
2968            pool_queued: Some(5),
2969            success_rate: Some(0.95),
2970        };
2971
2972        let debug_str = format!("{:?}", result);
2973        assert!(debug_str.contains("test"));
2974        assert!(debug_str.contains("100"));
2975        assert!(debug_str.contains("200"));
2976    }
2977
2978    #[test]
2979    fn test_calculate_heap_size_boundary_values() {
2980        // Test at exact boundaries
2981        // 9 should give base (9/10 = 0)
2982        assert_eq!(PoolManager::calculate_heap_size(9), 512);
2983        // 10 should give base + 32 (10/10 = 1)
2984        assert_eq!(PoolManager::calculate_heap_size(10), 544);
2985
2986        // Test boundary where cap kicks in
2987        // 2400 would be: 512 + (240 * 32) = 512 + 7680 = 8192 (at cap)
2988        assert_eq!(PoolManager::calculate_heap_size(2400), 8192);
2989        // 2399 would be: 512 + (239 * 32) = 512 + 7648 = 8160 (under cap)
2990        assert_eq!(PoolManager::calculate_heap_size(2399), 8160);
2991    }
2992
2993    #[tokio::test]
2994    async fn test_pool_manager_socket_path_format() {
2995        let manager = PoolManager::new();
2996        // Should contain UUID format
2997        assert!(manager.socket_path.starts_with("/tmp/relayer-plugin-pool-"));
2998        assert!(manager.socket_path.ends_with(".sock"));
2999        // UUID is 36 chars (32 hex + 4 dashes)
3000        let uuid_part = manager
3001            .socket_path
3002            .strip_prefix("/tmp/relayer-plugin-pool-")
3003            .unwrap()
3004            .strip_suffix(".sock")
3005            .unwrap();
3006        assert_eq!(uuid_part.len(), 36);
3007    }
3008
3009    #[tokio::test]
3010    async fn test_health_check_socket_missing() {
3011        let manager =
3012            PoolManager::with_socket_path("/tmp/nonexistent-socket-12345.sock".to_string());
3013        // Mark as initialized but socket doesn't exist
3014        manager.initialized.store(true, Ordering::Release);
3015
3016        let health = manager.health_check().await.unwrap();
3017        assert!(!health.healthy);
3018        assert_eq!(health.status, "socket_missing");
3019    }
3020
3021    #[test]
3022    fn test_is_dead_server_error_embedded_patterns() {
3023        // Patterns embedded in longer messages
3024        let err = PluginError::PluginExecutionError(
3025            "Error: ECONNREFUSED connection refused at 127.0.0.1:3000".to_string(),
3026        );
3027        assert!(PoolManager::is_dead_server_error(&err));
3028
3029        let err = PluginError::PluginExecutionError(
3030            "SocketError: broken pipe while writing to /tmp/socket".to_string(),
3031        );
3032        assert!(PoolManager::is_dead_server_error(&err));
3033
3034        let err = PluginError::PluginExecutionError(
3035            "IO Error: No such file or directory (os error 2)".to_string(),
3036        );
3037        assert!(PoolManager::is_dead_server_error(&err));
3038    }
3039
3040    #[test]
3041    fn test_is_dead_server_error_mixed_case_timeout_patterns() {
3042        // Handler timeout variants - none should be dead server errors
3043        let variants = vec![
3044            "HANDLER TIMED OUT",
3045            "Handler Timed Out after 30s",
3046            "the handler timed out waiting for response",
3047        ];
3048
3049        for msg in variants {
3050            let err = PluginError::PluginExecutionError(msg.to_string());
3051            assert!(
3052                !PoolManager::is_dead_server_error(&err),
3053                "Expected '{}' to NOT be dead server error",
3054                msg
3055            );
3056        }
3057    }
3058
3059    #[tokio::test]
3060    async fn test_ensure_started_idempotent() {
3061        let manager = PoolManager::with_socket_path("/tmp/idempotent-test-999.sock".to_string());
3062
3063        // First call when not initialized
3064        assert!(!manager.is_initialized().await);
3065
3066        // Manually set initialized without actually starting (for test)
3067        manager.initialized.store(true, Ordering::Release);
3068
3069        // ensure_started should return immediately
3070        let result = manager.ensure_started().await;
3071        assert!(result.is_ok());
3072        assert!(manager.is_initialized().await);
3073    }
3074
3075    #[test]
3076    fn test_queued_request_with_headers() {
3077        let (tx, _rx) = oneshot::channel();
3078
3079        let mut headers = HashMap::new();
3080        headers.insert(
3081            "Authorization".to_string(),
3082            vec!["Bearer token".to_string()],
3083        );
3084        headers.insert(
3085            "Content-Type".to_string(),
3086            vec!["application/json".to_string()],
3087        );
3088
3089        let request = QueuedRequest {
3090            plugin_id: "headers-test".to_string(),
3091            compiled_code: None,
3092            plugin_path: Some("/path/to/plugin.ts".to_string()),
3093            params: serde_json::json!({}),
3094            headers: Some(headers),
3095            socket_path: "/tmp/test.sock".to_string(),
3096            http_request_id: None,
3097            timeout_secs: None,
3098            route: None,
3099            config: None,
3100            method: None,
3101            query: None,
3102            response_tx: tx,
3103        };
3104
3105        assert!(request.headers.is_some());
3106        let headers = request.headers.unwrap();
3107        assert!(headers.contains_key("Authorization"));
3108        assert!(headers.contains_key("Content-Type"));
3109    }
3110
3111    #[test]
3112    fn test_plugin_error_display_formats() {
3113        // Test all PluginError variants have proper Display implementations
3114        let err = PluginError::SocketError("test socket error".to_string());
3115        assert!(format!("{}", err).contains("Socket error"));
3116
3117        let err = PluginError::PluginExecutionError("test execution error".to_string());
3118        assert!(format!("{}", err).contains("test execution error"));
3119
3120        let err = PluginError::ScriptTimeout(60);
3121        assert!(format!("{}", err).contains("60"));
3122
3123        let err = PluginError::PluginError("test plugin error".to_string());
3124        assert!(format!("{}", err).contains("test plugin error"));
3125    }
3126
3127    #[test]
3128    fn test_pool_log_entry_to_log_entry_conversion() {
3129        use super::super::protocol::PoolLogEntry;
3130
3131        // Test the From<PoolLogEntry> for LogEntry conversion
3132        let pool_log = PoolLogEntry {
3133            level: "info".to_string(),
3134            message: "test message".to_string(),
3135        };
3136
3137        let log_entry: LogEntry = pool_log.into();
3138        assert_eq!(log_entry.level, LogLevel::Info);
3139        assert_eq!(log_entry.message, "test message");
3140
3141        // Test unknown level defaults
3142        let pool_log = PoolLogEntry {
3143            level: "unknown_level".to_string(),
3144            message: "unknown level message".to_string(),
3145        };
3146
3147        let log_entry: LogEntry = pool_log.into();
3148        assert_eq!(log_entry.level, LogLevel::Log); // Should default to Log
3149    }
3150
3151    #[tokio::test]
3152    async fn test_circuit_breaker_records_success() {
3153        let manager = PoolManager::new();
3154
3155        // Record some successes
3156        manager.circuit_breaker.record_success(100);
3157        manager.circuit_breaker.record_success(150);
3158        manager.circuit_breaker.record_success(200);
3159
3160        // Average should be calculated
3161        let avg = manager.avg_response_time_ms();
3162        assert!(avg > 0);
3163    }
3164
3165    #[tokio::test]
3166    async fn test_circuit_breaker_state_transitions() {
3167        let manager = PoolManager::new();
3168
3169        // Initial state is Closed
3170        assert_eq!(manager.circuit_state(), CircuitState::Closed);
3171
3172        // Record many failures to potentially trip the breaker
3173        for _ in 0..20 {
3174            manager.circuit_breaker.record_failure();
3175        }
3176
3177        // State might have changed (depends on thresholds)
3178        let state = manager.circuit_state();
3179        assert!(matches!(
3180            state,
3181            CircuitState::Closed | CircuitState::HalfOpen | CircuitState::Open
3182        ));
3183    }
3184
3185    #[tokio::test]
3186    async fn test_recovery_mode_activation() {
3187        let manager = PoolManager::new();
3188
3189        // Manually activate recovery mode
3190        manager.recovery_allowance.store(10, Ordering::Relaxed);
3191        manager.recovery_mode.store(true, Ordering::Relaxed);
3192
3193        assert!(manager.is_recovering());
3194        assert_eq!(manager.recovery_allowance_percent(), 10);
3195
3196        // Increase allowance
3197        manager.recovery_allowance.store(50, Ordering::Relaxed);
3198        assert_eq!(manager.recovery_allowance_percent(), 50);
3199
3200        // Exit recovery mode
3201        manager.recovery_mode.store(false, Ordering::Relaxed);
3202        assert!(!manager.is_recovering());
3203    }
3204
3205    #[test]
3206    fn test_parse_pool_response_with_empty_logs() {
3207        use super::super::protocol::PoolResponse;
3208
3209        let response = PoolResponse {
3210            task_id: "empty-logs".to_string(),
3211            success: true,
3212            result: Some(serde_json::json!("done")),
3213            error: None,
3214            logs: Some(vec![]), // Empty logs array
3215        };
3216
3217        let result = PoolManager::parse_pool_response(response).unwrap();
3218        assert!(result.logs.is_empty());
3219        assert_eq!(result.return_value, "done");
3220    }
3221
3222    #[test]
3223    fn test_handler_payload_with_complex_details() {
3224        let payload = PluginHandlerPayload {
3225            message: "Complex error".to_string(),
3226            status: 400,
3227            code: Some("VALIDATION_ERROR".to_string()),
3228            details: Some(serde_json::json!({
3229                "errors": [
3230                    {"field": "email", "code": "invalid", "message": "Invalid email format"},
3231                    {"field": "password", "code": "weak", "message": "Password too weak"}
3232                ],
3233                "metadata": {
3234                    "requestId": "req-123",
3235                    "timestamp": "2024-01-01T00:00:00Z"
3236                }
3237            })),
3238            logs: None,
3239            traces: None,
3240        };
3241
3242        assert_eq!(payload.status, 400);
3243        let details = payload.details.unwrap();
3244        assert!(details.get("errors").is_some());
3245        assert!(details.get("metadata").is_some());
3246    }
3247
3248    #[test]
3249    fn test_health_status_construction_healthy() {
3250        use super::super::health::HealthStatus;
3251
3252        let status = HealthStatus {
3253            healthy: true,
3254            status: "ok".to_string(),
3255            uptime_ms: Some(1000000),
3256            memory: Some(500000000),
3257            pool_completed: Some(1000),
3258            pool_queued: Some(5),
3259            success_rate: Some(0.99),
3260            circuit_state: Some("closed".to_string()),
3261            avg_response_time_ms: Some(50),
3262            recovering: Some(false),
3263            recovery_percent: Some(100),
3264            shared_socket_available_slots: Some(100),
3265            shared_socket_active_connections: Some(10),
3266            shared_socket_registered_executions: Some(5),
3267            connection_pool_available_slots: Some(50),
3268            connection_pool_active_connections: Some(5),
3269        };
3270
3271        assert!(status.healthy);
3272        assert_eq!(status.status, "ok");
3273        assert_eq!(status.uptime_ms, Some(1000000));
3274        assert_eq!(status.circuit_state, Some("closed".to_string()));
3275    }
3276
3277    #[test]
3278    fn test_health_status_construction_unhealthy() {
3279        use super::super::health::HealthStatus;
3280
3281        let status = HealthStatus {
3282            healthy: false,
3283            status: "connection_failed".to_string(),
3284            uptime_ms: None,
3285            memory: None,
3286            pool_completed: None,
3287            pool_queued: None,
3288            success_rate: None,
3289            circuit_state: Some("open".to_string()),
3290            avg_response_time_ms: Some(0),
3291            recovering: Some(true),
3292            recovery_percent: Some(10),
3293            shared_socket_available_slots: None,
3294            shared_socket_active_connections: None,
3295            shared_socket_registered_executions: None,
3296            connection_pool_available_slots: None,
3297            connection_pool_active_connections: None,
3298        };
3299
3300        assert!(!status.healthy);
3301        assert_eq!(status.status, "connection_failed");
3302        assert!(status.uptime_ms.is_none());
3303    }
3304
3305    #[test]
3306    fn test_health_status_debug_format() {
3307        use super::super::health::HealthStatus;
3308
3309        let status = HealthStatus {
3310            healthy: true,
3311            status: "test".to_string(),
3312            uptime_ms: Some(100),
3313            memory: None,
3314            pool_completed: None,
3315            pool_queued: None,
3316            success_rate: None,
3317            circuit_state: None,
3318            avg_response_time_ms: None,
3319            recovering: None,
3320            recovery_percent: None,
3321            shared_socket_available_slots: None,
3322            shared_socket_active_connections: None,
3323            shared_socket_registered_executions: None,
3324            connection_pool_available_slots: None,
3325            connection_pool_active_connections: None,
3326        };
3327
3328        let debug_str = format!("{:?}", status);
3329        assert!(debug_str.contains("healthy: true"));
3330        assert!(debug_str.contains("test"));
3331    }
3332
3333    #[test]
3334    fn test_health_status_clone() {
3335        use super::super::health::HealthStatus;
3336
3337        let status = HealthStatus {
3338            healthy: true,
3339            status: "original".to_string(),
3340            uptime_ms: Some(500),
3341            memory: Some(100),
3342            pool_completed: Some(10),
3343            pool_queued: Some(1),
3344            success_rate: Some(0.95),
3345            circuit_state: Some("closed".to_string()),
3346            avg_response_time_ms: Some(25),
3347            recovering: Some(false),
3348            recovery_percent: Some(100),
3349            shared_socket_available_slots: Some(50),
3350            shared_socket_active_connections: Some(2),
3351            shared_socket_registered_executions: Some(1),
3352            connection_pool_available_slots: Some(25),
3353            connection_pool_active_connections: Some(1),
3354        };
3355
3356        let cloned = status.clone();
3357        assert_eq!(cloned.healthy, status.healthy);
3358        assert_eq!(cloned.status, status.status);
3359        assert_eq!(cloned.uptime_ms, status.uptime_ms);
3360    }
3361
3362    #[test]
3363    fn test_execute_request_debug() {
3364        use super::super::protocol::ExecuteRequest;
3365
3366        let request = ExecuteRequest {
3367            task_id: "debug-test".to_string(),
3368            plugin_id: "test-plugin".to_string(),
3369            compiled_code: None,
3370            plugin_path: Some("/path/to/plugin.ts".to_string()),
3371            params: serde_json::json!({"test": true}),
3372            headers: None,
3373            socket_path: "/tmp/test.sock".to_string(),
3374            http_request_id: None,
3375            timeout: None,
3376            route: None,
3377            config: None,
3378            method: None,
3379            query: None,
3380        };
3381
3382        let debug_str = format!("{:?}", request);
3383        assert!(debug_str.contains("debug-test"));
3384        assert!(debug_str.contains("test-plugin"));
3385    }
3386
3387    #[test]
3388    fn test_pool_error_debug() {
3389        use super::super::protocol::PoolError;
3390
3391        let error = PoolError {
3392            message: "Test error".to_string(),
3393            code: Some("TEST_ERR".to_string()),
3394            status: Some(400),
3395            details: Some(serde_json::json!({"info": "test"})),
3396        };
3397
3398        let debug_str = format!("{:?}", error);
3399        assert!(debug_str.contains("Test error"));
3400        assert!(debug_str.contains("TEST_ERR"));
3401    }
3402
3403    #[test]
3404    fn test_pool_response_debug() {
3405        use super::super::protocol::PoolResponse;
3406
3407        let response = PoolResponse {
3408            task_id: "resp-123".to_string(),
3409            success: true,
3410            result: Some(serde_json::json!("result")),
3411            error: None,
3412            logs: None,
3413        };
3414
3415        let debug_str = format!("{:?}", response);
3416        assert!(debug_str.contains("resp-123"));
3417        assert!(debug_str.contains("true"));
3418    }
3419
3420    #[test]
3421    fn test_pool_log_entry_debug() {
3422        use super::super::protocol::PoolLogEntry;
3423
3424        let entry = PoolLogEntry {
3425            level: "info".to_string(),
3426            message: "Test message".to_string(),
3427        };
3428
3429        let debug_str = format!("{:?}", entry);
3430        assert!(debug_str.contains("info"));
3431        assert!(debug_str.contains("Test message"));
3432    }
3433
3434    #[test]
3435    fn test_circuit_breaker_default_trait() {
3436        use super::super::health::CircuitBreaker;
3437
3438        let cb = CircuitBreaker::default();
3439        assert_eq!(cb.state(), CircuitState::Closed);
3440    }
3441
3442    #[test]
3443    fn test_circuit_breaker_set_state_all_variants() {
3444        use super::super::health::CircuitBreaker;
3445
3446        let cb = CircuitBreaker::new();
3447
3448        // Test setting all states
3449        cb.set_state(CircuitState::HalfOpen);
3450        assert_eq!(cb.state(), CircuitState::HalfOpen);
3451
3452        cb.set_state(CircuitState::Open);
3453        assert_eq!(cb.state(), CircuitState::Open);
3454
3455        cb.set_state(CircuitState::Closed);
3456        assert_eq!(cb.state(), CircuitState::Closed);
3457    }
3458
3459    #[test]
3460    fn test_circuit_breaker_failure_rate_triggers_open() {
3461        use super::super::health::CircuitBreaker;
3462
3463        let cb = CircuitBreaker::new();
3464
3465        // Record enough failures to trigger circuit opening
3466        for _ in 0..100 {
3467            cb.record_failure();
3468        }
3469
3470        assert_eq!(cb.state(), CircuitState::Open);
3471    }
3472
3473    #[test]
3474    fn test_circuit_breaker_low_failure_rate_stays_closed() {
3475        use super::super::health::CircuitBreaker;
3476
3477        let cb = CircuitBreaker::new();
3478
3479        // Record mostly successes with few failures
3480        for _ in 0..90 {
3481            cb.record_success(50);
3482        }
3483        for _ in 0..10 {
3484            cb.record_failure();
3485        }
3486
3487        // Should still be closed (10% failure rate)
3488        assert_eq!(cb.state(), CircuitState::Closed);
3489    }
3490
3491    #[test]
3492    fn test_circuit_breaker_ema_response_time() {
3493        use super::super::health::CircuitBreaker;
3494
3495        let cb = CircuitBreaker::new();
3496
3497        // Record several response times
3498        cb.record_success(100);
3499        let avg1 = cb.avg_response_time();
3500
3501        cb.record_success(100);
3502        cb.record_success(100);
3503        cb.record_success(100);
3504        let avg2 = cb.avg_response_time();
3505
3506        // Average should stabilize around 100
3507        assert!(avg1 > 0);
3508        assert!(avg2 > 0);
3509        assert!(avg2 <= 100);
3510    }
3511
3512    #[test]
3513    fn test_circuit_breaker_force_close_resets_counters() {
3514        use super::super::health::CircuitBreaker;
3515
3516        let cb = CircuitBreaker::new();
3517        cb.set_state(CircuitState::Open);
3518
3519        cb.force_close();
3520
3521        assert_eq!(cb.state(), CircuitState::Closed);
3522    }
3523
3524    #[test]
3525    fn test_process_status_debug() {
3526        use super::super::health::ProcessStatus;
3527
3528        assert_eq!(format!("{:?}", ProcessStatus::Running), "Running");
3529        assert_eq!(format!("{:?}", ProcessStatus::Exited), "Exited");
3530        assert_eq!(format!("{:?}", ProcessStatus::Unknown), "Unknown");
3531        assert_eq!(format!("{:?}", ProcessStatus::NoProcess), "NoProcess");
3532    }
3533
3534    #[test]
3535    fn test_process_status_clone() {
3536        use super::super::health::ProcessStatus;
3537
3538        let status = ProcessStatus::Running;
3539        let cloned = status;
3540        assert_eq!(status, cloned);
3541    }
3542
3543    // ============================================
3544    // Additional coverage tests - DeadServerIndicator
3545    // ============================================
3546
3547    #[test]
3548    fn test_dead_server_indicator_all_variants() {
3549        use super::super::health::DeadServerIndicator;
3550
3551        // Test all enum variants exist and are properly matched
3552        let variants = [
3553            ("eof while parsing", DeadServerIndicator::EofWhileParsing),
3554            ("broken pipe", DeadServerIndicator::BrokenPipe),
3555            ("connection refused", DeadServerIndicator::ConnectionRefused),
3556            ("connection reset", DeadServerIndicator::ConnectionReset),
3557            ("not connected", DeadServerIndicator::NotConnected),
3558            ("failed to connect", DeadServerIndicator::FailedToConnect),
3559            (
3560                "socket file missing",
3561                DeadServerIndicator::SocketFileMissing,
3562            ),
3563            ("no such file", DeadServerIndicator::NoSuchFile),
3564            (
3565                "connection timed out",
3566                DeadServerIndicator::ConnectionTimedOut,
3567            ),
3568            ("connect timed out", DeadServerIndicator::ConnectionTimedOut),
3569        ];
3570
3571        for (pattern, expected) in variants {
3572            let result = DeadServerIndicator::from_error_str(pattern);
3573            assert_eq!(result, Some(expected), "Pattern '{}' should match", pattern);
3574        }
3575    }
3576
3577    #[test]
3578    fn test_dead_server_indicator_debug_format() {
3579        use super::super::health::DeadServerIndicator;
3580
3581        let indicator = DeadServerIndicator::BrokenPipe;
3582        let debug_str = format!("{:?}", indicator);
3583        assert_eq!(debug_str, "BrokenPipe");
3584    }
3585
3586    #[test]
3587    fn test_dead_server_indicator_clone_copy() {
3588        use super::super::health::DeadServerIndicator;
3589
3590        let indicator = DeadServerIndicator::ConnectionRefused;
3591        let cloned = indicator;
3592        assert_eq!(indicator, cloned);
3593    }
3594
3595    #[test]
3596    fn test_result_ring_buffer_not_enough_data() {
3597        use super::super::health::ResultRingBuffer;
3598
3599        let buffer = ResultRingBuffer::new(100);
3600
3601        // Record less than 10 results
3602        for _ in 0..9 {
3603            buffer.record(false);
3604        }
3605
3606        // Should return 0.0 because not enough data
3607        assert_eq!(buffer.failure_rate(), 0.0);
3608    }
3609
3610    #[test]
3611    fn test_result_ring_buffer_exactly_10_samples() {
3612        use super::super::health::ResultRingBuffer;
3613
3614        let buffer = ResultRingBuffer::new(100);
3615
3616        // Record exactly 10 failures
3617        for _ in 0..10 {
3618            buffer.record(false);
3619        }
3620
3621        // Should return 1.0 (100% failure)
3622        assert_eq!(buffer.failure_rate(), 1.0);
3623    }
3624
3625    #[test]
3626    fn test_result_ring_buffer_wraps_correctly() {
3627        use super::super::health::ResultRingBuffer;
3628
3629        let buffer = ResultRingBuffer::new(10);
3630
3631        // Fill buffer with successes
3632        for _ in 0..10 {
3633            buffer.record(true);
3634        }
3635        assert_eq!(buffer.failure_rate(), 0.0);
3636
3637        // Overwrite with failures
3638        for _ in 0..10 {
3639            buffer.record(false);
3640        }
3641        assert_eq!(buffer.failure_rate(), 1.0);
3642    }
3643
3644    #[test]
3645    fn test_circuit_state_equality_all_pairs() {
3646        assert_eq!(CircuitState::Closed, CircuitState::Closed);
3647        assert_eq!(CircuitState::HalfOpen, CircuitState::HalfOpen);
3648        assert_eq!(CircuitState::Open, CircuitState::Open);
3649
3650        assert_ne!(CircuitState::Closed, CircuitState::HalfOpen);
3651        assert_ne!(CircuitState::Closed, CircuitState::Open);
3652        assert_ne!(CircuitState::HalfOpen, CircuitState::Open);
3653    }
3654
3655    #[test]
3656    fn test_circuit_state_clone_copy() {
3657        let state = CircuitState::HalfOpen;
3658        let copied = state;
3659        assert_eq!(state, copied);
3660    }
3661
3662    #[test]
3663    fn test_parse_pool_response_with_null_values() {
3664        use super::super::protocol::PoolResponse;
3665
3666        let response = PoolResponse {
3667            task_id: "null-test".to_string(),
3668            success: true,
3669            result: Some(serde_json::json!(null)),
3670            error: None,
3671            logs: None,
3672        };
3673
3674        let result = PoolManager::parse_pool_response(response).unwrap();
3675        assert_eq!(result.return_value, "null");
3676    }
3677
3678    #[test]
3679    fn test_parse_pool_response_with_nested_result() {
3680        use super::super::protocol::PoolResponse;
3681
3682        let response = PoolResponse {
3683            task_id: "nested-test".to_string(),
3684            success: true,
3685            result: Some(serde_json::json!({
3686                "level1": {
3687                    "level2": {
3688                        "level3": "deep value"
3689                    }
3690                }
3691            })),
3692            error: None,
3693            logs: None,
3694        };
3695
3696        let result = PoolManager::parse_pool_response(response).unwrap();
3697        assert!(result.return_value.contains("level1"));
3698        assert!(result.return_value.contains("level2"));
3699        assert!(result.return_value.contains("level3"));
3700        assert!(result.return_value.contains("deep value"));
3701    }
3702
3703    #[test]
3704    fn test_parse_pool_response_error_with_details() {
3705        use super::super::protocol::{PoolError, PoolResponse};
3706
3707        let response = PoolResponse {
3708            task_id: "error-details".to_string(),
3709            success: false,
3710            result: None,
3711            error: Some(PoolError {
3712                message: "Error with details".to_string(),
3713                code: Some("DETAILED_ERROR".to_string()),
3714                status: Some(422),
3715                details: Some(serde_json::json!({
3716                    "field": "email",
3717                    "expected": "string",
3718                    "received": "number"
3719                })),
3720            }),
3721            logs: None,
3722        };
3723
3724        let err = PoolManager::parse_pool_response(response).unwrap_err();
3725        match err {
3726            PluginError::HandlerError(payload) => {
3727                assert_eq!(payload.message, "Error with details");
3728                assert_eq!(payload.code, Some("DETAILED_ERROR".to_string()));
3729                assert!(payload.details.is_some());
3730                let details = payload.details.unwrap();
3731                assert_eq!(details.get("field").unwrap(), "email");
3732            }
3733            _ => panic!("Expected HandlerError"),
3734        }
3735    }
3736
3737    #[test]
3738    fn test_parse_health_result_with_all_optional_fields() {
3739        let json = serde_json::json!({
3740            "status": "healthy",
3741            "uptime": 999999,
3742            "memory": {
3743                "heapUsed": 123456789,
3744                "heapTotal": 987654321,
3745                "external": 111111,
3746                "arrayBuffers": 222222
3747            },
3748            "pool": {
3749                "completed": 50000,
3750                "queued": 100,
3751                "active": 50,
3752                "waiting": 25
3753            },
3754            "execution": {
3755                "successRate": 0.9999,
3756                "avgDuration": 45.5,
3757                "totalExecutions": 100000
3758            }
3759        });
3760
3761        let result = PoolManager::parse_health_result(&json);
3762        assert_eq!(result.status, "healthy");
3763        assert_eq!(result.uptime_ms, Some(999999));
3764        assert_eq!(result.memory, Some(123456789));
3765        assert_eq!(result.pool_completed, Some(50000));
3766        assert_eq!(result.pool_queued, Some(100));
3767        assert!((result.success_rate.unwrap() - 0.9999).abs() < 0.0001);
3768    }
3769
3770    #[tokio::test]
3771    async fn test_pool_manager_max_queue_size() {
3772        let manager = PoolManager::new();
3773        // max_queue_size should be set from config
3774        assert!(manager.max_queue_size > 0);
3775    }
3776
3777    #[tokio::test]
3778    async fn test_pool_manager_last_restart_time_initial() {
3779        let manager = PoolManager::new();
3780        assert_eq!(manager.last_restart_time_ms.load(Ordering::Relaxed), 0);
3781    }
3782
3783    #[tokio::test]
3784    async fn test_pool_manager_connection_pool_exists() {
3785        let manager = PoolManager::new();
3786        // Connection pool should be initialized
3787        let available = manager.connection_pool.semaphore.available_permits();
3788        assert!(available > 0);
3789    }
3790
3791    #[test]
3792    fn test_is_dead_server_error_with_whitespace() {
3793        // Patterns with extra whitespace
3794        let err = PluginError::SocketError("  connection refused  ".to_string());
3795        assert!(PoolManager::is_dead_server_error(&err));
3796
3797        let err = PluginError::SocketError("error: broken pipe occurred".to_string());
3798        assert!(PoolManager::is_dead_server_error(&err));
3799    }
3800
3801    #[test]
3802    fn test_is_dead_server_error_multiline() {
3803        // Multiline error messages
3804        let err = PluginError::SocketError(
3805            "Error occurred\nConnection refused\nPlease retry".to_string(),
3806        );
3807        assert!(PoolManager::is_dead_server_error(&err));
3808    }
3809
3810    #[test]
3811    fn test_is_dead_server_error_json_in_message() {
3812        // Error with JSON content
3813        let err = PluginError::PluginExecutionError(
3814            r#"{"error": "connection refused", "code": 61}"#.to_string(),
3815        );
3816        assert!(PoolManager::is_dead_server_error(&err));
3817    }
3818
3819    #[test]
3820    fn test_format_return_value_special_json() {
3821        // Test with special JSON values
3822        let value = Some(serde_json::json!(f64::MAX));
3823        let result = PoolManager::format_return_value(value);
3824        assert!(!result.is_empty());
3825
3826        let value = Some(serde_json::json!(i64::MIN));
3827        let result = PoolManager::format_return_value(value);
3828        assert!(result.contains("-"));
3829    }
3830
3831    #[test]
3832    fn test_format_return_value_with_escaped_chars() {
3833        let value = Some(serde_json::json!("line1\nline2\ttab\"quote"));
3834        let result = PoolManager::format_return_value(value);
3835        assert!(result.contains("line1"));
3836        assert!(result.contains("line2"));
3837    }
3838
3839    #[test]
3840    fn test_format_return_value_array_of_objects() {
3841        let value = Some(serde_json::json!([
3842            {"id": 1, "name": "first"},
3843            {"id": 2, "name": "second"}
3844        ]));
3845        let result = PoolManager::format_return_value(value);
3846        assert!(result.contains("first"));
3847        assert!(result.contains("second"));
3848    }
3849
3850    #[test]
3851    fn test_all_log_levels_conversion() {
3852        use super::super::protocol::PoolLogEntry;
3853
3854        let levels = [
3855            ("log", LogLevel::Log),
3856            ("debug", LogLevel::Debug),
3857            ("info", LogLevel::Info),
3858            ("warn", LogLevel::Warn),
3859            ("error", LogLevel::Error),
3860            ("result", LogLevel::Result),
3861            ("unknown_level", LogLevel::Log), // Unknown defaults to Log
3862            ("LOG", LogLevel::Log),           // Case matters - uppercase goes to default
3863            ("", LogLevel::Log),              // Empty string goes to default
3864        ];
3865
3866        for (input, expected) in levels {
3867            let entry = PoolLogEntry {
3868                level: input.to_string(),
3869                message: "test".to_string(),
3870            };
3871            let log_entry: LogEntry = entry.into();
3872            assert_eq!(
3873                log_entry.level, expected,
3874                "Level '{}' should convert to {:?}",
3875                input, expected
3876            );
3877        }
3878    }
3879
3880    #[tokio::test]
3881    async fn test_pool_manager_health_check_flag_manipulation() {
3882        let manager = PoolManager::new();
3883
3884        manager.health_check_needed.store(true, Ordering::Relaxed);
3885        assert!(manager.health_check_needed.load(Ordering::Relaxed));
3886
3887        manager.health_check_needed.store(false, Ordering::Relaxed);
3888        assert!(!manager.health_check_needed.load(Ordering::Relaxed));
3889    }
3890
3891    #[tokio::test]
3892    async fn test_pool_manager_consecutive_failures_manipulation() {
3893        let manager = PoolManager::new();
3894
3895        manager.consecutive_failures.fetch_add(1, Ordering::Relaxed);
3896        assert_eq!(manager.consecutive_failures.load(Ordering::Relaxed), 1);
3897
3898        manager.consecutive_failures.fetch_add(5, Ordering::Relaxed);
3899        assert_eq!(manager.consecutive_failures.load(Ordering::Relaxed), 6);
3900
3901        manager.consecutive_failures.store(0, Ordering::Relaxed);
3902        assert_eq!(manager.consecutive_failures.load(Ordering::Relaxed), 0);
3903    }
3904
3905    #[test]
3906    fn test_parsed_health_result_with_all_none() {
3907        let result = ParsedHealthResult {
3908            status: "minimal".to_string(),
3909            uptime_ms: None,
3910            memory: None,
3911            pool_completed: None,
3912            pool_queued: None,
3913            success_rate: None,
3914        };
3915
3916        assert_eq!(result.status, "minimal");
3917        assert!(result.uptime_ms.is_none());
3918        assert!(result.memory.is_none());
3919    }
3920
3921    #[test]
3922    fn test_parsed_health_result_with_all_some() {
3923        let result = ParsedHealthResult {
3924            status: "complete".to_string(),
3925            uptime_ms: Some(u64::MAX),
3926            memory: Some(u64::MAX),
3927            pool_completed: Some(u64::MAX),
3928            pool_queued: Some(u64::MAX),
3929            success_rate: Some(1.0),
3930        };
3931
3932        assert_eq!(result.status, "complete");
3933        assert_eq!(result.uptime_ms, Some(u64::MAX));
3934        assert_eq!(result.success_rate, Some(1.0));
3935    }
3936
3937    #[test]
3938    fn test_calculate_heap_size_extensive_values() {
3939        // Test many different concurrency values
3940        let test_cases = [
3941            (0, 512),
3942            (1, 512),
3943            (5, 512),
3944            (9, 512),
3945            (10, 544),
3946            (11, 544),
3947            (19, 544),
3948            (20, 576),
3949            (50, 672),
3950            (100, 832),
3951            (150, 992),
3952            (200, 1152),
3953            (250, 1312),
3954            (300, 1472),
3955            (400, 1792),
3956            (500, 2112),
3957            (1000, 3712),
3958            (2000, 6912),
3959            (2400, 8192),  // At cap
3960            (3000, 8192),  // Capped
3961            (5000, 8192),  // Capped
3962            (10000, 8192), // Capped
3963        ];
3964
3965        for (concurrency, expected_heap) in test_cases {
3966            let heap = PoolManager::calculate_heap_size(concurrency);
3967            assert_eq!(
3968                heap, expected_heap,
3969                "Concurrency {} should give heap {}",
3970                concurrency, expected_heap
3971            );
3972        }
3973    }
3974
3975    #[tokio::test]
3976    async fn test_pool_manager_drop_cleans_socket() {
3977        let socket_path = format!("/tmp/test-drop-{}.sock", uuid::Uuid::new_v4());
3978
3979        // Create a file at the socket path
3980        std::fs::write(&socket_path, "test").unwrap();
3981        assert!(std::path::Path::new(&socket_path).exists());
3982
3983        // Create manager with this socket path
3984        {
3985            let _manager = PoolManager::with_socket_path(socket_path.clone());
3986            // Manager exists here
3987        }
3988        // Manager dropped here - should clean up socket
3989
3990        // Socket should be removed
3991        assert!(!std::path::Path::new(&socket_path).exists());
3992    }
3993
3994    #[test]
3995    fn test_script_result_with_traces() {
3996        let result = ScriptResult {
3997            logs: vec![],
3998            error: String::new(),
3999            return_value: "with traces".to_string(),
4000            trace: vec![
4001                serde_json::json!({"action": "GET", "url": "/api/test"}),
4002                serde_json::json!({"action": "POST", "url": "/api/submit"}),
4003            ],
4004        };
4005
4006        assert_eq!(result.trace.len(), 2);
4007        assert!(result.trace[0].get("action").is_some());
4008    }
4009
4010    #[test]
4011    fn test_script_result_with_error() {
4012        let result = ScriptResult {
4013            logs: vec![LogEntry {
4014                level: LogLevel::Error,
4015                message: "Something went wrong".to_string(),
4016            }],
4017            error: "RuntimeError: undefined is not a function".to_string(),
4018            return_value: String::new(),
4019            trace: vec![],
4020        };
4021
4022        assert!(!result.error.is_empty());
4023        assert!(result.error.contains("RuntimeError"));
4024        assert_eq!(result.logs.len(), 1);
4025    }
4026
4027    #[test]
4028    fn test_plugin_handler_payload_with_traces() {
4029        let payload = PluginHandlerPayload {
4030            message: "Error with traces".to_string(),
4031            status: 500,
4032            code: None,
4033            details: None,
4034            logs: None,
4035            traces: Some(vec![
4036                serde_json::json!({"method": "GET", "path": "/health"}),
4037                serde_json::json!({"method": "POST", "path": "/execute"}),
4038            ]),
4039        };
4040
4041        assert!(payload.traces.is_some());
4042        assert_eq!(payload.traces.as_ref().unwrap().len(), 2);
4043    }
4044
4045    #[test]
4046    fn test_queued_request_all_optional_fields() {
4047        let (tx, _rx) = oneshot::channel();
4048
4049        let mut headers = HashMap::new();
4050        headers.insert(
4051            "X-Custom".to_string(),
4052            vec!["value1".to_string(), "value2".to_string()],
4053        );
4054
4055        let request = QueuedRequest {
4056            plugin_id: "full-request".to_string(),
4057            compiled_code: Some("compiled code here".to_string()),
4058            plugin_path: Some("/path/to/plugin.ts".to_string()),
4059            params: serde_json::json!({"key": "value", "number": 42}),
4060            headers: Some(headers),
4061            socket_path: "/tmp/full.sock".to_string(),
4062            http_request_id: Some("http-123".to_string()),
4063            timeout_secs: Some(60),
4064            route: Some("/api/v1/execute".to_string()),
4065            config: Some(serde_json::json!({"setting": true})),
4066            method: Some("PUT".to_string()),
4067            query: Some(serde_json::json!({"page": 1, "limit": 10})),
4068            response_tx: tx,
4069        };
4070
4071        assert_eq!(request.plugin_id, "full-request");
4072        assert!(request.compiled_code.is_some());
4073        assert!(request.plugin_path.is_some());
4074        assert!(request.headers.is_some());
4075        assert_eq!(request.timeout_secs, Some(60));
4076        assert_eq!(request.method, Some("PUT".to_string()));
4077    }
4078}