1use std::collections::HashMap;
10use std::process::Stdio;
11use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14use tokio::io::{AsyncBufReadExt, BufReader};
15use tokio::process::{Child, Command};
16use tokio::sync::oneshot;
17use uuid::Uuid;
18
19use super::config::get_config;
20use super::connection::{ConnectionPool, PoolConnection};
21use super::health::{
22 CircuitBreaker, CircuitState, DeadServerIndicator, HealthStatus, ProcessStatus,
23};
24use super::protocol::{PoolError, PoolRequest, PoolResponse};
25use super::shared_socket::get_shared_socket_service;
26use super::{LogEntry, PluginError, PluginHandlerPayload, ScriptResult};
27
28struct QueuedRequest {
30 plugin_id: String,
31 compiled_code: Option<String>,
32 plugin_path: Option<String>,
33 params: serde_json::Value,
34 headers: Option<HashMap<String, Vec<String>>>,
35 socket_path: String,
36 http_request_id: Option<String>,
37 timeout_secs: Option<u64>,
38 route: Option<String>,
39 config: Option<serde_json::Value>,
40 method: Option<String>,
41 query: Option<serde_json::Value>,
42 response_tx: oneshot::Sender<Result<ScriptResult, PluginError>>,
43}
44
45#[derive(Debug, Default, PartialEq)]
50pub struct ParsedHealthResult {
51 pub status: String,
52 pub uptime_ms: Option<u64>,
53 pub memory: Option<u64>,
54 pub pool_completed: Option<u64>,
55 pub pool_queued: Option<u64>,
56 pub success_rate: Option<f64>,
57}
58
59pub struct PoolManager {
61 socket_path: String,
62 process: tokio::sync::Mutex<Option<Child>>,
63 initialized: Arc<AtomicBool>,
64 restart_lock: tokio::sync::Mutex<()>,
66 connection_pool: Arc<ConnectionPool>,
68 request_tx: async_channel::Sender<QueuedRequest>,
70 max_queue_size: usize,
72 health_check_needed: Arc<AtomicBool>,
74 consecutive_failures: Arc<AtomicU32>,
76 circuit_breaker: Arc<CircuitBreaker>,
78 last_restart_time_ms: Arc<AtomicU64>,
80 recovery_mode: Arc<AtomicBool>,
82 recovery_allowance: Arc<AtomicU32>,
84 shutdown_signal: Arc<tokio::sync::Notify>,
86}
87
88impl Default for PoolManager {
89 fn default() -> Self {
90 Self::new()
91 }
92}
93
94impl PoolManager {
95 const BASE_HEAP_MB: usize = 512;
98
99 const CONCURRENCY_DIVISOR: usize = 10;
102
103 const HEAP_INCREMENT_PER_DIVISOR_MB: usize = 32;
107
108 const MAX_HEAP_MB: usize = 8192;
112
113 pub fn calculate_heap_size(max_concurrency: usize) -> usize {
120 let calculated = Self::BASE_HEAP_MB
121 + ((max_concurrency / Self::CONCURRENCY_DIVISOR) * Self::HEAP_INCREMENT_PER_DIVISOR_MB);
122 calculated.min(Self::MAX_HEAP_MB)
123 }
124
125 pub fn format_return_value(value: Option<serde_json::Value>) -> String {
130 value
131 .map(|v| {
132 if v.is_string() {
133 v.as_str().unwrap_or("").to_string()
134 } else {
135 serde_json::to_string(&v).unwrap_or_default()
136 }
137 })
138 .unwrap_or_default()
139 }
140
141 pub fn parse_success_response(response: PoolResponse) -> ScriptResult {
145 let logs: Vec<LogEntry> = response
146 .logs
147 .map(|logs| logs.into_iter().map(|l| l.into()).collect())
148 .unwrap_or_default();
149
150 ScriptResult {
151 logs,
152 error: String::new(),
153 return_value: Self::format_return_value(response.result),
154 trace: Vec::new(),
155 }
156 }
157
158 pub fn parse_error_response(response: PoolResponse) -> PluginError {
162 let logs: Vec<LogEntry> = response
163 .logs
164 .map(|logs| logs.into_iter().map(|l| l.into()).collect())
165 .unwrap_or_default();
166
167 let error = response.error.unwrap_or(PoolError {
168 message: "Unknown error".to_string(),
169 code: None,
170 status: None,
171 details: None,
172 });
173
174 PluginError::HandlerError(Box::new(PluginHandlerPayload {
175 message: error.message,
176 status: error.status.unwrap_or(500),
177 code: error.code,
178 details: error.details,
179 logs: Some(logs),
180 traces: None,
181 }))
182 }
183
184 pub fn parse_pool_response(response: PoolResponse) -> Result<ScriptResult, PluginError> {
189 if response.success {
190 Ok(Self::parse_success_response(response))
191 } else {
192 Err(Self::parse_error_response(response))
193 }
194 }
195
196 pub fn parse_health_result(result: &serde_json::Value) -> ParsedHealthResult {
201 ParsedHealthResult {
202 status: result
203 .get("status")
204 .and_then(|v| v.as_str())
205 .unwrap_or("unknown")
206 .to_string(),
207 uptime_ms: result.get("uptime").and_then(|v| v.as_u64()),
208 memory: result
209 .get("memory")
210 .and_then(|v| v.get("heapUsed"))
211 .and_then(|v| v.as_u64()),
212 pool_completed: result
213 .get("pool")
214 .and_then(|v| v.get("completed"))
215 .and_then(|v| v.as_u64()),
216 pool_queued: result
217 .get("pool")
218 .and_then(|v| v.get("queued"))
219 .and_then(|v| v.as_u64()),
220 success_rate: result
221 .get("execution")
222 .and_then(|v| v.get("successRate"))
223 .and_then(|v| v.as_f64()),
224 }
225 }
226
227 pub fn new() -> Self {
229 Self::init(format!("/tmp/relayer-plugin-pool-{}.sock", Uuid::new_v4()))
230 }
231
232 pub fn with_socket_path(socket_path: String) -> Self {
234 Self::init(socket_path)
235 }
236
237 fn init(socket_path: String) -> Self {
239 let config = get_config();
240 let max_connections = config.pool_max_connections;
241 let max_queue_size = config.pool_max_queue_size;
242
243 let (tx, rx) = async_channel::bounded(max_queue_size);
244
245 let connection_pool = Arc::new(ConnectionPool::new(socket_path.clone(), max_connections));
246 let connection_pool_clone = connection_pool.clone();
247
248 let shutdown_signal = Arc::new(tokio::sync::Notify::new());
249
250 Self::spawn_queue_workers(
251 rx,
252 connection_pool_clone,
253 config.pool_workers,
254 shutdown_signal.clone(),
255 );
256
257 let health_check_needed = Arc::new(AtomicBool::new(false));
258 let consecutive_failures = Arc::new(AtomicU32::new(0));
259 let circuit_breaker = Arc::new(CircuitBreaker::new());
260 let last_restart_time_ms = Arc::new(AtomicU64::new(0));
261 let recovery_mode = Arc::new(AtomicBool::new(false));
262 let recovery_allowance = Arc::new(AtomicU32::new(0));
263
264 Self::spawn_health_check_task(
265 health_check_needed.clone(),
266 config.health_check_interval_secs,
267 shutdown_signal.clone(),
268 );
269
270 Self::spawn_recovery_task(
271 recovery_mode.clone(),
272 recovery_allowance.clone(),
273 shutdown_signal.clone(),
274 );
275
276 Self {
277 connection_pool,
278 socket_path,
279 process: tokio::sync::Mutex::new(None),
280 initialized: Arc::new(AtomicBool::new(false)),
281 restart_lock: tokio::sync::Mutex::new(()),
282 request_tx: tx,
283 max_queue_size,
284 health_check_needed,
285 consecutive_failures,
286 circuit_breaker,
287 last_restart_time_ms,
288 recovery_mode,
289 recovery_allowance,
290 shutdown_signal,
291 }
292 }
293
294 fn spawn_recovery_task(
296 recovery_mode: Arc<AtomicBool>,
297 recovery_allowance: Arc<AtomicU32>,
298 shutdown_signal: Arc<tokio::sync::Notify>,
299 ) {
300 tokio::spawn(async move {
301 let mut interval = tokio::time::interval(Duration::from_millis(500));
302 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
303
304 loop {
305 tokio::select! {
306 biased;
307
308 _ = shutdown_signal.notified() => {
309 tracing::debug!("Recovery task received shutdown signal");
310 break;
311 }
312
313 _ = interval.tick() => {
314 if recovery_mode.load(Ordering::Relaxed) {
315 let current = recovery_allowance.load(Ordering::Relaxed);
316 if current < 100 {
317 let new_allowance = (current + 10).min(100);
318 recovery_allowance.store(new_allowance, Ordering::Relaxed);
319 tracing::debug!(
320 allowance = new_allowance,
321 "Recovery mode: increasing request allowance"
322 );
323 } else {
324 recovery_mode.store(false, Ordering::Relaxed);
325 tracing::info!("Recovery mode complete - full capacity restored");
326 }
327 }
328 }
329 }
330 }
331 });
332 }
333
334 fn spawn_health_check_task(
336 health_check_needed: Arc<AtomicBool>,
337 interval_secs: u64,
338 shutdown_signal: Arc<tokio::sync::Notify>,
339 ) {
340 tokio::spawn(async move {
341 let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
342 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
343
344 loop {
345 tokio::select! {
346 biased;
347
348 _ = shutdown_signal.notified() => {
349 tracing::debug!("Health check task received shutdown signal");
350 break;
351 }
352
353 _ = interval.tick() => {
354 health_check_needed.store(true, Ordering::Relaxed);
355 }
356 }
357 }
358 });
359 }
360
361 fn spawn_queue_workers(
363 rx: async_channel::Receiver<QueuedRequest>,
364 connection_pool: Arc<ConnectionPool>,
365 configured_workers: usize,
366 shutdown_signal: Arc<tokio::sync::Notify>,
367 ) {
368 let num_workers = if configured_workers > 0 {
369 configured_workers
370 } else {
371 std::thread::available_parallelism()
372 .map(|n| n.get().clamp(4, 32))
373 .unwrap_or(8)
374 };
375
376 tracing::info!(num_workers = num_workers, "Starting request queue workers");
377
378 for worker_id in 0..num_workers {
379 let rx_clone = rx.clone();
380 let pool_clone = connection_pool.clone();
381 let shutdown = shutdown_signal.clone();
382
383 tokio::spawn(async move {
384 loop {
385 tokio::select! {
386 biased;
387
388 _ = shutdown.notified() => {
389 tracing::debug!(worker_id = worker_id, "Request queue worker received shutdown signal");
390 break;
391 }
392
393 request_result = rx_clone.recv() => {
394 let request = match request_result {
395 Ok(r) => r,
396 Err(_) => break,
397 };
398
399 let start = std::time::Instant::now();
400 let plugin_id = request.plugin_id.clone();
401
402 let result = Self::execute_plugin_internal(
403 &pool_clone,
404 request.plugin_id,
405 request.compiled_code,
406 request.plugin_path,
407 request.params,
408 request.headers,
409 request.socket_path,
410 request.http_request_id,
411 request.timeout_secs,
412 request.route,
413 request.config,
414 request.method,
415 request.query,
416 )
417 .await;
418
419 let elapsed = start.elapsed();
420 if let Err(ref e) = result {
421 let error_str = format!("{e:?}");
422 if error_str.contains("shutdown") || error_str.contains("Shutdown") {
423 tracing::debug!(
424 worker_id = worker_id,
425 plugin_id = %plugin_id,
426 "Plugin execution cancelled during shutdown"
427 );
428 } else {
429 tracing::warn!(
430 worker_id = worker_id,
431 plugin_id = %plugin_id,
432 elapsed_ms = elapsed.as_millis() as u64,
433 error = ?e,
434 "Plugin execution failed"
435 );
436 }
437 } else if elapsed.as_secs() > 1 {
438 tracing::debug!(
439 worker_id = worker_id,
440 plugin_id = %plugin_id,
441 elapsed_ms = elapsed.as_millis() as u64,
442 "Slow plugin execution"
443 );
444 }
445
446 let _ = request.response_tx.send(result);
447 }
448 }
449 }
450
451 tracing::debug!(worker_id = worker_id, "Request queue worker exited");
452 });
453 }
454 }
455
456 fn spawn_rate_limited_stderr_reader(stderr: tokio::process::ChildStderr) {
458 tokio::spawn(async move {
459 let reader = BufReader::new(stderr);
460 let mut lines = reader.lines();
461
462 let mut last_log_time = std::time::Instant::now();
463 let mut suppressed_count = 0u64;
464 let min_interval = Duration::from_millis(100);
465
466 while let Ok(Some(line)) = lines.next_line().await {
467 let now = std::time::Instant::now();
468 let elapsed = now.duration_since(last_log_time);
469
470 if elapsed >= min_interval {
471 if suppressed_count > 0 {
472 tracing::warn!(
473 target: "pool_server",
474 suppressed = suppressed_count,
475 "... ({} lines suppressed due to rate limiting)",
476 suppressed_count
477 );
478 suppressed_count = 0;
479 }
480 tracing::error!(target: "pool_server", "{}", line);
481 last_log_time = now;
482 } else {
483 suppressed_count += 1;
484 if suppressed_count % 100 == 0 {
485 tracing::warn!(
486 target: "pool_server",
487 suppressed = suppressed_count,
488 "Pool server producing excessive stderr output"
489 );
490 }
491 }
492 }
493
494 if suppressed_count > 0 {
495 tracing::warn!(
496 target: "pool_server",
497 suppressed = suppressed_count,
498 "Pool server stderr closed ({} final lines suppressed)",
499 suppressed_count
500 );
501 }
502 });
503 }
504
505 #[allow(clippy::too_many_arguments)]
507 async fn execute_with_permit(
508 connection_pool: &Arc<ConnectionPool>,
509 permit: Option<tokio::sync::OwnedSemaphorePermit>,
510 plugin_id: String,
511 compiled_code: Option<String>,
512 plugin_path: Option<String>,
513 params: serde_json::Value,
514 headers: Option<HashMap<String, Vec<String>>>,
515 socket_path: String,
516 http_request_id: Option<String>,
517 timeout_secs: Option<u64>,
518 route: Option<String>,
519 config: Option<serde_json::Value>,
520 method: Option<String>,
521 query: Option<serde_json::Value>,
522 ) -> Result<ScriptResult, PluginError> {
523 let mut conn = connection_pool.acquire_with_permit(permit).await?;
524
525 let request = PoolRequest::Execute(Box::new(super::protocol::ExecuteRequest {
526 task_id: Uuid::new_v4().to_string(),
527 plugin_id: plugin_id.clone(),
528 compiled_code,
529 plugin_path,
530 params,
531 headers,
532 socket_path,
533 http_request_id,
534 timeout: timeout_secs.map(|s| s * 1000),
535 route,
536 config,
537 method,
538 query,
539 }));
540
541 let timeout = timeout_secs.unwrap_or(get_config().pool_request_timeout_secs);
542 let response = conn.send_request_with_timeout(&request, timeout).await?;
543
544 Self::parse_pool_response(response)
546 }
547
548 #[allow(clippy::too_many_arguments)]
550 async fn execute_plugin_internal(
551 connection_pool: &Arc<ConnectionPool>,
552 plugin_id: String,
553 compiled_code: Option<String>,
554 plugin_path: Option<String>,
555 params: serde_json::Value,
556 headers: Option<HashMap<String, Vec<String>>>,
557 socket_path: String,
558 http_request_id: Option<String>,
559 timeout_secs: Option<u64>,
560 route: Option<String>,
561 config: Option<serde_json::Value>,
562 method: Option<String>,
563 query: Option<serde_json::Value>,
564 ) -> Result<ScriptResult, PluginError> {
565 Self::execute_with_permit(
566 connection_pool,
567 None,
568 plugin_id,
569 compiled_code,
570 plugin_path,
571 params,
572 headers,
573 socket_path,
574 http_request_id,
575 timeout_secs,
576 route,
577 config,
578 method,
579 query,
580 )
581 .await
582 }
583
584 pub async fn is_initialized(&self) -> bool {
589 self.initialized.load(Ordering::Acquire)
590 }
591
592 pub async fn ensure_started(&self) -> Result<(), PluginError> {
594 if self.initialized.load(Ordering::Acquire) {
595 return Ok(());
596 }
597
598 let _startup_guard = self.restart_lock.lock().await;
599
600 if self.initialized.load(Ordering::Acquire) {
601 return Ok(());
602 }
603
604 self.start_pool_server().await?;
605 self.initialized.store(true, Ordering::Release);
606 Ok(())
607 }
608
609 async fn ensure_started_and_healthy(&self) -> Result<(), PluginError> {
611 self.ensure_started().await?;
612
613 if !self.health_check_needed.load(Ordering::Relaxed) {
614 return Ok(());
615 }
616
617 if self
618 .health_check_needed
619 .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
620 .is_err()
621 {
622 return Ok(());
623 }
624
625 self.check_and_restart_if_needed().await
626 }
627
628 async fn check_and_restart_if_needed(&self) -> Result<(), PluginError> {
630 let process_status = {
632 let mut process_guard = self.process.lock().await;
633 if let Some(child) = process_guard.as_mut() {
634 match child.try_wait() {
635 Ok(Some(exit_status)) => {
636 tracing::warn!(
637 exit_status = ?exit_status,
638 "Pool server process has exited"
639 );
640 *process_guard = None;
641 ProcessStatus::Exited
642 }
643 Ok(None) => ProcessStatus::Running,
644 Err(e) => {
645 tracing::warn!(
646 error = %e,
647 "Failed to check pool server process status, assuming dead"
648 );
649 *process_guard = None;
650 ProcessStatus::Unknown
651 }
652 }
653 } else {
654 ProcessStatus::NoProcess
655 }
656 };
657
658 let needs_restart = match process_status {
660 ProcessStatus::Running => {
661 let socket_exists = std::path::Path::new(&self.socket_path).exists();
662 if !socket_exists {
663 tracing::warn!(
664 socket_path = %self.socket_path,
665 "Pool server socket file missing, needs restart"
666 );
667 true
668 } else {
669 false
670 }
671 }
672 ProcessStatus::Exited | ProcessStatus::Unknown | ProcessStatus::NoProcess => {
673 tracing::warn!("Pool server not running, needs restart");
674 true
675 }
676 };
677
678 if needs_restart {
680 let _restart_guard = self.restart_lock.lock().await;
681 self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
682 self.restart_internal().await?;
683 self.consecutive_failures.store(0, Ordering::Relaxed);
684 }
685
686 Ok(())
687 }
688
689 async fn cleanup_socket_file(socket_path: &str) {
691 let max_cleanup_attempts = 5;
692 let mut attempts = 0;
693
694 while attempts < max_cleanup_attempts {
695 match std::fs::remove_file(socket_path) {
696 Ok(_) => break,
697 Err(e) if e.kind() == std::io::ErrorKind::NotFound => break,
698 Err(e) => {
699 attempts += 1;
700 if attempts >= max_cleanup_attempts {
701 tracing::warn!(
702 socket_path = %socket_path,
703 error = %e,
704 "Failed to remove socket file after {} attempts, proceeding anyway",
705 max_cleanup_attempts
706 );
707 break;
708 }
709 let delay_ms = 10 * (1 << attempts.min(3));
710 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
711 }
712 }
713 }
714
715 tokio::time::sleep(Duration::from_millis(50)).await;
716 }
717
718 async fn spawn_pool_server_process(
720 socket_path: &str,
721 context: &str,
722 ) -> Result<Child, PluginError> {
723 let pool_server_path = std::env::current_dir()
724 .map(|cwd| cwd.join("plugins/lib/pool-server.ts").display().to_string())
725 .unwrap_or_else(|_| "plugins/lib/pool-server.ts".to_string());
726
727 let config = get_config();
728
729 let pool_server_heap_mb = Self::calculate_heap_size(config.max_concurrency);
731
732 let uncapped_heap = Self::BASE_HEAP_MB
734 + ((config.max_concurrency / Self::CONCURRENCY_DIVISOR)
735 * Self::HEAP_INCREMENT_PER_DIVISOR_MB);
736 if uncapped_heap > Self::MAX_HEAP_MB {
737 tracing::warn!(
738 calculated_heap_mb = uncapped_heap,
739 capped_heap_mb = pool_server_heap_mb,
740 max_concurrency = config.max_concurrency,
741 "Pool server heap calculation exceeded 8GB cap"
742 );
743 }
744
745 tracing::info!(
746 socket_path = %socket_path,
747 heap_mb = pool_server_heap_mb,
748 max_concurrency = config.max_concurrency,
749 context = context,
750 "Spawning plugin pool server"
751 );
752
753 let node_options = format!("--max-old-space-size={pool_server_heap_mb} --expose-gc");
754
755 let mut child = Command::new("ts-node")
756 .arg("--transpile-only")
757 .arg(&pool_server_path)
758 .arg(socket_path)
759 .env("NODE_OPTIONS", node_options)
760 .env("PLUGIN_MAX_CONCURRENCY", config.max_concurrency.to_string())
761 .env(
762 "PLUGIN_POOL_MIN_THREADS",
763 config.nodejs_pool_min_threads.to_string(),
764 )
765 .env(
766 "PLUGIN_POOL_MAX_THREADS",
767 config.nodejs_pool_max_threads.to_string(),
768 )
769 .env(
770 "PLUGIN_POOL_CONCURRENT_TASKS",
771 config.nodejs_pool_concurrent_tasks.to_string(),
772 )
773 .env(
774 "PLUGIN_POOL_IDLE_TIMEOUT",
775 config.nodejs_pool_idle_timeout_ms.to_string(),
776 )
777 .env(
778 "PLUGIN_WORKER_HEAP_MB",
779 config.nodejs_worker_heap_mb.to_string(),
780 )
781 .env(
782 "PLUGIN_POOL_SOCKET_BACKLOG",
783 config.pool_socket_backlog.to_string(),
784 )
785 .stdin(Stdio::null())
786 .stdout(Stdio::piped())
787 .stderr(Stdio::piped())
788 .spawn()
789 .map_err(|e| {
790 PluginError::PluginExecutionError(format!("Failed to {context} pool server: {e}"))
791 })?;
792
793 if let Some(stderr) = child.stderr.take() {
794 Self::spawn_rate_limited_stderr_reader(stderr);
795 }
796
797 if let Some(stdout) = child.stdout.take() {
798 let reader = BufReader::new(stdout);
799 let mut lines = reader.lines();
800
801 let timeout_result = tokio::time::timeout(Duration::from_secs(10), async {
802 while let Ok(Some(line)) = lines.next_line().await {
803 if line.contains("POOL_SERVER_READY") {
804 return Ok(());
805 }
806 }
807 Err(PluginError::PluginExecutionError(
808 "Pool server did not send ready signal".to_string(),
809 ))
810 })
811 .await;
812
813 match timeout_result {
814 Ok(Ok(())) => {
815 tracing::info!(context = context, "Plugin pool server ready");
816 }
817 Ok(Err(e)) => return Err(e),
818 Err(_) => {
819 return Err(PluginError::PluginExecutionError(format!(
820 "Timeout waiting for pool server to {context}"
821 )))
822 }
823 }
824 }
825
826 Ok(child)
827 }
828
829 async fn start_pool_server(&self) -> Result<(), PluginError> {
830 let mut process_guard = self.process.lock().await;
831
832 if process_guard.is_some() {
833 return Ok(());
834 }
835
836 Self::cleanup_socket_file(&self.socket_path).await;
837
838 let child = Self::spawn_pool_server_process(&self.socket_path, "start").await?;
839
840 *process_guard = Some(child);
841 Ok(())
842 }
843
844 #[allow(clippy::too_many_arguments)]
846 pub async fn execute_plugin(
847 &self,
848 plugin_id: String,
849 compiled_code: Option<String>,
850 plugin_path: Option<String>,
851 params: serde_json::Value,
852 headers: Option<HashMap<String, Vec<String>>>,
853 socket_path: String,
854 http_request_id: Option<String>,
855 timeout_secs: Option<u64>,
856 route: Option<String>,
857 config: Option<serde_json::Value>,
858 method: Option<String>,
859 query: Option<serde_json::Value>,
860 ) -> Result<ScriptResult, PluginError> {
861 let rid = http_request_id.as_deref().unwrap_or("unknown");
862 let effective_timeout =
863 timeout_secs.unwrap_or_else(|| get_config().pool_request_timeout_secs);
864 tracing::debug!(
865 plugin_id = %plugin_id,
866 http_request_id = %rid,
867 timeout_secs = effective_timeout,
868 "Pool execute request received"
869 );
870 let recovery_allowance = if self.recovery_mode.load(Ordering::Relaxed) {
871 Some(self.recovery_allowance.load(Ordering::Relaxed))
872 } else {
873 None
874 };
875
876 if !self
877 .circuit_breaker
878 .should_allow_request(recovery_allowance)
879 {
880 let state = self.circuit_breaker.state();
881 tracing::warn!(
882 plugin_id = %plugin_id,
883 circuit_state = ?state,
884 recovery_allowance = ?recovery_allowance,
885 "Request rejected by circuit breaker"
886 );
887 return Err(PluginError::PluginExecutionError(
888 "Plugin system temporarily unavailable due to high load. Please retry shortly."
889 .to_string(),
890 ));
891 }
892
893 let start_time = Instant::now();
894
895 self.ensure_started_and_healthy().await?;
896 tracing::debug!(
897 plugin_id = %plugin_id,
898 http_request_id = %rid,
899 "Pool execute start (healthy/started)"
900 );
901
902 let circuit_breaker = self.circuit_breaker.clone();
903 match self.connection_pool.semaphore.clone().try_acquire_owned() {
904 Ok(permit) => {
905 tracing::debug!(
906 plugin_id = %plugin_id,
907 http_request_id = %rid,
908 "Pool execute acquired connection permit (fast path)"
909 );
910 let result = Self::execute_with_permit(
911 &self.connection_pool,
912 Some(permit),
913 plugin_id,
914 compiled_code,
915 plugin_path,
916 params,
917 headers,
918 socket_path,
919 http_request_id,
920 timeout_secs,
921 route,
922 config,
923 method,
924 query,
925 )
926 .await;
927
928 let elapsed_ms = start_time.elapsed().as_millis() as u32;
929 match &result {
930 Ok(_) => circuit_breaker.record_success(elapsed_ms),
931 Err(e) => {
932 if Self::is_dead_server_error(e) {
935 circuit_breaker.record_failure();
936 tracing::warn!(
937 error = %e,
938 "Detected dead pool server error, triggering health check for restart"
939 );
940 self.health_check_needed.store(true, Ordering::Relaxed);
941 } else {
942 circuit_breaker.record_success(elapsed_ms);
944 }
945 }
946 }
947
948 tracing::debug!(
949 elapsed_ms = elapsed_ms,
950 result_ok = result.is_ok(),
951 "Pool execute finished (fast path)"
952 );
953 result
954 }
955 Err(_) => {
956 tracing::debug!(
957 plugin_id = %plugin_id,
958 http_request_id = %rid,
959 "Pool execute queueing (no permits)"
960 );
961 let (response_tx, response_rx) = oneshot::channel();
962
963 let queued_request = QueuedRequest {
964 plugin_id,
965 compiled_code,
966 plugin_path,
967 params,
968 headers,
969 socket_path,
970 http_request_id,
971 timeout_secs,
972 route,
973 config,
974 method,
975 query,
976 response_tx,
977 };
978
979 let result = match self.request_tx.try_send(queued_request) {
980 Ok(()) => {
981 let queue_len = self.request_tx.len();
982 if queue_len > self.max_queue_size / 2 {
983 tracing::warn!(
984 queue_len = queue_len,
985 max_queue_size = self.max_queue_size,
986 "Plugin queue is over 50% capacity"
987 );
988 }
989 let response_timeout = timeout_secs
991 .map(Duration::from_secs)
992 .unwrap_or(Duration::from_secs(get_config().pool_request_timeout_secs))
993 + Duration::from_secs(5); match tokio::time::timeout(response_timeout, response_rx).await {
996 Ok(Ok(result)) => result,
997 Ok(Err(_)) => Err(PluginError::PluginExecutionError(
998 "Request queue processor closed".to_string(),
999 )),
1000 Err(_) => Err(PluginError::PluginExecutionError(format!(
1001 "Request timed out after {}s waiting for worker response",
1002 response_timeout.as_secs()
1003 ))),
1004 }
1005 }
1006 Err(async_channel::TrySendError::Full(req)) => {
1007 let queue_timeout_ms = get_config().pool_queue_send_timeout_ms;
1008 let queue_timeout = Duration::from_millis(queue_timeout_ms);
1009 match tokio::time::timeout(queue_timeout, self.request_tx.send(req)).await {
1010 Ok(Ok(())) => {
1011 let queue_len = self.request_tx.len();
1012 tracing::debug!(
1013 queue_len = queue_len,
1014 "Request queued after waiting for queue space"
1015 );
1016 let response_timeout =
1018 timeout_secs.map(Duration::from_secs).unwrap_or(
1019 Duration::from_secs(get_config().pool_request_timeout_secs),
1020 ) + Duration::from_secs(5); match tokio::time::timeout(response_timeout, response_rx).await {
1023 Ok(Ok(result)) => result,
1024 Ok(Err(_)) => Err(PluginError::PluginExecutionError(
1025 "Request queue processor closed".to_string(),
1026 )),
1027 Err(_) => Err(PluginError::PluginExecutionError(format!(
1028 "Request timed out after {}s waiting for worker response",
1029 response_timeout.as_secs()
1030 ))),
1031 }
1032 }
1033 Ok(Err(async_channel::SendError(_))) => {
1034 Err(PluginError::PluginExecutionError(
1035 "Plugin execution queue is closed".to_string(),
1036 ))
1037 }
1038 Err(_) => {
1039 let queue_len = self.request_tx.len();
1040 tracing::error!(
1041 queue_len = queue_len,
1042 max_queue_size = self.max_queue_size,
1043 timeout_ms = queue_timeout.as_millis(),
1044 "Plugin execution queue is FULL - timeout waiting for space"
1045 );
1046 Err(PluginError::PluginExecutionError(format!(
1047 "Plugin execution queue is full (max: {}) and timeout waiting for space. \
1048 Consider increasing PLUGIN_POOL_MAX_QUEUE_SIZE or PLUGIN_POOL_MAX_CONNECTIONS.",
1049 self.max_queue_size
1050 )))
1051 }
1052 }
1053 }
1054 Err(async_channel::TrySendError::Closed(_)) => {
1055 Err(PluginError::PluginExecutionError(
1056 "Plugin execution queue is closed".to_string(),
1057 ))
1058 }
1059 };
1060
1061 let elapsed_ms = start_time.elapsed().as_millis() as u32;
1062 match &result {
1063 Ok(_) => circuit_breaker.record_success(elapsed_ms),
1064 Err(e) => {
1065 if Self::is_dead_server_error(e) {
1067 circuit_breaker.record_failure();
1068 tracing::warn!(
1069 error = %e,
1070 "Detected dead pool server error (queued path), triggering health check for restart"
1071 );
1072 self.health_check_needed.store(true, Ordering::Relaxed);
1073 } else {
1074 circuit_breaker.record_success(elapsed_ms);
1076 }
1077 }
1078 }
1079
1080 tracing::debug!(
1081 elapsed_ms = elapsed_ms,
1082 result_ok = result.is_ok(),
1083 "Pool execute finished (queued path)"
1084 );
1085 result
1086 }
1087 }
1088 }
1089
1090 pub fn is_dead_server_error(err: &PluginError) -> bool {
1092 let error_str = err.to_string();
1093 let lower = error_str.to_lowercase();
1094
1095 if lower.contains("handler timed out")
1096 || (lower.contains("plugin") && lower.contains("timed out"))
1097 {
1098 return false;
1099 }
1100
1101 DeadServerIndicator::from_error_str(&error_str).is_some()
1102 }
1103
1104 pub async fn precompile_plugin(
1106 &self,
1107 plugin_id: String,
1108 plugin_path: Option<String>,
1109 source_code: Option<String>,
1110 ) -> Result<String, PluginError> {
1111 self.ensure_started().await?;
1112
1113 let mut conn = self.connection_pool.acquire().await?;
1114
1115 let request = PoolRequest::Precompile {
1116 task_id: Uuid::new_v4().to_string(),
1117 plugin_id: plugin_id.clone(),
1118 plugin_path,
1119 source_code,
1120 };
1121
1122 let response = conn
1123 .send_request_with_timeout(&request, get_config().pool_request_timeout_secs)
1124 .await?;
1125
1126 if response.success {
1127 response
1128 .result
1129 .and_then(|v| {
1130 v.get("code")
1131 .and_then(|c| c.as_str())
1132 .map(|s| s.to_string())
1133 })
1134 .ok_or_else(|| {
1135 PluginError::PluginExecutionError("No compiled code in response".to_string())
1136 })
1137 } else {
1138 let error = response.error.unwrap_or(PoolError {
1139 message: "Compilation failed".to_string(),
1140 code: None,
1141 status: None,
1142 details: None,
1143 });
1144 Err(PluginError::PluginExecutionError(error.message))
1145 }
1146 }
1147
1148 pub async fn cache_compiled_code(
1150 &self,
1151 plugin_id: String,
1152 compiled_code: String,
1153 ) -> Result<(), PluginError> {
1154 self.ensure_started().await?;
1155
1156 let mut conn = self.connection_pool.acquire().await?;
1157
1158 let request = PoolRequest::Cache {
1159 task_id: Uuid::new_v4().to_string(),
1160 plugin_id: plugin_id.clone(),
1161 compiled_code,
1162 };
1163
1164 let response = conn
1165 .send_request_with_timeout(&request, get_config().pool_request_timeout_secs)
1166 .await?;
1167
1168 if response.success {
1169 Ok(())
1170 } else {
1171 let error = response.error.unwrap_or(PoolError {
1172 message: "Cache failed".to_string(),
1173 code: None,
1174 status: None,
1175 details: None,
1176 });
1177 Err(PluginError::PluginError(error.message))
1178 }
1179 }
1180
1181 pub async fn invalidate_plugin(&self, plugin_id: String) -> Result<(), PluginError> {
1183 if !self.initialized.load(Ordering::Acquire) {
1184 return Ok(());
1185 }
1186
1187 let mut conn = self.connection_pool.acquire().await?;
1188
1189 let request = PoolRequest::Invalidate {
1190 task_id: Uuid::new_v4().to_string(),
1191 plugin_id,
1192 };
1193
1194 let _ = conn
1195 .send_request_with_timeout(&request, get_config().pool_request_timeout_secs)
1196 .await?;
1197 Ok(())
1198 }
1199
1200 async fn collect_socket_stats(
1203 &self,
1204 ) -> (
1205 Option<usize>,
1206 Option<usize>,
1207 Option<usize>,
1208 Option<usize>,
1209 Option<usize>,
1210 ) {
1211 let (shared_available, shared_active, shared_executions) = match get_shared_socket_service()
1213 {
1214 Ok(service) => {
1215 let available = service.available_connection_slots();
1216 let active = service.active_connection_count();
1217 let executions = service.registered_executions_count().await;
1218 (Some(available), Some(active), Some(executions))
1219 }
1220 Err(_) => (None, None, None),
1221 };
1222
1223 let pool_available = self.connection_pool.semaphore.available_permits();
1225 let pool_max = get_config().pool_max_connections;
1226 let pool_active = pool_max.saturating_sub(pool_available);
1227
1228 (
1229 shared_available,
1230 shared_active,
1231 shared_executions,
1232 Some(pool_available),
1233 Some(pool_active),
1234 )
1235 }
1236
1237 pub async fn health_check(&self) -> Result<HealthStatus, PluginError> {
1238 let circuit_info = || {
1239 let state = match self.circuit_breaker.state() {
1240 CircuitState::Closed => "closed",
1241 CircuitState::HalfOpen => "half_open",
1242 CircuitState::Open => "open",
1243 };
1244 (
1245 Some(state.to_string()),
1246 Some(self.circuit_breaker.avg_response_time()),
1247 Some(self.recovery_mode.load(Ordering::Relaxed)),
1248 Some(self.recovery_allowance.load(Ordering::Relaxed)),
1249 )
1250 };
1251
1252 let socket_stats = self.collect_socket_stats().await;
1253
1254 if !self.initialized.load(Ordering::Acquire) {
1255 let (circuit_state, avg_rt, recovering, recovery_pct) = circuit_info();
1256 let (shared_available, shared_active, shared_executions, pool_available, pool_active) =
1257 socket_stats;
1258 return Ok(HealthStatus {
1259 healthy: false,
1260 status: "not_initialized".to_string(),
1261 uptime_ms: None,
1262 memory: None,
1263 pool_completed: None,
1264 pool_queued: None,
1265 success_rate: None,
1266 circuit_state,
1267 avg_response_time_ms: avg_rt,
1268 recovering,
1269 recovery_percent: recovery_pct,
1270 shared_socket_available_slots: shared_available,
1271 shared_socket_active_connections: shared_active,
1272 shared_socket_registered_executions: shared_executions,
1273 connection_pool_available_slots: pool_available,
1274 connection_pool_active_connections: pool_active,
1275 });
1276 }
1277
1278 if !std::path::Path::new(&self.socket_path).exists() {
1279 let (circuit_state, avg_rt, recovering, recovery_pct) = circuit_info();
1280 let (shared_available, shared_active, shared_executions, pool_available, pool_active) =
1281 socket_stats;
1282 return Ok(HealthStatus {
1283 healthy: false,
1284 status: "socket_missing".to_string(),
1285 uptime_ms: None,
1286 memory: None,
1287 pool_completed: None,
1288 pool_queued: None,
1289 success_rate: None,
1290 circuit_state,
1291 avg_response_time_ms: avg_rt,
1292 recovering,
1293 recovery_percent: recovery_pct,
1294 shared_socket_available_slots: shared_available,
1295 shared_socket_active_connections: shared_active,
1296 shared_socket_registered_executions: shared_executions,
1297 connection_pool_available_slots: pool_available,
1298 connection_pool_active_connections: pool_active,
1299 });
1300 }
1301
1302 let mut conn =
1303 match tokio::time::timeout(Duration::from_millis(100), self.connection_pool.acquire())
1304 .await
1305 {
1306 Ok(Ok(c)) => c,
1307 Ok(Err(e)) => {
1308 let err_str = e.to_string();
1309 let is_pool_exhausted =
1310 err_str.contains("semaphore") || err_str.contains("Connection refused");
1311
1312 let process_status = match self.process.try_lock() {
1314 Ok(guard) => {
1315 if let Some(child) = guard.as_ref() {
1316 format!("process_pid_{}", child.id().unwrap_or(0))
1317 } else {
1318 "no_process".to_string()
1319 }
1320 }
1321 Err(_) => "process_lock_busy".to_string(),
1322 };
1323
1324 let (circuit_state, avg_rt, recovering, recovery_pct) = circuit_info();
1325 let (
1326 shared_available,
1327 shared_active,
1328 shared_executions,
1329 pool_available,
1330 pool_active,
1331 ) = socket_stats;
1332 return Ok(HealthStatus {
1333 healthy: is_pool_exhausted,
1334 status: if is_pool_exhausted {
1335 format!("pool_exhausted: {e} ({process_status})")
1336 } else {
1337 format!("connection_failed: {e} ({process_status})")
1338 },
1339 uptime_ms: None,
1340 memory: None,
1341 pool_completed: None,
1342 pool_queued: None,
1343 success_rate: None,
1344 circuit_state,
1345 avg_response_time_ms: avg_rt,
1346 recovering,
1347 recovery_percent: recovery_pct,
1348 shared_socket_available_slots: shared_available,
1349 shared_socket_active_connections: shared_active,
1350 shared_socket_registered_executions: shared_executions,
1351 connection_pool_available_slots: pool_available,
1352 connection_pool_active_connections: pool_active,
1353 });
1354 }
1355 Err(_) => {
1356 let (circuit_state, avg_rt, recovering, recovery_pct) = circuit_info();
1357 let (
1358 shared_available,
1359 shared_active,
1360 shared_executions,
1361 pool_available,
1362 pool_active,
1363 ) = socket_stats;
1364 return Ok(HealthStatus {
1365 healthy: true,
1366 status: "pool_busy".to_string(),
1367 uptime_ms: None,
1368 memory: None,
1369 pool_completed: None,
1370 pool_queued: None,
1371 success_rate: None,
1372 circuit_state,
1373 avg_response_time_ms: avg_rt,
1374 recovering,
1375 recovery_percent: recovery_pct,
1376 shared_socket_available_slots: shared_available,
1377 shared_socket_active_connections: shared_active,
1378 shared_socket_registered_executions: shared_executions,
1379 connection_pool_available_slots: pool_available,
1380 connection_pool_active_connections: pool_active,
1381 });
1382 }
1383 };
1384
1385 let request = PoolRequest::Health {
1386 task_id: Uuid::new_v4().to_string(),
1387 };
1388
1389 let (circuit_state, avg_rt, recovering, recovery_pct) = circuit_info();
1390
1391 match conn.send_request_with_timeout(&request, 5).await {
1392 Ok(response) => {
1393 if response.success {
1394 let result = response.result.unwrap_or_default();
1395 let parsed = Self::parse_health_result(&result);
1397
1398 {
1399 let (
1400 shared_available,
1401 shared_active,
1402 shared_executions,
1403 pool_available,
1404 pool_active,
1405 ) = socket_stats;
1406 Ok(HealthStatus {
1407 healthy: true,
1408 status: parsed.status,
1409 uptime_ms: parsed.uptime_ms,
1410 memory: parsed.memory,
1411 pool_completed: parsed.pool_completed,
1412 pool_queued: parsed.pool_queued,
1413 success_rate: parsed.success_rate,
1414 circuit_state,
1415 avg_response_time_ms: avg_rt,
1416 recovering,
1417 recovery_percent: recovery_pct,
1418 shared_socket_available_slots: shared_available,
1419 shared_socket_active_connections: shared_active,
1420 shared_socket_registered_executions: shared_executions,
1421 connection_pool_available_slots: pool_available,
1422 connection_pool_active_connections: pool_active,
1423 })
1424 }
1425 } else {
1426 let (
1427 shared_available,
1428 shared_active,
1429 shared_executions,
1430 pool_available,
1431 pool_active,
1432 ) = socket_stats;
1433 Ok(HealthStatus {
1434 healthy: false,
1435 status: response
1436 .error
1437 .map(|e| e.message)
1438 .unwrap_or_else(|| "unknown_error".to_string()),
1439 uptime_ms: None,
1440 memory: None,
1441 pool_completed: None,
1442 pool_queued: None,
1443 success_rate: None,
1444 circuit_state,
1445 avg_response_time_ms: avg_rt,
1446 recovering,
1447 recovery_percent: recovery_pct,
1448 shared_socket_available_slots: shared_available,
1449 shared_socket_active_connections: shared_active,
1450 shared_socket_registered_executions: shared_executions,
1451 connection_pool_available_slots: pool_available,
1452 connection_pool_active_connections: pool_active,
1453 })
1454 }
1455 }
1456 Err(e) => {
1457 let (
1458 shared_available,
1459 shared_active,
1460 shared_executions,
1461 pool_available,
1462 pool_active,
1463 ) = socket_stats;
1464 Ok(HealthStatus {
1465 healthy: false,
1466 status: format!("request_failed: {e}"),
1467 uptime_ms: None,
1468 memory: None,
1469 pool_completed: None,
1470 pool_queued: None,
1471 success_rate: None,
1472 circuit_state,
1473 avg_response_time_ms: avg_rt,
1474 recovering,
1475 recovery_percent: recovery_pct,
1476 shared_socket_available_slots: shared_available,
1477 shared_socket_active_connections: shared_active,
1478 shared_socket_registered_executions: shared_executions,
1479 connection_pool_available_slots: pool_available,
1480 connection_pool_active_connections: pool_active,
1481 })
1482 }
1483 }
1484 }
1485
1486 pub async fn ensure_healthy(&self) -> Result<bool, PluginError> {
1488 let health = self.health_check().await?;
1489
1490 if health.healthy {
1491 return Ok(true);
1492 }
1493
1494 match self.restart_lock.try_lock() {
1495 Ok(_guard) => {
1496 let health_recheck = self.health_check().await?;
1497 if health_recheck.healthy {
1498 return Ok(true);
1499 }
1500
1501 tracing::warn!(status = %health.status, "Pool server unhealthy, attempting restart");
1502 self.restart_internal().await?;
1503 }
1504 Err(_) => {
1505 tracing::debug!("Waiting for another task to complete pool server restart");
1506 let _guard = self.restart_lock.lock().await;
1507 }
1508 }
1509
1510 let health_after = self.health_check().await?;
1511 Ok(health_after.healthy)
1512 }
1513
1514 pub async fn restart(&self) -> Result<(), PluginError> {
1516 let _guard = self.restart_lock.lock().await;
1517 self.restart_internal().await
1518 }
1519
1520 async fn restart_internal(&self) -> Result<(), PluginError> {
1522 tracing::info!("Restarting plugin pool server");
1523
1524 {
1525 let mut process_guard = self.process.lock().await;
1526 if let Some(mut child) = process_guard.take() {
1527 let _ = child.kill().await;
1528 tokio::time::sleep(Duration::from_millis(100)).await;
1529 }
1530 }
1531
1532 Self::cleanup_socket_file(&self.socket_path).await;
1533
1534 self.initialized.store(false, Ordering::Release);
1535
1536 let mut process_guard = self.process.lock().await;
1537 if process_guard.is_some() {
1538 return Ok(());
1539 }
1540
1541 let child = Self::spawn_pool_server_process(&self.socket_path, "restart").await?;
1542 *process_guard = Some(child);
1543
1544 self.initialized.store(true, Ordering::Release);
1545
1546 self.recovery_allowance.store(10, Ordering::Relaxed);
1547 self.recovery_mode.store(true, Ordering::Relaxed);
1548
1549 self.circuit_breaker.force_close();
1550
1551 let now = std::time::SystemTime::now()
1552 .duration_since(std::time::UNIX_EPOCH)
1553 .unwrap_or_default()
1554 .as_millis() as u64;
1555 self.last_restart_time_ms.store(now, Ordering::Relaxed);
1556
1557 tracing::info!("Recovery mode enabled - requests will gradually increase from 10%");
1558
1559 Ok(())
1560 }
1561
1562 pub fn circuit_state(&self) -> CircuitState {
1564 self.circuit_breaker.state()
1565 }
1566
1567 pub fn avg_response_time_ms(&self) -> u32 {
1569 self.circuit_breaker.avg_response_time()
1570 }
1571
1572 pub fn is_recovering(&self) -> bool {
1574 self.recovery_mode.load(Ordering::Relaxed)
1575 }
1576
1577 pub fn recovery_allowance_percent(&self) -> u32 {
1579 self.recovery_allowance.load(Ordering::Relaxed)
1580 }
1581
1582 pub async fn shutdown(&self) -> Result<(), PluginError> {
1584 if !self.initialized.load(Ordering::Acquire) {
1585 return Ok(());
1586 }
1587
1588 tracing::info!("Initiating graceful shutdown of plugin pool server");
1589
1590 self.shutdown_signal.notify_waiters();
1591
1592 let shutdown_timeout = std::time::Duration::from_secs(35);
1593 let shutdown_result = self.send_shutdown_request(shutdown_timeout).await;
1594
1595 match &shutdown_result {
1596 Ok(response) => {
1597 tracing::info!(
1598 response = ?response,
1599 "Pool server acknowledged shutdown, waiting for graceful exit"
1600 );
1601 }
1602 Err(e) => {
1603 tracing::warn!(
1604 error = %e,
1605 "Failed to send shutdown request, will force kill"
1606 );
1607 }
1608 }
1609
1610 let mut process_guard = self.process.lock().await;
1611 if let Some(ref mut child) = *process_guard {
1612 let graceful_wait = std::time::Duration::from_secs(35);
1613 let start = std::time::Instant::now();
1614
1615 loop {
1616 match child.try_wait() {
1617 Ok(Some(status)) => {
1618 tracing::info!(
1619 exit_status = ?status,
1620 elapsed_ms = start.elapsed().as_millis(),
1621 "Pool server exited gracefully"
1622 );
1623 break;
1624 }
1625 Ok(None) => {
1626 if start.elapsed() >= graceful_wait {
1627 tracing::warn!(
1628 "Pool server did not exit within graceful timeout, force killing"
1629 );
1630 let _ = child.kill().await;
1631 break;
1632 }
1633 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1634 }
1635 Err(e) => {
1636 tracing::warn!(error = %e, "Error checking pool server status");
1637 let _ = child.kill().await;
1638 break;
1639 }
1640 }
1641 }
1642 }
1643 *process_guard = None;
1644
1645 let _ = std::fs::remove_file(&self.socket_path);
1646
1647 self.initialized.store(false, Ordering::Release);
1648 tracing::info!("Plugin pool server shutdown complete");
1649 Ok(())
1650 }
1651
1652 async fn send_shutdown_request(
1654 &self,
1655 timeout: std::time::Duration,
1656 ) -> Result<PoolResponse, PluginError> {
1657 let request = PoolRequest::Shutdown {
1658 task_id: Uuid::new_v4().to_string(),
1659 };
1660
1661 let connection_id = self.connection_pool.next_connection_id();
1664 let mut conn = match PoolConnection::new(&self.socket_path, connection_id).await {
1665 Ok(c) => c,
1666 Err(e) => {
1667 return Err(PluginError::PluginExecutionError(format!(
1668 "Failed to connect for shutdown: {e}"
1669 )));
1670 }
1671 };
1672
1673 conn.send_request_with_timeout(&request, timeout.as_secs())
1674 .await
1675 }
1676}
1677
1678impl Drop for PoolManager {
1679 fn drop(&mut self) {
1680 let _ = std::fs::remove_file(&self.socket_path);
1681 }
1682}
1683
1684static POOL_MANAGER: std::sync::OnceLock<Arc<PoolManager>> = std::sync::OnceLock::new();
1686
1687pub fn get_pool_manager() -> Arc<PoolManager> {
1689 POOL_MANAGER
1690 .get_or_init(|| Arc::new(PoolManager::new()))
1691 .clone()
1692}
1693
1694#[cfg(test)]
1695mod tests {
1696 use super::*;
1697 use crate::services::plugins::script_executor::LogLevel;
1698
1699 #[test]
1700 fn test_is_dead_server_error_detects_dead_server() {
1701 let err = PluginError::PluginExecutionError("Connection refused".to_string());
1702 assert!(PoolManager::is_dead_server_error(&err));
1703
1704 let err = PluginError::PluginExecutionError("Broken pipe".to_string());
1705 assert!(PoolManager::is_dead_server_error(&err));
1706 }
1707
1708 #[test]
1709 fn test_is_dead_server_error_excludes_plugin_timeouts() {
1710 let err = PluginError::PluginExecutionError("Plugin timed out after 30s".to_string());
1711 assert!(!PoolManager::is_dead_server_error(&err));
1712
1713 let err = PluginError::PluginExecutionError("Handler timed out".to_string());
1714 assert!(!PoolManager::is_dead_server_error(&err));
1715 }
1716
1717 #[test]
1718 fn test_is_dead_server_error_normal_errors() {
1719 let err =
1720 PluginError::PluginExecutionError("TypeError: undefined is not a function".to_string());
1721 assert!(!PoolManager::is_dead_server_error(&err));
1722
1723 let err = PluginError::PluginExecutionError("Plugin returned invalid JSON".to_string());
1724 assert!(!PoolManager::is_dead_server_error(&err));
1725 }
1726
1727 #[test]
1728 fn test_is_dead_server_error_detects_all_dead_server_indicators() {
1729 let dead_server_errors = vec![
1731 "EOF while parsing JSON response",
1732 "Broken pipe when writing to socket",
1733 "Connection refused: server not running",
1734 "Connection reset by peer",
1735 "Socket not connected",
1736 "Failed to connect to pool server",
1737 "Socket file missing: /tmp/test.sock",
1738 "No such file or directory",
1739 ];
1740
1741 for error_msg in dead_server_errors {
1742 let err = PluginError::PluginExecutionError(error_msg.to_string());
1743 assert!(
1744 PoolManager::is_dead_server_error(&err),
1745 "Expected '{}' to be detected as dead server error",
1746 error_msg
1747 );
1748 }
1749 }
1750
1751 #[test]
1752 fn test_dead_server_indicator_patterns() {
1753 use super::super::health::DeadServerIndicator;
1755
1756 assert!(DeadServerIndicator::from_error_str("eof while parsing").is_some());
1758 assert!(DeadServerIndicator::from_error_str("broken pipe").is_some());
1759 assert!(DeadServerIndicator::from_error_str("connection refused").is_some());
1760 assert!(DeadServerIndicator::from_error_str("connection reset").is_some());
1761 assert!(DeadServerIndicator::from_error_str("not connected").is_some());
1762 assert!(DeadServerIndicator::from_error_str("failed to connect").is_some());
1763 assert!(DeadServerIndicator::from_error_str("socket file missing").is_some());
1764 assert!(DeadServerIndicator::from_error_str("no such file").is_some());
1765 assert!(DeadServerIndicator::from_error_str("connection timed out").is_some());
1766 assert!(DeadServerIndicator::from_error_str("connect timed out").is_some());
1767
1768 assert!(DeadServerIndicator::from_error_str("handler timed out").is_none());
1770 assert!(DeadServerIndicator::from_error_str("validation error").is_none());
1771 assert!(DeadServerIndicator::from_error_str("TypeError: undefined").is_none());
1772 }
1773
1774 #[test]
1775 fn test_is_dead_server_error_excludes_plugin_timeouts_with_connection() {
1776 let plugin_timeout =
1778 PluginError::PluginExecutionError("plugin connection timed out".to_string());
1779 assert!(!PoolManager::is_dead_server_error(&plugin_timeout));
1781 }
1782
1783 #[test]
1784 fn test_is_dead_server_error_case_insensitive() {
1785 let err = PluginError::PluginExecutionError("CONNECTION REFUSED".to_string());
1787 assert!(PoolManager::is_dead_server_error(&err));
1788
1789 let err = PluginError::PluginExecutionError("BROKEN PIPE".to_string());
1790 assert!(PoolManager::is_dead_server_error(&err));
1791
1792 let err = PluginError::PluginExecutionError("Connection Reset By Peer".to_string());
1793 assert!(PoolManager::is_dead_server_error(&err));
1794 }
1795
1796 #[test]
1797 fn test_is_dead_server_error_handler_timeout_variations() {
1798 let timeout_errors = vec![
1800 "Handler timed out",
1801 "handler timed out after 30000ms",
1802 "Plugin handler timed out",
1803 "plugin timed out",
1804 "Plugin execution timed out after 60s",
1805 ];
1806
1807 for error_msg in timeout_errors {
1808 let err = PluginError::PluginExecutionError(error_msg.to_string());
1809 assert!(
1810 !PoolManager::is_dead_server_error(&err),
1811 "Expected '{}' to NOT be detected as dead server error",
1812 error_msg
1813 );
1814 }
1815 }
1816
1817 #[test]
1818 fn test_is_dead_server_error_business_errors_not_detected() {
1819 let business_errors = vec![
1821 "ReferenceError: x is not defined",
1822 "SyntaxError: Unexpected token",
1823 "TypeError: Cannot read property 'foo' of undefined",
1824 "Plugin returned status 400: Bad Request",
1825 "Validation error: missing required field",
1826 "Authorization failed",
1827 "Rate limit exceeded",
1828 "Plugin threw an error: Invalid input",
1829 ];
1830
1831 for error_msg in business_errors {
1832 let err = PluginError::PluginExecutionError(error_msg.to_string());
1833 assert!(
1834 !PoolManager::is_dead_server_error(&err),
1835 "Expected '{}' to NOT be detected as dead server error",
1836 error_msg
1837 );
1838 }
1839 }
1840
1841 #[test]
1842 fn test_is_dead_server_error_with_handler_error_type() {
1843 let handler_payload = PluginHandlerPayload {
1845 message: "Connection refused".to_string(),
1846 status: 500,
1847 code: None,
1848 details: None,
1849 logs: None,
1850 traces: None,
1851 };
1852 let err = PluginError::HandlerError(Box::new(handler_payload));
1853 assert!(PoolManager::is_dead_server_error(&err));
1856 }
1857
1858 #[test]
1863 fn test_heap_calculation_base_case() {
1864 let base = PoolManager::BASE_HEAP_MB;
1866 let divisor = PoolManager::CONCURRENCY_DIVISOR;
1867 let increment = PoolManager::HEAP_INCREMENT_PER_DIVISOR_MB;
1868
1869 let concurrency = 100;
1872 let expected = base + ((concurrency / divisor) * increment);
1873 assert_eq!(expected, 832);
1874 }
1875
1876 #[test]
1877 fn test_heap_calculation_minimum() {
1878 let base = PoolManager::BASE_HEAP_MB;
1880 let divisor = PoolManager::CONCURRENCY_DIVISOR;
1881 let increment = PoolManager::HEAP_INCREMENT_PER_DIVISOR_MB;
1882
1883 let concurrency = 5;
1886 let expected = base + ((concurrency / divisor) * increment);
1887 assert_eq!(expected, 512);
1888 }
1889
1890 #[test]
1891 fn test_heap_calculation_high_concurrency() {
1892 let base = PoolManager::BASE_HEAP_MB;
1894 let divisor = PoolManager::CONCURRENCY_DIVISOR;
1895 let increment = PoolManager::HEAP_INCREMENT_PER_DIVISOR_MB;
1896
1897 let concurrency = 500;
1900 let expected = base + ((concurrency / divisor) * increment);
1901 assert_eq!(expected, 2112);
1902 }
1903
1904 #[test]
1905 fn test_heap_calculation_max_cap() {
1906 let max_heap = PoolManager::MAX_HEAP_MB;
1908 assert_eq!(max_heap, 8192);
1909
1910 let base = PoolManager::BASE_HEAP_MB;
1914 let divisor = PoolManager::CONCURRENCY_DIVISOR;
1915 let increment = PoolManager::HEAP_INCREMENT_PER_DIVISOR_MB;
1916
1917 let concurrency = 3000;
1918 let calculated = base + ((concurrency / divisor) * increment);
1919 let capped = calculated.min(max_heap);
1920
1921 assert_eq!(calculated, 10112);
1922 assert_eq!(capped, 8192);
1923 }
1924
1925 #[test]
1930 fn test_pool_manager_constants() {
1931 assert_eq!(PoolManager::BASE_HEAP_MB, 512);
1933 assert_eq!(PoolManager::CONCURRENCY_DIVISOR, 10);
1934 assert_eq!(PoolManager::HEAP_INCREMENT_PER_DIVISOR_MB, 32);
1935 assert_eq!(PoolManager::MAX_HEAP_MB, 8192);
1936 }
1937
1938 #[test]
1943 fn test_calculate_heap_size_low_concurrency() {
1944 assert_eq!(PoolManager::calculate_heap_size(5), 512);
1946 assert_eq!(PoolManager::calculate_heap_size(9), 512);
1947 }
1948
1949 #[test]
1950 fn test_calculate_heap_size_medium_concurrency() {
1951 assert_eq!(PoolManager::calculate_heap_size(10), 544);
1953 assert_eq!(PoolManager::calculate_heap_size(50), 672);
1955 assert_eq!(PoolManager::calculate_heap_size(100), 832);
1957 }
1958
1959 #[test]
1960 fn test_calculate_heap_size_high_concurrency() {
1961 assert_eq!(PoolManager::calculate_heap_size(500), 2112);
1963 assert_eq!(PoolManager::calculate_heap_size(1000), 3712);
1965 }
1966
1967 #[test]
1968 fn test_calculate_heap_size_capped_at_max() {
1969 assert_eq!(PoolManager::calculate_heap_size(3000), 8192);
1971 assert_eq!(PoolManager::calculate_heap_size(10000), 8192);
1973 }
1974
1975 #[test]
1976 fn test_calculate_heap_size_zero_concurrency() {
1977 assert_eq!(PoolManager::calculate_heap_size(0), 512);
1979 }
1980
1981 #[test]
1986 fn test_format_return_value_none() {
1987 assert_eq!(PoolManager::format_return_value(None), "");
1988 }
1989
1990 #[test]
1991 fn test_format_return_value_string() {
1992 let value = Some(serde_json::json!("hello world"));
1993 assert_eq!(PoolManager::format_return_value(value), "hello world");
1994 }
1995
1996 #[test]
1997 fn test_format_return_value_empty_string() {
1998 let value = Some(serde_json::json!(""));
1999 assert_eq!(PoolManager::format_return_value(value), "");
2000 }
2001
2002 #[test]
2003 fn test_format_return_value_object() {
2004 let value = Some(serde_json::json!({"key": "value", "num": 42}));
2005 let result = PoolManager::format_return_value(value);
2006 assert!(result.contains("key"));
2008 assert!(result.contains("value"));
2009 assert!(result.contains("42"));
2010 }
2011
2012 #[test]
2013 fn test_format_return_value_array() {
2014 let value = Some(serde_json::json!([1, 2, 3]));
2015 assert_eq!(PoolManager::format_return_value(value), "[1,2,3]");
2016 }
2017
2018 #[test]
2019 fn test_format_return_value_number() {
2020 let value = Some(serde_json::json!(42));
2021 assert_eq!(PoolManager::format_return_value(value), "42");
2022 }
2023
2024 #[test]
2025 fn test_format_return_value_boolean() {
2026 assert_eq!(
2027 PoolManager::format_return_value(Some(serde_json::json!(true))),
2028 "true"
2029 );
2030 assert_eq!(
2031 PoolManager::format_return_value(Some(serde_json::json!(false))),
2032 "false"
2033 );
2034 }
2035
2036 #[test]
2037 fn test_format_return_value_null() {
2038 let value = Some(serde_json::json!(null));
2039 assert_eq!(PoolManager::format_return_value(value), "null");
2040 }
2041
2042 #[test]
2047 fn test_parse_pool_response_success_with_string_result() {
2048 use super::super::protocol::{PoolLogEntry, PoolResponse};
2049
2050 let response = PoolResponse {
2051 task_id: "test-123".to_string(),
2052 success: true,
2053 result: Some(serde_json::json!("success result")),
2054 error: None,
2055 logs: Some(vec![PoolLogEntry {
2056 level: "info".to_string(),
2057 message: "test log".to_string(),
2058 }]),
2059 };
2060
2061 let result = PoolManager::parse_pool_response(response).unwrap();
2062 assert_eq!(result.return_value, "success result");
2063 assert!(result.error.is_empty());
2064 assert_eq!(result.logs.len(), 1);
2065 assert_eq!(result.logs[0].level, LogLevel::Info);
2066 assert_eq!(result.logs[0].message, "test log");
2067 }
2068
2069 #[test]
2070 fn test_parse_pool_response_success_with_object_result() {
2071 use super::super::protocol::PoolResponse;
2072
2073 let response = PoolResponse {
2074 task_id: "test-456".to_string(),
2075 success: true,
2076 result: Some(serde_json::json!({"data": "value"})),
2077 error: None,
2078 logs: None,
2079 };
2080
2081 let result = PoolManager::parse_pool_response(response).unwrap();
2082 assert!(result.return_value.contains("data"));
2083 assert!(result.return_value.contains("value"));
2084 assert!(result.logs.is_empty());
2085 }
2086
2087 #[test]
2088 fn test_parse_pool_response_success_no_result() {
2089 use super::super::protocol::PoolResponse;
2090
2091 let response = PoolResponse {
2092 task_id: "test-789".to_string(),
2093 success: true,
2094 result: None,
2095 error: None,
2096 logs: None,
2097 };
2098
2099 let result = PoolManager::parse_pool_response(response).unwrap();
2100 assert_eq!(result.return_value, "");
2101 assert!(result.error.is_empty());
2102 }
2103
2104 #[test]
2105 fn test_parse_pool_response_failure_with_error() {
2106 use super::super::protocol::{PoolError, PoolResponse};
2107
2108 let response = PoolResponse {
2109 task_id: "test-error".to_string(),
2110 success: false,
2111 result: None,
2112 error: Some(PoolError {
2113 message: "Something went wrong".to_string(),
2114 code: Some("ERR_001".to_string()),
2115 status: Some(400),
2116 details: Some(serde_json::json!({"field": "name"})),
2117 }),
2118 logs: None,
2119 };
2120
2121 let err = PoolManager::parse_pool_response(response).unwrap_err();
2122 match err {
2123 PluginError::HandlerError(payload) => {
2124 assert_eq!(payload.message, "Something went wrong");
2125 assert_eq!(payload.status, 400);
2126 assert_eq!(payload.code, Some("ERR_001".to_string()));
2127 }
2128 _ => panic!("Expected HandlerError"),
2129 }
2130 }
2131
2132 #[test]
2133 fn test_parse_pool_response_failure_no_error_details() {
2134 use super::super::protocol::PoolResponse;
2135
2136 let response = PoolResponse {
2137 task_id: "test-unknown".to_string(),
2138 success: false,
2139 result: None,
2140 error: None,
2141 logs: None,
2142 };
2143
2144 let err = PoolManager::parse_pool_response(response).unwrap_err();
2145 match err {
2146 PluginError::HandlerError(payload) => {
2147 assert_eq!(payload.message, "Unknown error");
2148 assert_eq!(payload.status, 500);
2149 }
2150 _ => panic!("Expected HandlerError"),
2151 }
2152 }
2153
2154 #[test]
2155 fn test_parse_pool_response_failure_preserves_logs() {
2156 use super::super::protocol::{PoolError, PoolLogEntry, PoolResponse};
2157
2158 let response = PoolResponse {
2159 task_id: "test-logs".to_string(),
2160 success: false,
2161 result: None,
2162 error: Some(PoolError {
2163 message: "Error with logs".to_string(),
2164 code: None,
2165 status: None,
2166 details: None,
2167 }),
2168 logs: Some(vec![
2169 PoolLogEntry {
2170 level: "debug".to_string(),
2171 message: "debug message".to_string(),
2172 },
2173 PoolLogEntry {
2174 level: "error".to_string(),
2175 message: "error message".to_string(),
2176 },
2177 ]),
2178 };
2179
2180 let err = PoolManager::parse_pool_response(response).unwrap_err();
2181 match err {
2182 PluginError::HandlerError(payload) => {
2183 let logs = payload.logs.unwrap();
2184 assert_eq!(logs.len(), 2);
2185 assert_eq!(logs[0].level, LogLevel::Debug);
2186 assert_eq!(logs[1].level, LogLevel::Error);
2187 }
2188 _ => panic!("Expected HandlerError"),
2189 }
2190 }
2191
2192 #[test]
2197 fn test_parse_success_response_complete() {
2198 use super::super::protocol::{PoolLogEntry, PoolResponse};
2199
2200 let response = PoolResponse {
2201 task_id: "task-1".to_string(),
2202 success: true,
2203 result: Some(serde_json::json!("completed")),
2204 error: None,
2205 logs: Some(vec![
2206 PoolLogEntry {
2207 level: "log".to_string(),
2208 message: "starting".to_string(),
2209 },
2210 PoolLogEntry {
2211 level: "result".to_string(),
2212 message: "finished".to_string(),
2213 },
2214 ]),
2215 };
2216
2217 let result = PoolManager::parse_success_response(response);
2218 assert_eq!(result.return_value, "completed");
2219 assert!(result.error.is_empty());
2220 assert_eq!(result.logs.len(), 2);
2221 assert_eq!(result.logs[0].level, LogLevel::Log);
2222 assert_eq!(result.logs[1].level, LogLevel::Result);
2223 }
2224
2225 #[test]
2230 fn test_parse_error_response_with_all_fields() {
2231 use super::super::protocol::{PoolError, PoolLogEntry, PoolResponse};
2232
2233 let response = PoolResponse {
2234 task_id: "err-task".to_string(),
2235 success: false,
2236 result: None,
2237 error: Some(PoolError {
2238 message: "Validation failed".to_string(),
2239 code: Some("VALIDATION_ERROR".to_string()),
2240 status: Some(422),
2241 details: Some(serde_json::json!({"fields": ["email"]})),
2242 }),
2243 logs: Some(vec![PoolLogEntry {
2244 level: "warn".to_string(),
2245 message: "validation warning".to_string(),
2246 }]),
2247 };
2248
2249 let err = PoolManager::parse_error_response(response);
2250 match err {
2251 PluginError::HandlerError(payload) => {
2252 assert_eq!(payload.message, "Validation failed");
2253 assert_eq!(payload.status, 422);
2254 assert_eq!(payload.code, Some("VALIDATION_ERROR".to_string()));
2255 assert!(payload.details.is_some());
2256 let logs = payload.logs.unwrap();
2257 assert_eq!(logs.len(), 1);
2258 assert_eq!(logs[0].level, LogLevel::Warn);
2259 }
2260 _ => panic!("Expected HandlerError"),
2261 }
2262 }
2263
2264 #[test]
2269 fn test_parse_health_result_complete() {
2270 let json = serde_json::json!({
2271 "status": "healthy",
2272 "uptime": 123456,
2273 "memory": {
2274 "heapUsed": 50000000,
2275 "heapTotal": 100000000
2276 },
2277 "pool": {
2278 "completed": 1000,
2279 "queued": 5
2280 },
2281 "execution": {
2282 "successRate": 0.99
2283 }
2284 });
2285
2286 let result = PoolManager::parse_health_result(&json);
2287
2288 assert_eq!(result.status, "healthy");
2289 assert_eq!(result.uptime_ms, Some(123456));
2290 assert_eq!(result.memory, Some(50000000));
2291 assert_eq!(result.pool_completed, Some(1000));
2292 assert_eq!(result.pool_queued, Some(5));
2293 assert!((result.success_rate.unwrap() - 0.99).abs() < 0.001);
2294 }
2295
2296 #[test]
2297 fn test_parse_health_result_minimal() {
2298 let json = serde_json::json!({});
2299
2300 let result = PoolManager::parse_health_result(&json);
2301
2302 assert_eq!(result.status, "unknown");
2303 assert_eq!(result.uptime_ms, None);
2304 assert_eq!(result.memory, None);
2305 assert_eq!(result.pool_completed, None);
2306 assert_eq!(result.pool_queued, None);
2307 assert_eq!(result.success_rate, None);
2308 }
2309
2310 #[test]
2311 fn test_parse_health_result_partial() {
2312 let json = serde_json::json!({
2313 "status": "degraded",
2314 "uptime": 5000,
2315 "memory": {
2316 "heapTotal": 100000000
2317 }
2319 });
2320
2321 let result = PoolManager::parse_health_result(&json);
2322
2323 assert_eq!(result.status, "degraded");
2324 assert_eq!(result.uptime_ms, Some(5000));
2325 assert_eq!(result.memory, None); assert_eq!(result.pool_completed, None);
2327 assert_eq!(result.pool_queued, None);
2328 assert_eq!(result.success_rate, None);
2329 }
2330
2331 #[test]
2332 fn test_parse_health_result_wrong_types() {
2333 let json = serde_json::json!({
2334 "status": 123, "uptime": "not a number", "memory": "invalid" });
2338
2339 let result = PoolManager::parse_health_result(&json);
2340
2341 assert_eq!(result.status, "unknown"); assert_eq!(result.uptime_ms, None);
2343 assert_eq!(result.memory, None);
2344 assert_eq!(result.pool_completed, None);
2345 assert_eq!(result.pool_queued, None);
2346 assert_eq!(result.success_rate, None);
2347 }
2348
2349 #[test]
2350 fn test_parse_health_result_nested_values() {
2351 let json = serde_json::json!({
2352 "pool": {
2353 "completed": 0,
2354 "queued": 0
2355 },
2356 "execution": {
2357 "successRate": 1.0
2358 }
2359 });
2360
2361 let result = PoolManager::parse_health_result(&json);
2362
2363 assert_eq!(result.status, "unknown");
2364 assert_eq!(result.uptime_ms, None);
2365 assert_eq!(result.memory, None);
2366 assert_eq!(result.pool_completed, Some(0));
2367 assert_eq!(result.pool_queued, Some(0));
2368 assert!((result.success_rate.unwrap() - 1.0).abs() < 0.001);
2369 }
2370
2371 #[tokio::test]
2376 async fn test_pool_manager_new_creates_unique_socket_path() {
2377 let manager1 = PoolManager::new();
2379 let manager2 = PoolManager::new();
2380
2381 assert_ne!(manager1.socket_path, manager2.socket_path);
2382 assert!(manager1
2383 .socket_path
2384 .starts_with("/tmp/relayer-plugin-pool-"));
2385 assert!(manager2
2386 .socket_path
2387 .starts_with("/tmp/relayer-plugin-pool-"));
2388 }
2389
2390 #[tokio::test]
2391 async fn test_pool_manager_with_custom_socket_path() {
2392 let custom_path = "/tmp/custom-test-pool.sock".to_string();
2393 let manager = PoolManager::with_socket_path(custom_path.clone());
2394
2395 assert_eq!(manager.socket_path, custom_path);
2396 }
2397
2398 #[tokio::test]
2399 async fn test_pool_manager_default_trait() {
2400 let manager = PoolManager::default();
2402 assert!(manager.socket_path.starts_with("/tmp/relayer-plugin-pool-"));
2403 }
2404
2405 #[tokio::test]
2410 async fn test_circuit_state_initial() {
2411 let manager = PoolManager::new();
2412
2413 assert_eq!(manager.circuit_state(), CircuitState::Closed);
2415 }
2416
2417 #[tokio::test]
2418 async fn test_avg_response_time_initial() {
2419 let manager = PoolManager::new();
2420
2421 assert_eq!(manager.avg_response_time_ms(), 0);
2423 }
2424
2425 #[tokio::test]
2430 async fn test_recovery_mode_initial() {
2431 let manager = PoolManager::new();
2432
2433 assert!(!manager.is_recovering());
2435 assert_eq!(manager.recovery_allowance_percent(), 0);
2436 }
2437
2438 #[test]
2443 fn test_script_result_success_construction() {
2444 let result = ScriptResult {
2445 logs: vec![LogEntry {
2446 level: LogLevel::Info,
2447 message: "Test log".to_string(),
2448 }],
2449 error: String::new(),
2450 return_value: r#"{"success": true}"#.to_string(),
2451 trace: vec![],
2452 };
2453
2454 assert!(result.error.is_empty());
2455 assert_eq!(result.logs.len(), 1);
2456 assert_eq!(result.logs[0].level, LogLevel::Info);
2457 }
2458
2459 #[test]
2460 fn test_script_result_with_multiple_logs() {
2461 let result = ScriptResult {
2462 logs: vec![
2463 LogEntry {
2464 level: LogLevel::Log,
2465 message: "Starting execution".to_string(),
2466 },
2467 LogEntry {
2468 level: LogLevel::Debug,
2469 message: "Processing data".to_string(),
2470 },
2471 LogEntry {
2472 level: LogLevel::Warn,
2473 message: "Deprecated API used".to_string(),
2474 },
2475 LogEntry {
2476 level: LogLevel::Error,
2477 message: "Non-fatal error".to_string(),
2478 },
2479 ],
2480 error: String::new(),
2481 return_value: "done".to_string(),
2482 trace: vec![],
2483 };
2484
2485 assert_eq!(result.logs.len(), 4);
2486 assert_eq!(result.logs[0].level, LogLevel::Log);
2487 assert_eq!(result.logs[1].level, LogLevel::Debug);
2488 assert_eq!(result.logs[2].level, LogLevel::Warn);
2489 assert_eq!(result.logs[3].level, LogLevel::Error);
2490 }
2491
2492 #[test]
2497 fn test_queued_request_required_fields() {
2498 let (tx, _rx) = oneshot::channel();
2499
2500 let request = QueuedRequest {
2501 plugin_id: "test-plugin".to_string(),
2502 compiled_code: Some("module.exports.handler = () => {}".to_string()),
2503 plugin_path: None,
2504 params: serde_json::json!({"key": "value"}),
2505 headers: None,
2506 socket_path: "/tmp/test.sock".to_string(),
2507 http_request_id: Some("req-123".to_string()),
2508 timeout_secs: Some(30),
2509 route: Some("/api/test".to_string()),
2510 config: Some(serde_json::json!({"setting": true})),
2511 method: Some("POST".to_string()),
2512 query: Some(serde_json::json!({"page": "1"})),
2513 response_tx: tx,
2514 };
2515
2516 assert_eq!(request.plugin_id, "test-plugin");
2517 assert!(request.compiled_code.is_some());
2518 assert!(request.plugin_path.is_none());
2519 assert_eq!(request.timeout_secs, Some(30));
2520 }
2521
2522 #[test]
2523 fn test_queued_request_minimal() {
2524 let (tx, _rx) = oneshot::channel();
2525
2526 let request = QueuedRequest {
2527 plugin_id: "minimal".to_string(),
2528 compiled_code: None,
2529 plugin_path: Some("/path/to/plugin.ts".to_string()),
2530 params: serde_json::json!(null),
2531 headers: None,
2532 socket_path: "/tmp/min.sock".to_string(),
2533 http_request_id: None,
2534 timeout_secs: None,
2535 route: None,
2536 config: None,
2537 method: None,
2538 query: None,
2539 response_tx: tx,
2540 };
2541
2542 assert_eq!(request.plugin_id, "minimal");
2543 assert!(request.compiled_code.is_none());
2544 assert!(request.plugin_path.is_some());
2545 }
2546
2547 #[test]
2552 fn test_plugin_error_socket_error() {
2553 let err = PluginError::SocketError("Connection failed".to_string());
2554 let display = format!("{}", err);
2555 assert!(display.contains("Socket error"));
2556 assert!(display.contains("Connection failed"));
2557 }
2558
2559 #[test]
2560 fn test_plugin_error_plugin_execution_error() {
2561 let err = PluginError::PluginExecutionError("Execution failed".to_string());
2562 let display = format!("{}", err);
2563 assert!(display.contains("Execution failed"));
2564 }
2565
2566 #[test]
2567 fn test_plugin_error_handler_error() {
2568 let payload = PluginHandlerPayload {
2569 message: "Handler error".to_string(),
2570 status: 400,
2571 code: Some("BAD_REQUEST".to_string()),
2572 details: Some(serde_json::json!({"field": "name"})),
2573 logs: None,
2574 traces: None,
2575 };
2576 let err = PluginError::HandlerError(Box::new(payload));
2577
2578 let display = format!("{:?}", err);
2580 assert!(display.contains("HandlerError"));
2581 }
2582
2583 #[test]
2588 fn test_plugin_handler_payload_full() {
2589 let payload = PluginHandlerPayload {
2590 message: "Validation failed".to_string(),
2591 status: 422,
2592 code: Some("VALIDATION_ERROR".to_string()),
2593 details: Some(serde_json::json!({
2594 "errors": [
2595 {"field": "email", "message": "Invalid format"}
2596 ]
2597 })),
2598 logs: Some(vec![LogEntry {
2599 level: LogLevel::Error,
2600 message: "Validation failed for email".to_string(),
2601 }]),
2602 traces: Some(vec![serde_json::json!({"stack": "Error at line 10"})]),
2603 };
2604
2605 assert_eq!(payload.status, 422);
2606 assert_eq!(payload.code, Some("VALIDATION_ERROR".to_string()));
2607 assert!(payload.logs.is_some());
2608 assert!(payload.traces.is_some());
2609 }
2610
2611 #[test]
2612 fn test_plugin_handler_payload_minimal() {
2613 let payload = PluginHandlerPayload {
2614 message: "Error".to_string(),
2615 status: 500,
2616 code: None,
2617 details: None,
2618 logs: None,
2619 traces: None,
2620 };
2621
2622 assert_eq!(payload.status, 500);
2623 assert!(payload.code.is_none());
2624 assert!(payload.details.is_none());
2625 }
2626
2627 #[tokio::test]
2632 async fn test_pool_manager_not_initialized_health_check() {
2633 let manager = PoolManager::with_socket_path("/tmp/test-health.sock".to_string());
2634
2635 let health = manager.health_check().await.unwrap();
2637
2638 assert!(!health.healthy);
2639 assert_eq!(health.status, "not_initialized");
2640 assert!(health.uptime_ms.is_none());
2641 assert!(health.memory.is_none());
2642 }
2643
2644 #[tokio::test]
2645 async fn test_pool_manager_circuit_info_in_health_status() {
2646 let manager = PoolManager::with_socket_path("/tmp/test-circuit.sock".to_string());
2647
2648 let health = manager.health_check().await.unwrap();
2649
2650 assert!(health.circuit_state.is_some());
2652 assert_eq!(health.circuit_state, Some("closed".to_string()));
2653 assert!(health.avg_response_time_ms.is_some());
2654 assert!(health.recovering.is_some());
2655 assert!(health.recovery_percent.is_some());
2656 }
2657
2658 #[tokio::test]
2659 async fn test_invalidate_plugin_when_not_initialized() {
2660 let manager = PoolManager::with_socket_path("/tmp/test-invalidate.sock".to_string());
2661
2662 let result = manager.invalidate_plugin("test-plugin".to_string()).await;
2664
2665 assert!(result.is_ok());
2667 }
2668
2669 #[tokio::test]
2670 async fn test_shutdown_when_not_initialized() {
2671 let manager = PoolManager::with_socket_path("/tmp/test-shutdown.sock".to_string());
2672
2673 let result = manager.shutdown().await;
2675
2676 assert!(result.is_ok());
2678 }
2679
2680 #[test]
2685 fn test_parsed_health_result_default() {
2686 let result = ParsedHealthResult::default();
2687 assert_eq!(result.status, "");
2688 assert_eq!(result.uptime_ms, None);
2689 assert_eq!(result.memory, None);
2690 assert_eq!(result.pool_completed, None);
2691 assert_eq!(result.pool_queued, None);
2692 assert_eq!(result.success_rate, None);
2693 }
2694
2695 #[test]
2696 fn test_parsed_health_result_equality() {
2697 let result1 = ParsedHealthResult {
2698 status: "ok".to_string(),
2699 uptime_ms: Some(1000),
2700 memory: Some(500000),
2701 pool_completed: Some(50),
2702 pool_queued: Some(2),
2703 success_rate: Some(1.0),
2704 };
2705 let result2 = ParsedHealthResult {
2706 status: "ok".to_string(),
2707 uptime_ms: Some(1000),
2708 memory: Some(500000),
2709 pool_completed: Some(50),
2710 pool_queued: Some(2),
2711 success_rate: Some(1.0),
2712 };
2713 assert_eq!(result1, result2);
2714 }
2715
2716 #[test]
2717 fn test_format_return_value_nested_object() {
2718 let value = Some(serde_json::json!({
2719 "user": { "name": "John", "age": 30 }
2720 }));
2721 let result = PoolManager::format_return_value(value);
2722 assert!(result.contains("John"));
2723 assert!(result.contains("30"));
2724 }
2725
2726 #[test]
2727 fn test_format_return_value_empty_collections() {
2728 let value = Some(serde_json::json!({}));
2729 assert_eq!(PoolManager::format_return_value(value), "{}");
2730 let value = Some(serde_json::json!([]));
2731 assert_eq!(PoolManager::format_return_value(value), "[]");
2732 }
2733
2734 #[test]
2735 fn test_parse_health_result_zero_values() {
2736 let json = serde_json::json!({
2737 "status": "starting",
2738 "uptime": 0,
2739 "memory": { "heapUsed": 0 },
2740 "pool": { "completed": 0, "queued": 0 },
2741 "execution": { "successRate": 0.0 }
2742 });
2743 let result = PoolManager::parse_health_result(&json);
2744 assert_eq!(result.status, "starting");
2745 assert_eq!(result.uptime_ms, Some(0));
2746 assert_eq!(result.memory, Some(0));
2747 assert_eq!(result.pool_completed, Some(0));
2748 assert_eq!(result.pool_queued, Some(0));
2749 assert_eq!(result.success_rate, Some(0.0));
2750 }
2751
2752 #[test]
2753 fn test_calculate_heap_size_precise_calculations() {
2754 assert_eq!(PoolManager::calculate_heap_size(0), 512);
2755 assert_eq!(PoolManager::calculate_heap_size(1), 512);
2756 assert_eq!(PoolManager::calculate_heap_size(10), 544);
2757 assert_eq!(PoolManager::calculate_heap_size(20), 576);
2758 assert_eq!(PoolManager::calculate_heap_size(100), 832);
2759 assert_eq!(PoolManager::calculate_heap_size(200), 1152);
2760 }
2761
2762 #[tokio::test]
2763 async fn test_pool_manager_health_check_flag_initial() {
2764 let manager = PoolManager::new();
2765 assert!(!manager.health_check_needed.load(Ordering::Relaxed));
2766 }
2767
2768 #[tokio::test]
2769 async fn test_pool_manager_consecutive_failures_initial() {
2770 let manager = PoolManager::new();
2771 assert_eq!(manager.consecutive_failures.load(Ordering::Relaxed), 0);
2772 }
2773
2774 #[tokio::test]
2775 async fn test_recovery_allowance_bounds() {
2776 let manager = PoolManager::new();
2777 manager.recovery_allowance.store(0, Ordering::Relaxed);
2778 assert_eq!(manager.recovery_allowance_percent(), 0);
2779 manager.recovery_allowance.store(50, Ordering::Relaxed);
2780 assert_eq!(manager.recovery_allowance_percent(), 50);
2781 manager.recovery_allowance.store(100, Ordering::Relaxed);
2782 assert_eq!(manager.recovery_allowance_percent(), 100);
2783 }
2784
2785 #[tokio::test]
2786 async fn test_is_initialized_changes_with_state() {
2787 let manager = PoolManager::with_socket_path("/tmp/init-test-123.sock".to_string());
2788 assert!(!manager.is_initialized().await);
2789 manager.initialized.store(true, Ordering::Release);
2790 assert!(manager.is_initialized().await);
2791 manager.initialized.store(false, Ordering::Release);
2792 assert!(!manager.is_initialized().await);
2793 }
2794
2795 #[test]
2800 fn test_is_dead_server_error_with_script_timeout() {
2801 let err = PluginError::ScriptTimeout(30);
2803 assert!(!PoolManager::is_dead_server_error(&err));
2804 }
2805
2806 #[test]
2807 fn test_is_dead_server_error_with_plugin_error() {
2808 let err = PluginError::PluginError("some plugin error".to_string());
2809 assert!(!PoolManager::is_dead_server_error(&err));
2810 }
2811
2812 #[test]
2813 fn test_is_dead_server_error_with_connection_timeout_in_plugin_error() {
2814 let err = PluginError::PluginExecutionError("connection timed out".to_string());
2819 assert!(!PoolManager::is_dead_server_error(&err));
2822
2823 let err = PluginError::SocketError("connect timed out".to_string());
2826 assert!(PoolManager::is_dead_server_error(&err));
2827 }
2828
2829 #[test]
2830 fn test_parse_pool_response_success_with_logs_various_levels() {
2831 use super::super::protocol::{PoolLogEntry, PoolResponse};
2832
2833 let response = PoolResponse {
2834 task_id: "test-levels".to_string(),
2835 success: true,
2836 result: Some(serde_json::json!("ok")),
2837 error: None,
2838 logs: Some(vec![
2839 PoolLogEntry {
2840 level: "log".to_string(),
2841 message: "log level".to_string(),
2842 },
2843 PoolLogEntry {
2844 level: "debug".to_string(),
2845 message: "debug level".to_string(),
2846 },
2847 PoolLogEntry {
2848 level: "info".to_string(),
2849 message: "info level".to_string(),
2850 },
2851 PoolLogEntry {
2852 level: "warn".to_string(),
2853 message: "warn level".to_string(),
2854 },
2855 PoolLogEntry {
2856 level: "error".to_string(),
2857 message: "error level".to_string(),
2858 },
2859 PoolLogEntry {
2860 level: "result".to_string(),
2861 message: "result level".to_string(),
2862 },
2863 ]),
2864 };
2865
2866 let result = PoolManager::parse_pool_response(response).unwrap();
2867 assert_eq!(result.logs.len(), 6);
2868 assert_eq!(result.logs[0].level, LogLevel::Log);
2869 assert_eq!(result.logs[1].level, LogLevel::Debug);
2870 assert_eq!(result.logs[2].level, LogLevel::Info);
2871 assert_eq!(result.logs[3].level, LogLevel::Warn);
2872 assert_eq!(result.logs[4].level, LogLevel::Error);
2873 assert_eq!(result.logs[5].level, LogLevel::Result);
2874 }
2875
2876 #[test]
2877 fn test_parse_error_response_defaults() {
2878 use super::super::protocol::PoolResponse;
2879
2880 let response = PoolResponse {
2882 task_id: "no-error".to_string(),
2883 success: false,
2884 result: None,
2885 error: None,
2886 logs: None,
2887 };
2888
2889 let err = PoolManager::parse_error_response(response);
2890 match err {
2891 PluginError::HandlerError(payload) => {
2892 assert_eq!(payload.message, "Unknown error");
2893 assert_eq!(payload.status, 500);
2894 assert!(payload.code.is_none());
2895 assert!(payload.details.is_none());
2896 }
2897 _ => panic!("Expected HandlerError"),
2898 }
2899 }
2900
2901 #[test]
2902 fn test_format_return_value_float() {
2903 let value = Some(serde_json::json!(3.14159));
2904 let result = PoolManager::format_return_value(value);
2905 assert!(result.contains("3.14159"));
2906 }
2907
2908 #[test]
2909 fn test_format_return_value_large_array() {
2910 let value = Some(serde_json::json!([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]));
2911 let result = PoolManager::format_return_value(value);
2912 assert_eq!(result, "[1,2,3,4,5,6,7,8,9,10]");
2913 }
2914
2915 #[test]
2916 fn test_format_return_value_string_with_special_chars() {
2917 let value = Some(serde_json::json!("hello\nworld\ttab"));
2918 assert_eq!(PoolManager::format_return_value(value), "hello\nworld\ttab");
2919 }
2920
2921 #[test]
2922 fn test_format_return_value_unicode() {
2923 let value = Some(serde_json::json!("こんにちは世界 🌍"));
2924 assert_eq!(PoolManager::format_return_value(value), "こんにちは世界 🌍");
2925 }
2926
2927 #[test]
2928 fn test_parse_health_result_large_values() {
2929 let json = serde_json::json!({
2930 "status": "healthy",
2931 "uptime": 999999999999_u64,
2932 "memory": { "heapUsed": 9999999999_u64 },
2933 "pool": { "completed": 999999999_u64, "queued": 999999_u64 },
2934 "execution": { "successRate": 0.999999 }
2935 });
2936
2937 let result = PoolManager::parse_health_result(&json);
2938 assert_eq!(result.status, "healthy");
2939 assert_eq!(result.uptime_ms, Some(999999999999));
2940 assert_eq!(result.memory, Some(9999999999));
2941 assert_eq!(result.pool_completed, Some(999999999));
2942 assert_eq!(result.pool_queued, Some(999999));
2943 assert!((result.success_rate.unwrap() - 0.999999).abs() < 0.0000001);
2944 }
2945
2946 #[test]
2947 fn test_parse_health_result_negative_values_treated_as_none() {
2948 let json = serde_json::json!({
2950 "status": "error",
2951 "uptime": -1,
2952 "memory": { "heapUsed": -100 }
2953 });
2954
2955 let result = PoolManager::parse_health_result(&json);
2956 assert_eq!(result.status, "error");
2957 assert_eq!(result.uptime_ms, None); assert_eq!(result.memory, None);
2959 }
2960
2961 #[test]
2962 fn test_parsed_health_result_debug() {
2963 let result = ParsedHealthResult {
2964 status: "test".to_string(),
2965 uptime_ms: Some(100),
2966 memory: Some(200),
2967 pool_completed: Some(50),
2968 pool_queued: Some(5),
2969 success_rate: Some(0.95),
2970 };
2971
2972 let debug_str = format!("{:?}", result);
2973 assert!(debug_str.contains("test"));
2974 assert!(debug_str.contains("100"));
2975 assert!(debug_str.contains("200"));
2976 }
2977
2978 #[test]
2979 fn test_calculate_heap_size_boundary_values() {
2980 assert_eq!(PoolManager::calculate_heap_size(9), 512);
2983 assert_eq!(PoolManager::calculate_heap_size(10), 544);
2985
2986 assert_eq!(PoolManager::calculate_heap_size(2400), 8192);
2989 assert_eq!(PoolManager::calculate_heap_size(2399), 8160);
2991 }
2992
2993 #[tokio::test]
2994 async fn test_pool_manager_socket_path_format() {
2995 let manager = PoolManager::new();
2996 assert!(manager.socket_path.starts_with("/tmp/relayer-plugin-pool-"));
2998 assert!(manager.socket_path.ends_with(".sock"));
2999 let uuid_part = manager
3001 .socket_path
3002 .strip_prefix("/tmp/relayer-plugin-pool-")
3003 .unwrap()
3004 .strip_suffix(".sock")
3005 .unwrap();
3006 assert_eq!(uuid_part.len(), 36);
3007 }
3008
3009 #[tokio::test]
3010 async fn test_health_check_socket_missing() {
3011 let manager =
3012 PoolManager::with_socket_path("/tmp/nonexistent-socket-12345.sock".to_string());
3013 manager.initialized.store(true, Ordering::Release);
3015
3016 let health = manager.health_check().await.unwrap();
3017 assert!(!health.healthy);
3018 assert_eq!(health.status, "socket_missing");
3019 }
3020
3021 #[test]
3022 fn test_is_dead_server_error_embedded_patterns() {
3023 let err = PluginError::PluginExecutionError(
3025 "Error: ECONNREFUSED connection refused at 127.0.0.1:3000".to_string(),
3026 );
3027 assert!(PoolManager::is_dead_server_error(&err));
3028
3029 let err = PluginError::PluginExecutionError(
3030 "SocketError: broken pipe while writing to /tmp/socket".to_string(),
3031 );
3032 assert!(PoolManager::is_dead_server_error(&err));
3033
3034 let err = PluginError::PluginExecutionError(
3035 "IO Error: No such file or directory (os error 2)".to_string(),
3036 );
3037 assert!(PoolManager::is_dead_server_error(&err));
3038 }
3039
3040 #[test]
3041 fn test_is_dead_server_error_mixed_case_timeout_patterns() {
3042 let variants = vec![
3044 "HANDLER TIMED OUT",
3045 "Handler Timed Out after 30s",
3046 "the handler timed out waiting for response",
3047 ];
3048
3049 for msg in variants {
3050 let err = PluginError::PluginExecutionError(msg.to_string());
3051 assert!(
3052 !PoolManager::is_dead_server_error(&err),
3053 "Expected '{}' to NOT be dead server error",
3054 msg
3055 );
3056 }
3057 }
3058
3059 #[tokio::test]
3060 async fn test_ensure_started_idempotent() {
3061 let manager = PoolManager::with_socket_path("/tmp/idempotent-test-999.sock".to_string());
3062
3063 assert!(!manager.is_initialized().await);
3065
3066 manager.initialized.store(true, Ordering::Release);
3068
3069 let result = manager.ensure_started().await;
3071 assert!(result.is_ok());
3072 assert!(manager.is_initialized().await);
3073 }
3074
3075 #[test]
3076 fn test_queued_request_with_headers() {
3077 let (tx, _rx) = oneshot::channel();
3078
3079 let mut headers = HashMap::new();
3080 headers.insert(
3081 "Authorization".to_string(),
3082 vec!["Bearer token".to_string()],
3083 );
3084 headers.insert(
3085 "Content-Type".to_string(),
3086 vec!["application/json".to_string()],
3087 );
3088
3089 let request = QueuedRequest {
3090 plugin_id: "headers-test".to_string(),
3091 compiled_code: None,
3092 plugin_path: Some("/path/to/plugin.ts".to_string()),
3093 params: serde_json::json!({}),
3094 headers: Some(headers),
3095 socket_path: "/tmp/test.sock".to_string(),
3096 http_request_id: None,
3097 timeout_secs: None,
3098 route: None,
3099 config: None,
3100 method: None,
3101 query: None,
3102 response_tx: tx,
3103 };
3104
3105 assert!(request.headers.is_some());
3106 let headers = request.headers.unwrap();
3107 assert!(headers.contains_key("Authorization"));
3108 assert!(headers.contains_key("Content-Type"));
3109 }
3110
3111 #[test]
3112 fn test_plugin_error_display_formats() {
3113 let err = PluginError::SocketError("test socket error".to_string());
3115 assert!(format!("{}", err).contains("Socket error"));
3116
3117 let err = PluginError::PluginExecutionError("test execution error".to_string());
3118 assert!(format!("{}", err).contains("test execution error"));
3119
3120 let err = PluginError::ScriptTimeout(60);
3121 assert!(format!("{}", err).contains("60"));
3122
3123 let err = PluginError::PluginError("test plugin error".to_string());
3124 assert!(format!("{}", err).contains("test plugin error"));
3125 }
3126
3127 #[test]
3128 fn test_pool_log_entry_to_log_entry_conversion() {
3129 use super::super::protocol::PoolLogEntry;
3130
3131 let pool_log = PoolLogEntry {
3133 level: "info".to_string(),
3134 message: "test message".to_string(),
3135 };
3136
3137 let log_entry: LogEntry = pool_log.into();
3138 assert_eq!(log_entry.level, LogLevel::Info);
3139 assert_eq!(log_entry.message, "test message");
3140
3141 let pool_log = PoolLogEntry {
3143 level: "unknown_level".to_string(),
3144 message: "unknown level message".to_string(),
3145 };
3146
3147 let log_entry: LogEntry = pool_log.into();
3148 assert_eq!(log_entry.level, LogLevel::Log); }
3150
3151 #[tokio::test]
3152 async fn test_circuit_breaker_records_success() {
3153 let manager = PoolManager::new();
3154
3155 manager.circuit_breaker.record_success(100);
3157 manager.circuit_breaker.record_success(150);
3158 manager.circuit_breaker.record_success(200);
3159
3160 let avg = manager.avg_response_time_ms();
3162 assert!(avg > 0);
3163 }
3164
3165 #[tokio::test]
3166 async fn test_circuit_breaker_state_transitions() {
3167 let manager = PoolManager::new();
3168
3169 assert_eq!(manager.circuit_state(), CircuitState::Closed);
3171
3172 for _ in 0..20 {
3174 manager.circuit_breaker.record_failure();
3175 }
3176
3177 let state = manager.circuit_state();
3179 assert!(matches!(
3180 state,
3181 CircuitState::Closed | CircuitState::HalfOpen | CircuitState::Open
3182 ));
3183 }
3184
3185 #[tokio::test]
3186 async fn test_recovery_mode_activation() {
3187 let manager = PoolManager::new();
3188
3189 manager.recovery_allowance.store(10, Ordering::Relaxed);
3191 manager.recovery_mode.store(true, Ordering::Relaxed);
3192
3193 assert!(manager.is_recovering());
3194 assert_eq!(manager.recovery_allowance_percent(), 10);
3195
3196 manager.recovery_allowance.store(50, Ordering::Relaxed);
3198 assert_eq!(manager.recovery_allowance_percent(), 50);
3199
3200 manager.recovery_mode.store(false, Ordering::Relaxed);
3202 assert!(!manager.is_recovering());
3203 }
3204
3205 #[test]
3206 fn test_parse_pool_response_with_empty_logs() {
3207 use super::super::protocol::PoolResponse;
3208
3209 let response = PoolResponse {
3210 task_id: "empty-logs".to_string(),
3211 success: true,
3212 result: Some(serde_json::json!("done")),
3213 error: None,
3214 logs: Some(vec![]), };
3216
3217 let result = PoolManager::parse_pool_response(response).unwrap();
3218 assert!(result.logs.is_empty());
3219 assert_eq!(result.return_value, "done");
3220 }
3221
3222 #[test]
3223 fn test_handler_payload_with_complex_details() {
3224 let payload = PluginHandlerPayload {
3225 message: "Complex error".to_string(),
3226 status: 400,
3227 code: Some("VALIDATION_ERROR".to_string()),
3228 details: Some(serde_json::json!({
3229 "errors": [
3230 {"field": "email", "code": "invalid", "message": "Invalid email format"},
3231 {"field": "password", "code": "weak", "message": "Password too weak"}
3232 ],
3233 "metadata": {
3234 "requestId": "req-123",
3235 "timestamp": "2024-01-01T00:00:00Z"
3236 }
3237 })),
3238 logs: None,
3239 traces: None,
3240 };
3241
3242 assert_eq!(payload.status, 400);
3243 let details = payload.details.unwrap();
3244 assert!(details.get("errors").is_some());
3245 assert!(details.get("metadata").is_some());
3246 }
3247
3248 #[test]
3249 fn test_health_status_construction_healthy() {
3250 use super::super::health::HealthStatus;
3251
3252 let status = HealthStatus {
3253 healthy: true,
3254 status: "ok".to_string(),
3255 uptime_ms: Some(1000000),
3256 memory: Some(500000000),
3257 pool_completed: Some(1000),
3258 pool_queued: Some(5),
3259 success_rate: Some(0.99),
3260 circuit_state: Some("closed".to_string()),
3261 avg_response_time_ms: Some(50),
3262 recovering: Some(false),
3263 recovery_percent: Some(100),
3264 shared_socket_available_slots: Some(100),
3265 shared_socket_active_connections: Some(10),
3266 shared_socket_registered_executions: Some(5),
3267 connection_pool_available_slots: Some(50),
3268 connection_pool_active_connections: Some(5),
3269 };
3270
3271 assert!(status.healthy);
3272 assert_eq!(status.status, "ok");
3273 assert_eq!(status.uptime_ms, Some(1000000));
3274 assert_eq!(status.circuit_state, Some("closed".to_string()));
3275 }
3276
3277 #[test]
3278 fn test_health_status_construction_unhealthy() {
3279 use super::super::health::HealthStatus;
3280
3281 let status = HealthStatus {
3282 healthy: false,
3283 status: "connection_failed".to_string(),
3284 uptime_ms: None,
3285 memory: None,
3286 pool_completed: None,
3287 pool_queued: None,
3288 success_rate: None,
3289 circuit_state: Some("open".to_string()),
3290 avg_response_time_ms: Some(0),
3291 recovering: Some(true),
3292 recovery_percent: Some(10),
3293 shared_socket_available_slots: None,
3294 shared_socket_active_connections: None,
3295 shared_socket_registered_executions: None,
3296 connection_pool_available_slots: None,
3297 connection_pool_active_connections: None,
3298 };
3299
3300 assert!(!status.healthy);
3301 assert_eq!(status.status, "connection_failed");
3302 assert!(status.uptime_ms.is_none());
3303 }
3304
3305 #[test]
3306 fn test_health_status_debug_format() {
3307 use super::super::health::HealthStatus;
3308
3309 let status = HealthStatus {
3310 healthy: true,
3311 status: "test".to_string(),
3312 uptime_ms: Some(100),
3313 memory: None,
3314 pool_completed: None,
3315 pool_queued: None,
3316 success_rate: None,
3317 circuit_state: None,
3318 avg_response_time_ms: None,
3319 recovering: None,
3320 recovery_percent: None,
3321 shared_socket_available_slots: None,
3322 shared_socket_active_connections: None,
3323 shared_socket_registered_executions: None,
3324 connection_pool_available_slots: None,
3325 connection_pool_active_connections: None,
3326 };
3327
3328 let debug_str = format!("{:?}", status);
3329 assert!(debug_str.contains("healthy: true"));
3330 assert!(debug_str.contains("test"));
3331 }
3332
3333 #[test]
3334 fn test_health_status_clone() {
3335 use super::super::health::HealthStatus;
3336
3337 let status = HealthStatus {
3338 healthy: true,
3339 status: "original".to_string(),
3340 uptime_ms: Some(500),
3341 memory: Some(100),
3342 pool_completed: Some(10),
3343 pool_queued: Some(1),
3344 success_rate: Some(0.95),
3345 circuit_state: Some("closed".to_string()),
3346 avg_response_time_ms: Some(25),
3347 recovering: Some(false),
3348 recovery_percent: Some(100),
3349 shared_socket_available_slots: Some(50),
3350 shared_socket_active_connections: Some(2),
3351 shared_socket_registered_executions: Some(1),
3352 connection_pool_available_slots: Some(25),
3353 connection_pool_active_connections: Some(1),
3354 };
3355
3356 let cloned = status.clone();
3357 assert_eq!(cloned.healthy, status.healthy);
3358 assert_eq!(cloned.status, status.status);
3359 assert_eq!(cloned.uptime_ms, status.uptime_ms);
3360 }
3361
3362 #[test]
3363 fn test_execute_request_debug() {
3364 use super::super::protocol::ExecuteRequest;
3365
3366 let request = ExecuteRequest {
3367 task_id: "debug-test".to_string(),
3368 plugin_id: "test-plugin".to_string(),
3369 compiled_code: None,
3370 plugin_path: Some("/path/to/plugin.ts".to_string()),
3371 params: serde_json::json!({"test": true}),
3372 headers: None,
3373 socket_path: "/tmp/test.sock".to_string(),
3374 http_request_id: None,
3375 timeout: None,
3376 route: None,
3377 config: None,
3378 method: None,
3379 query: None,
3380 };
3381
3382 let debug_str = format!("{:?}", request);
3383 assert!(debug_str.contains("debug-test"));
3384 assert!(debug_str.contains("test-plugin"));
3385 }
3386
3387 #[test]
3388 fn test_pool_error_debug() {
3389 use super::super::protocol::PoolError;
3390
3391 let error = PoolError {
3392 message: "Test error".to_string(),
3393 code: Some("TEST_ERR".to_string()),
3394 status: Some(400),
3395 details: Some(serde_json::json!({"info": "test"})),
3396 };
3397
3398 let debug_str = format!("{:?}", error);
3399 assert!(debug_str.contains("Test error"));
3400 assert!(debug_str.contains("TEST_ERR"));
3401 }
3402
3403 #[test]
3404 fn test_pool_response_debug() {
3405 use super::super::protocol::PoolResponse;
3406
3407 let response = PoolResponse {
3408 task_id: "resp-123".to_string(),
3409 success: true,
3410 result: Some(serde_json::json!("result")),
3411 error: None,
3412 logs: None,
3413 };
3414
3415 let debug_str = format!("{:?}", response);
3416 assert!(debug_str.contains("resp-123"));
3417 assert!(debug_str.contains("true"));
3418 }
3419
3420 #[test]
3421 fn test_pool_log_entry_debug() {
3422 use super::super::protocol::PoolLogEntry;
3423
3424 let entry = PoolLogEntry {
3425 level: "info".to_string(),
3426 message: "Test message".to_string(),
3427 };
3428
3429 let debug_str = format!("{:?}", entry);
3430 assert!(debug_str.contains("info"));
3431 assert!(debug_str.contains("Test message"));
3432 }
3433
3434 #[test]
3435 fn test_circuit_breaker_default_trait() {
3436 use super::super::health::CircuitBreaker;
3437
3438 let cb = CircuitBreaker::default();
3439 assert_eq!(cb.state(), CircuitState::Closed);
3440 }
3441
3442 #[test]
3443 fn test_circuit_breaker_set_state_all_variants() {
3444 use super::super::health::CircuitBreaker;
3445
3446 let cb = CircuitBreaker::new();
3447
3448 cb.set_state(CircuitState::HalfOpen);
3450 assert_eq!(cb.state(), CircuitState::HalfOpen);
3451
3452 cb.set_state(CircuitState::Open);
3453 assert_eq!(cb.state(), CircuitState::Open);
3454
3455 cb.set_state(CircuitState::Closed);
3456 assert_eq!(cb.state(), CircuitState::Closed);
3457 }
3458
3459 #[test]
3460 fn test_circuit_breaker_failure_rate_triggers_open() {
3461 use super::super::health::CircuitBreaker;
3462
3463 let cb = CircuitBreaker::new();
3464
3465 for _ in 0..100 {
3467 cb.record_failure();
3468 }
3469
3470 assert_eq!(cb.state(), CircuitState::Open);
3471 }
3472
3473 #[test]
3474 fn test_circuit_breaker_low_failure_rate_stays_closed() {
3475 use super::super::health::CircuitBreaker;
3476
3477 let cb = CircuitBreaker::new();
3478
3479 for _ in 0..90 {
3481 cb.record_success(50);
3482 }
3483 for _ in 0..10 {
3484 cb.record_failure();
3485 }
3486
3487 assert_eq!(cb.state(), CircuitState::Closed);
3489 }
3490
3491 #[test]
3492 fn test_circuit_breaker_ema_response_time() {
3493 use super::super::health::CircuitBreaker;
3494
3495 let cb = CircuitBreaker::new();
3496
3497 cb.record_success(100);
3499 let avg1 = cb.avg_response_time();
3500
3501 cb.record_success(100);
3502 cb.record_success(100);
3503 cb.record_success(100);
3504 let avg2 = cb.avg_response_time();
3505
3506 assert!(avg1 > 0);
3508 assert!(avg2 > 0);
3509 assert!(avg2 <= 100);
3510 }
3511
3512 #[test]
3513 fn test_circuit_breaker_force_close_resets_counters() {
3514 use super::super::health::CircuitBreaker;
3515
3516 let cb = CircuitBreaker::new();
3517 cb.set_state(CircuitState::Open);
3518
3519 cb.force_close();
3520
3521 assert_eq!(cb.state(), CircuitState::Closed);
3522 }
3523
3524 #[test]
3525 fn test_process_status_debug() {
3526 use super::super::health::ProcessStatus;
3527
3528 assert_eq!(format!("{:?}", ProcessStatus::Running), "Running");
3529 assert_eq!(format!("{:?}", ProcessStatus::Exited), "Exited");
3530 assert_eq!(format!("{:?}", ProcessStatus::Unknown), "Unknown");
3531 assert_eq!(format!("{:?}", ProcessStatus::NoProcess), "NoProcess");
3532 }
3533
3534 #[test]
3535 fn test_process_status_clone() {
3536 use super::super::health::ProcessStatus;
3537
3538 let status = ProcessStatus::Running;
3539 let cloned = status;
3540 assert_eq!(status, cloned);
3541 }
3542
3543 #[test]
3548 fn test_dead_server_indicator_all_variants() {
3549 use super::super::health::DeadServerIndicator;
3550
3551 let variants = [
3553 ("eof while parsing", DeadServerIndicator::EofWhileParsing),
3554 ("broken pipe", DeadServerIndicator::BrokenPipe),
3555 ("connection refused", DeadServerIndicator::ConnectionRefused),
3556 ("connection reset", DeadServerIndicator::ConnectionReset),
3557 ("not connected", DeadServerIndicator::NotConnected),
3558 ("failed to connect", DeadServerIndicator::FailedToConnect),
3559 (
3560 "socket file missing",
3561 DeadServerIndicator::SocketFileMissing,
3562 ),
3563 ("no such file", DeadServerIndicator::NoSuchFile),
3564 (
3565 "connection timed out",
3566 DeadServerIndicator::ConnectionTimedOut,
3567 ),
3568 ("connect timed out", DeadServerIndicator::ConnectionTimedOut),
3569 ];
3570
3571 for (pattern, expected) in variants {
3572 let result = DeadServerIndicator::from_error_str(pattern);
3573 assert_eq!(result, Some(expected), "Pattern '{}' should match", pattern);
3574 }
3575 }
3576
3577 #[test]
3578 fn test_dead_server_indicator_debug_format() {
3579 use super::super::health::DeadServerIndicator;
3580
3581 let indicator = DeadServerIndicator::BrokenPipe;
3582 let debug_str = format!("{:?}", indicator);
3583 assert_eq!(debug_str, "BrokenPipe");
3584 }
3585
3586 #[test]
3587 fn test_dead_server_indicator_clone_copy() {
3588 use super::super::health::DeadServerIndicator;
3589
3590 let indicator = DeadServerIndicator::ConnectionRefused;
3591 let cloned = indicator;
3592 assert_eq!(indicator, cloned);
3593 }
3594
3595 #[test]
3596 fn test_result_ring_buffer_not_enough_data() {
3597 use super::super::health::ResultRingBuffer;
3598
3599 let buffer = ResultRingBuffer::new(100);
3600
3601 for _ in 0..9 {
3603 buffer.record(false);
3604 }
3605
3606 assert_eq!(buffer.failure_rate(), 0.0);
3608 }
3609
3610 #[test]
3611 fn test_result_ring_buffer_exactly_10_samples() {
3612 use super::super::health::ResultRingBuffer;
3613
3614 let buffer = ResultRingBuffer::new(100);
3615
3616 for _ in 0..10 {
3618 buffer.record(false);
3619 }
3620
3621 assert_eq!(buffer.failure_rate(), 1.0);
3623 }
3624
3625 #[test]
3626 fn test_result_ring_buffer_wraps_correctly() {
3627 use super::super::health::ResultRingBuffer;
3628
3629 let buffer = ResultRingBuffer::new(10);
3630
3631 for _ in 0..10 {
3633 buffer.record(true);
3634 }
3635 assert_eq!(buffer.failure_rate(), 0.0);
3636
3637 for _ in 0..10 {
3639 buffer.record(false);
3640 }
3641 assert_eq!(buffer.failure_rate(), 1.0);
3642 }
3643
3644 #[test]
3645 fn test_circuit_state_equality_all_pairs() {
3646 assert_eq!(CircuitState::Closed, CircuitState::Closed);
3647 assert_eq!(CircuitState::HalfOpen, CircuitState::HalfOpen);
3648 assert_eq!(CircuitState::Open, CircuitState::Open);
3649
3650 assert_ne!(CircuitState::Closed, CircuitState::HalfOpen);
3651 assert_ne!(CircuitState::Closed, CircuitState::Open);
3652 assert_ne!(CircuitState::HalfOpen, CircuitState::Open);
3653 }
3654
3655 #[test]
3656 fn test_circuit_state_clone_copy() {
3657 let state = CircuitState::HalfOpen;
3658 let copied = state;
3659 assert_eq!(state, copied);
3660 }
3661
3662 #[test]
3663 fn test_parse_pool_response_with_null_values() {
3664 use super::super::protocol::PoolResponse;
3665
3666 let response = PoolResponse {
3667 task_id: "null-test".to_string(),
3668 success: true,
3669 result: Some(serde_json::json!(null)),
3670 error: None,
3671 logs: None,
3672 };
3673
3674 let result = PoolManager::parse_pool_response(response).unwrap();
3675 assert_eq!(result.return_value, "null");
3676 }
3677
3678 #[test]
3679 fn test_parse_pool_response_with_nested_result() {
3680 use super::super::protocol::PoolResponse;
3681
3682 let response = PoolResponse {
3683 task_id: "nested-test".to_string(),
3684 success: true,
3685 result: Some(serde_json::json!({
3686 "level1": {
3687 "level2": {
3688 "level3": "deep value"
3689 }
3690 }
3691 })),
3692 error: None,
3693 logs: None,
3694 };
3695
3696 let result = PoolManager::parse_pool_response(response).unwrap();
3697 assert!(result.return_value.contains("level1"));
3698 assert!(result.return_value.contains("level2"));
3699 assert!(result.return_value.contains("level3"));
3700 assert!(result.return_value.contains("deep value"));
3701 }
3702
3703 #[test]
3704 fn test_parse_pool_response_error_with_details() {
3705 use super::super::protocol::{PoolError, PoolResponse};
3706
3707 let response = PoolResponse {
3708 task_id: "error-details".to_string(),
3709 success: false,
3710 result: None,
3711 error: Some(PoolError {
3712 message: "Error with details".to_string(),
3713 code: Some("DETAILED_ERROR".to_string()),
3714 status: Some(422),
3715 details: Some(serde_json::json!({
3716 "field": "email",
3717 "expected": "string",
3718 "received": "number"
3719 })),
3720 }),
3721 logs: None,
3722 };
3723
3724 let err = PoolManager::parse_pool_response(response).unwrap_err();
3725 match err {
3726 PluginError::HandlerError(payload) => {
3727 assert_eq!(payload.message, "Error with details");
3728 assert_eq!(payload.code, Some("DETAILED_ERROR".to_string()));
3729 assert!(payload.details.is_some());
3730 let details = payload.details.unwrap();
3731 assert_eq!(details.get("field").unwrap(), "email");
3732 }
3733 _ => panic!("Expected HandlerError"),
3734 }
3735 }
3736
3737 #[test]
3738 fn test_parse_health_result_with_all_optional_fields() {
3739 let json = serde_json::json!({
3740 "status": "healthy",
3741 "uptime": 999999,
3742 "memory": {
3743 "heapUsed": 123456789,
3744 "heapTotal": 987654321,
3745 "external": 111111,
3746 "arrayBuffers": 222222
3747 },
3748 "pool": {
3749 "completed": 50000,
3750 "queued": 100,
3751 "active": 50,
3752 "waiting": 25
3753 },
3754 "execution": {
3755 "successRate": 0.9999,
3756 "avgDuration": 45.5,
3757 "totalExecutions": 100000
3758 }
3759 });
3760
3761 let result = PoolManager::parse_health_result(&json);
3762 assert_eq!(result.status, "healthy");
3763 assert_eq!(result.uptime_ms, Some(999999));
3764 assert_eq!(result.memory, Some(123456789));
3765 assert_eq!(result.pool_completed, Some(50000));
3766 assert_eq!(result.pool_queued, Some(100));
3767 assert!((result.success_rate.unwrap() - 0.9999).abs() < 0.0001);
3768 }
3769
3770 #[tokio::test]
3771 async fn test_pool_manager_max_queue_size() {
3772 let manager = PoolManager::new();
3773 assert!(manager.max_queue_size > 0);
3775 }
3776
3777 #[tokio::test]
3778 async fn test_pool_manager_last_restart_time_initial() {
3779 let manager = PoolManager::new();
3780 assert_eq!(manager.last_restart_time_ms.load(Ordering::Relaxed), 0);
3781 }
3782
3783 #[tokio::test]
3784 async fn test_pool_manager_connection_pool_exists() {
3785 let manager = PoolManager::new();
3786 let available = manager.connection_pool.semaphore.available_permits();
3788 assert!(available > 0);
3789 }
3790
3791 #[test]
3792 fn test_is_dead_server_error_with_whitespace() {
3793 let err = PluginError::SocketError(" connection refused ".to_string());
3795 assert!(PoolManager::is_dead_server_error(&err));
3796
3797 let err = PluginError::SocketError("error: broken pipe occurred".to_string());
3798 assert!(PoolManager::is_dead_server_error(&err));
3799 }
3800
3801 #[test]
3802 fn test_is_dead_server_error_multiline() {
3803 let err = PluginError::SocketError(
3805 "Error occurred\nConnection refused\nPlease retry".to_string(),
3806 );
3807 assert!(PoolManager::is_dead_server_error(&err));
3808 }
3809
3810 #[test]
3811 fn test_is_dead_server_error_json_in_message() {
3812 let err = PluginError::PluginExecutionError(
3814 r#"{"error": "connection refused", "code": 61}"#.to_string(),
3815 );
3816 assert!(PoolManager::is_dead_server_error(&err));
3817 }
3818
3819 #[test]
3820 fn test_format_return_value_special_json() {
3821 let value = Some(serde_json::json!(f64::MAX));
3823 let result = PoolManager::format_return_value(value);
3824 assert!(!result.is_empty());
3825
3826 let value = Some(serde_json::json!(i64::MIN));
3827 let result = PoolManager::format_return_value(value);
3828 assert!(result.contains("-"));
3829 }
3830
3831 #[test]
3832 fn test_format_return_value_with_escaped_chars() {
3833 let value = Some(serde_json::json!("line1\nline2\ttab\"quote"));
3834 let result = PoolManager::format_return_value(value);
3835 assert!(result.contains("line1"));
3836 assert!(result.contains("line2"));
3837 }
3838
3839 #[test]
3840 fn test_format_return_value_array_of_objects() {
3841 let value = Some(serde_json::json!([
3842 {"id": 1, "name": "first"},
3843 {"id": 2, "name": "second"}
3844 ]));
3845 let result = PoolManager::format_return_value(value);
3846 assert!(result.contains("first"));
3847 assert!(result.contains("second"));
3848 }
3849
3850 #[test]
3851 fn test_all_log_levels_conversion() {
3852 use super::super::protocol::PoolLogEntry;
3853
3854 let levels = [
3855 ("log", LogLevel::Log),
3856 ("debug", LogLevel::Debug),
3857 ("info", LogLevel::Info),
3858 ("warn", LogLevel::Warn),
3859 ("error", LogLevel::Error),
3860 ("result", LogLevel::Result),
3861 ("unknown_level", LogLevel::Log), ("LOG", LogLevel::Log), ("", LogLevel::Log), ];
3865
3866 for (input, expected) in levels {
3867 let entry = PoolLogEntry {
3868 level: input.to_string(),
3869 message: "test".to_string(),
3870 };
3871 let log_entry: LogEntry = entry.into();
3872 assert_eq!(
3873 log_entry.level, expected,
3874 "Level '{}' should convert to {:?}",
3875 input, expected
3876 );
3877 }
3878 }
3879
3880 #[tokio::test]
3881 async fn test_pool_manager_health_check_flag_manipulation() {
3882 let manager = PoolManager::new();
3883
3884 manager.health_check_needed.store(true, Ordering::Relaxed);
3885 assert!(manager.health_check_needed.load(Ordering::Relaxed));
3886
3887 manager.health_check_needed.store(false, Ordering::Relaxed);
3888 assert!(!manager.health_check_needed.load(Ordering::Relaxed));
3889 }
3890
3891 #[tokio::test]
3892 async fn test_pool_manager_consecutive_failures_manipulation() {
3893 let manager = PoolManager::new();
3894
3895 manager.consecutive_failures.fetch_add(1, Ordering::Relaxed);
3896 assert_eq!(manager.consecutive_failures.load(Ordering::Relaxed), 1);
3897
3898 manager.consecutive_failures.fetch_add(5, Ordering::Relaxed);
3899 assert_eq!(manager.consecutive_failures.load(Ordering::Relaxed), 6);
3900
3901 manager.consecutive_failures.store(0, Ordering::Relaxed);
3902 assert_eq!(manager.consecutive_failures.load(Ordering::Relaxed), 0);
3903 }
3904
3905 #[test]
3906 fn test_parsed_health_result_with_all_none() {
3907 let result = ParsedHealthResult {
3908 status: "minimal".to_string(),
3909 uptime_ms: None,
3910 memory: None,
3911 pool_completed: None,
3912 pool_queued: None,
3913 success_rate: None,
3914 };
3915
3916 assert_eq!(result.status, "minimal");
3917 assert!(result.uptime_ms.is_none());
3918 assert!(result.memory.is_none());
3919 }
3920
3921 #[test]
3922 fn test_parsed_health_result_with_all_some() {
3923 let result = ParsedHealthResult {
3924 status: "complete".to_string(),
3925 uptime_ms: Some(u64::MAX),
3926 memory: Some(u64::MAX),
3927 pool_completed: Some(u64::MAX),
3928 pool_queued: Some(u64::MAX),
3929 success_rate: Some(1.0),
3930 };
3931
3932 assert_eq!(result.status, "complete");
3933 assert_eq!(result.uptime_ms, Some(u64::MAX));
3934 assert_eq!(result.success_rate, Some(1.0));
3935 }
3936
3937 #[test]
3938 fn test_calculate_heap_size_extensive_values() {
3939 let test_cases = [
3941 (0, 512),
3942 (1, 512),
3943 (5, 512),
3944 (9, 512),
3945 (10, 544),
3946 (11, 544),
3947 (19, 544),
3948 (20, 576),
3949 (50, 672),
3950 (100, 832),
3951 (150, 992),
3952 (200, 1152),
3953 (250, 1312),
3954 (300, 1472),
3955 (400, 1792),
3956 (500, 2112),
3957 (1000, 3712),
3958 (2000, 6912),
3959 (2400, 8192), (3000, 8192), (5000, 8192), (10000, 8192), ];
3964
3965 for (concurrency, expected_heap) in test_cases {
3966 let heap = PoolManager::calculate_heap_size(concurrency);
3967 assert_eq!(
3968 heap, expected_heap,
3969 "Concurrency {} should give heap {}",
3970 concurrency, expected_heap
3971 );
3972 }
3973 }
3974
3975 #[tokio::test]
3976 async fn test_pool_manager_drop_cleans_socket() {
3977 let socket_path = format!("/tmp/test-drop-{}.sock", uuid::Uuid::new_v4());
3978
3979 std::fs::write(&socket_path, "test").unwrap();
3981 assert!(std::path::Path::new(&socket_path).exists());
3982
3983 {
3985 let _manager = PoolManager::with_socket_path(socket_path.clone());
3986 }
3988 assert!(!std::path::Path::new(&socket_path).exists());
3992 }
3993
3994 #[test]
3995 fn test_script_result_with_traces() {
3996 let result = ScriptResult {
3997 logs: vec![],
3998 error: String::new(),
3999 return_value: "with traces".to_string(),
4000 trace: vec![
4001 serde_json::json!({"action": "GET", "url": "/api/test"}),
4002 serde_json::json!({"action": "POST", "url": "/api/submit"}),
4003 ],
4004 };
4005
4006 assert_eq!(result.trace.len(), 2);
4007 assert!(result.trace[0].get("action").is_some());
4008 }
4009
4010 #[test]
4011 fn test_script_result_with_error() {
4012 let result = ScriptResult {
4013 logs: vec![LogEntry {
4014 level: LogLevel::Error,
4015 message: "Something went wrong".to_string(),
4016 }],
4017 error: "RuntimeError: undefined is not a function".to_string(),
4018 return_value: String::new(),
4019 trace: vec![],
4020 };
4021
4022 assert!(!result.error.is_empty());
4023 assert!(result.error.contains("RuntimeError"));
4024 assert_eq!(result.logs.len(), 1);
4025 }
4026
4027 #[test]
4028 fn test_plugin_handler_payload_with_traces() {
4029 let payload = PluginHandlerPayload {
4030 message: "Error with traces".to_string(),
4031 status: 500,
4032 code: None,
4033 details: None,
4034 logs: None,
4035 traces: Some(vec![
4036 serde_json::json!({"method": "GET", "path": "/health"}),
4037 serde_json::json!({"method": "POST", "path": "/execute"}),
4038 ]),
4039 };
4040
4041 assert!(payload.traces.is_some());
4042 assert_eq!(payload.traces.as_ref().unwrap().len(), 2);
4043 }
4044
4045 #[test]
4046 fn test_queued_request_all_optional_fields() {
4047 let (tx, _rx) = oneshot::channel();
4048
4049 let mut headers = HashMap::new();
4050 headers.insert(
4051 "X-Custom".to_string(),
4052 vec!["value1".to_string(), "value2".to_string()],
4053 );
4054
4055 let request = QueuedRequest {
4056 plugin_id: "full-request".to_string(),
4057 compiled_code: Some("compiled code here".to_string()),
4058 plugin_path: Some("/path/to/plugin.ts".to_string()),
4059 params: serde_json::json!({"key": "value", "number": 42}),
4060 headers: Some(headers),
4061 socket_path: "/tmp/full.sock".to_string(),
4062 http_request_id: Some("http-123".to_string()),
4063 timeout_secs: Some(60),
4064 route: Some("/api/v1/execute".to_string()),
4065 config: Some(serde_json::json!({"setting": true})),
4066 method: Some("PUT".to_string()),
4067 query: Some(serde_json::json!({"page": 1, "limit": 10})),
4068 response_tx: tx,
4069 };
4070
4071 assert_eq!(request.plugin_id, "full-request");
4072 assert!(request.compiled_code.is_some());
4073 assert!(request.plugin_path.is_some());
4074 assert!(request.headers.is_some());
4075 assert_eq!(request.timeout_secs, Some(60));
4076 assert_eq!(request.method, Some("PUT".to_string()));
4077 }
4078}