1use std::{fmt, sync::Arc};
4
5use crate::observability::request_id::get_request_id;
6use crate::{
7 jobs::JobProducerTrait,
8 models::{
9 AppState, NetworkRepoModel, NotificationRepoModel, PluginCallRequest, PluginMetadata,
10 PluginModel, RelayerRepoModel, SignerRepoModel, ThinDataAppState, TransactionRepoModel,
11 },
12 repositories::{
13 ApiKeyRepositoryTrait, NetworkRepository, PluginRepositoryTrait, RelayerRepository,
14 Repository, TransactionCounterTrait, TransactionRepository,
15 },
16};
17use actix_web::web;
18use async_trait::async_trait;
19use serde::{Deserialize, Serialize};
20use thiserror::Error;
21use uuid::Uuid;
22
23pub mod config;
24pub use config::*;
25
26pub mod health;
27pub use health::*;
28
29pub mod protocol;
30pub use protocol::*;
31
32pub mod connection;
33pub use connection::*;
34
35pub mod runner;
36pub use runner::*;
37
38pub mod relayer_api;
39pub use relayer_api::*;
40
41pub mod script_executor;
42pub use script_executor::*;
43
44pub mod pool_executor;
45pub use pool_executor::*;
46
47pub mod shared_socket;
48pub use shared_socket::*;
49
50#[cfg(test)]
51use mockall::automock;
52
53#[derive(Error, Debug, Serialize)]
54pub enum PluginError {
55 #[error("Socket error: {0}")]
56 SocketError(String),
57 #[error("Plugin error: {0}")]
58 PluginError(String),
59 #[error("Relayer error: {0}")]
60 RelayerError(String),
61 #[error("Plugin execution error: {0}")]
62 PluginExecutionError(String),
63 #[error("Script execution timed out after {0} seconds")]
64 ScriptTimeout(u64),
65 #[error("Invalid method: {0}")]
66 InvalidMethod(String),
67 #[error("Invalid payload: {0}")]
68 InvalidPayload(String),
69 #[error("{0}")]
70 HandlerError(Box<PluginHandlerPayload>),
71}
72
73impl PluginError {
74 pub fn with_traces(self, traces: Vec<serde_json::Value>) -> Self {
77 match self {
78 PluginError::HandlerError(mut payload) => {
79 payload.append_traces(traces);
80 PluginError::HandlerError(payload)
81 }
82 other => other,
83 }
84 }
85}
86
87impl From<PluginError> for String {
88 fn from(error: PluginError) -> Self {
89 error.to_string()
90 }
91}
92
93#[derive(Debug, Serialize, Deserialize, utoipa::ToSchema)]
94pub struct PluginCallResponse {
95 pub result: serde_json::Value,
97 #[serde(skip_serializing_if = "Option::is_none")]
99 pub metadata: Option<PluginMetadata>,
100}
101
102#[derive(Debug, Serialize, Deserialize, utoipa::ToSchema)]
103pub struct PluginHandlerError {
104 #[serde(skip_serializing_if = "Option::is_none")]
105 pub code: Option<String>,
106 #[serde(skip_serializing_if = "Option::is_none")]
107 pub details: Option<serde_json::Value>,
108}
109
110#[derive(Debug)]
111pub struct PluginHandlerResponse {
112 pub status: u16,
113 pub message: String,
114 pub error: PluginHandlerError,
115 pub metadata: Option<PluginMetadata>,
116}
117
118#[derive(Debug, Serialize)]
119pub struct PluginHandlerPayload {
120 pub status: u16,
121 pub message: String,
122 #[serde(skip_serializing_if = "Option::is_none")]
123 pub code: Option<String>,
124 #[serde(skip_serializing_if = "Option::is_none")]
125 pub details: Option<serde_json::Value>,
126 #[serde(skip_serializing_if = "Option::is_none")]
127 pub logs: Option<Vec<LogEntry>>,
128 #[serde(skip_serializing_if = "Option::is_none")]
129 pub traces: Option<Vec<serde_json::Value>>,
130}
131
132impl PluginHandlerPayload {
133 fn append_traces(&mut self, traces: Vec<serde_json::Value>) {
134 match &mut self.traces {
135 Some(existing) => existing.extend(traces),
136 None => self.traces = Some(traces),
137 }
138 }
139
140 fn into_response(self, emit_logs: bool, emit_traces: bool) -> PluginHandlerResponse {
141 let logs = if emit_logs { self.logs } else { None };
142 let traces = if emit_traces { self.traces } else { None };
143 let message = derive_handler_message(&self.message, logs.as_deref());
144 let metadata = build_metadata(logs, traces);
145
146 PluginHandlerResponse {
147 status: self.status,
148 message,
149 error: PluginHandlerError {
150 code: self.code,
151 details: self.details,
152 },
153 metadata,
154 }
155 }
156}
157
158impl fmt::Display for PluginHandlerPayload {
159 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
160 f.write_str(&self.message)
161 }
162}
163
164fn derive_handler_message(message: &str, logs: Option<&[LogEntry]>) -> String {
165 if !message.trim().is_empty() {
166 return message.to_string();
167 }
168
169 if let Some(logs) = logs {
170 if let Some(entry) = logs
171 .iter()
172 .rev()
173 .find(|entry| matches!(entry.level, LogLevel::Error | LogLevel::Warn))
174 {
175 return entry.message.clone();
176 }
177
178 if let Some(entry) = logs.last() {
179 return entry.message.clone();
180 }
181 }
182
183 "Plugin execution failed".to_string()
184}
185
186fn build_metadata(
187 logs: Option<Vec<LogEntry>>,
188 traces: Option<Vec<serde_json::Value>>,
189) -> Option<PluginMetadata> {
190 if logs.is_some() || traces.is_some() {
191 Some(PluginMetadata { logs, traces })
192 } else {
193 None
194 }
195}
196
197fn forward_logs_to_tracing(plugin_id: &str, logs: &[LogEntry], request_id: &str) {
198 for entry in logs {
199 match entry.level {
200 LogLevel::Error => {
201 tracing::error!(target: "plugin", plugin_id = %plugin_id, request_id = %request_id, "{}", entry.message)
202 }
203 LogLevel::Warn => {
204 tracing::warn!(target: "plugin", plugin_id = %plugin_id, request_id = %request_id, "{}", entry.message)
205 }
206 LogLevel::Info | LogLevel::Log => {
207 tracing::info!(target: "plugin", plugin_id = %plugin_id, request_id = %request_id, "{}", entry.message)
208 }
209 LogLevel::Debug => {
210 tracing::debug!(target: "plugin", plugin_id = %plugin_id, request_id = %request_id, "{}", entry.message)
211 }
212 LogLevel::Result => {}
213 }
214 }
215}
216
217#[derive(Debug)]
218pub enum PluginCallResult {
219 Success(PluginCallResponse),
220 Handler(PluginHandlerResponse),
221 Fatal(PluginError),
222}
223
224#[derive(Default)]
225pub struct PluginService<R: PluginRunnerTrait> {
226 runner: R,
227}
228
229impl<R: PluginRunnerTrait> PluginService<R> {
230 pub fn new(runner: R) -> Self {
231 Self { runner }
232 }
233
234 pub fn resolve_plugin_path(plugin_path: &str) -> String {
235 if plugin_path.starts_with("plugins/") {
236 plugin_path.to_string()
237 } else {
238 format!("plugins/{plugin_path}")
239 }
240 }
241
242 #[allow(clippy::type_complexity)]
243 async fn call_plugin<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
244 &self,
245 plugin: PluginModel,
246 plugin_call_request: PluginCallRequest,
247 state: Arc<ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>>,
248 ) -> PluginCallResult
249 where
250 J: JobProducerTrait + Send + Sync + 'static,
251 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
252 TR: TransactionRepository
253 + Repository<TransactionRepoModel, String>
254 + Send
255 + Sync
256 + 'static,
257 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
258 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
259 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
260 TCR: TransactionCounterTrait + Send + Sync + 'static,
261 PR: PluginRepositoryTrait + Send + Sync + 'static,
262 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
263 {
264 let socket_path = format!("/tmp/{}.sock", Uuid::new_v4());
265 let script_path = Self::resolve_plugin_path(&plugin.path);
266 let script_params = plugin_call_request.params.to_string();
267 let headers_json = plugin_call_request
268 .headers
269 .map(|h| serde_json::to_string(&h).unwrap_or_default());
270 let route = plugin_call_request.route;
271 let config_json = plugin
272 .config
273 .map(|c| serde_json::to_string(&c).unwrap_or_default());
274 let method = plugin_call_request.method;
275 let query_json = plugin_call_request
276 .query
277 .map(|q| serde_json::to_string(&q).unwrap_or_default());
278
279 let request_id = get_request_id();
280 let request_id_for_logs: String = request_id
281 .as_deref()
282 .filter(|id| !id.trim().is_empty())
283 .map(str::to_owned)
284 .unwrap_or_else(|| Uuid::new_v4().to_string());
285 let result = self
286 .runner
287 .run(
288 plugin.id.clone(),
289 &socket_path,
290 script_path,
291 plugin.timeout,
292 script_params,
293 request_id,
294 headers_json,
295 route,
296 config_json,
297 method,
298 query_json,
299 plugin.emit_traces,
300 state,
301 )
302 .await;
303
304 match result {
305 Ok(script_result) => {
306 if plugin.forward_logs {
307 forward_logs_to_tracing(&plugin.id, &script_result.logs, &request_id_for_logs);
308 }
309 let logs = if plugin.emit_logs {
311 Some(script_result.logs)
312 } else {
313 None
314 };
315 let traces = if plugin.emit_traces {
316 Some(script_result.trace)
317 } else {
318 None
319 };
320 let metadata = build_metadata(logs, traces);
321
322 let result = if script_result.return_value.trim() == "undefined" {
324 serde_json::Value::Null
325 } else {
326 serde_json::from_str::<serde_json::Value>(&script_result.return_value)
327 .unwrap_or(serde_json::Value::String(script_result.return_value))
328 };
329
330 PluginCallResult::Success(PluginCallResponse { result, metadata })
331 }
332 Err(e) => match e {
333 PluginError::HandlerError(payload) => {
334 if plugin.forward_logs {
335 if let Some(logs) = payload.logs.as_deref() {
336 forward_logs_to_tracing(&plugin.id, logs, &request_id_for_logs);
337 }
338 }
339 let failure = payload.into_response(plugin.emit_logs, plugin.emit_traces);
340 let has_logs = failure
341 .metadata
342 .as_ref()
343 .and_then(|meta| meta.logs.as_ref())
344 .is_some();
345 let has_traces = failure
346 .metadata
347 .as_ref()
348 .and_then(|meta| meta.traces.as_ref())
349 .is_some();
350
351 tracing::debug!(
352 status = failure.status,
353 message = %failure.message,
354 code = ?failure.error.code.as_ref(),
355 details = ?failure.error.details.as_ref(),
356 has_logs,
357 has_traces,
358 "Plugin handler returned error"
359 );
360
361 PluginCallResult::Handler(failure)
362 }
363 other => {
364 tracing::error!("Plugin execution failed: {:?}", other);
366 PluginCallResult::Fatal(other)
367 }
368 },
369 }
370 }
371}
372
373#[async_trait]
374#[cfg_attr(test, automock)]
375pub trait PluginServiceTrait<J, TR, RR, NR, NFR, SR, TCR, PR, AKR>: Send + Sync
376where
377 J: JobProducerTrait + 'static,
378 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
379 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
380 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
381 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
382 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
383 TCR: TransactionCounterTrait + Send + Sync + 'static,
384 PR: PluginRepositoryTrait + Send + Sync + 'static,
385 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
386{
387 fn new(runner: PluginRunner) -> Self;
388 async fn call_plugin(
389 &self,
390 plugin: PluginModel,
391 plugin_call_request: PluginCallRequest,
392 state: Arc<web::ThinData<AppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>>>,
393 ) -> PluginCallResult;
394}
395
396#[async_trait]
397impl<J, TR, RR, NR, NFR, SR, TCR, PR, AKR> PluginServiceTrait<J, TR, RR, NR, NFR, SR, TCR, PR, AKR>
398 for PluginService<PluginRunner>
399where
400 J: JobProducerTrait + 'static,
401 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
402 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
403 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
404 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
405 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
406 TCR: TransactionCounterTrait + Send + Sync + 'static,
407 PR: PluginRepositoryTrait + Send + Sync + 'static,
408 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
409{
410 fn new(runner: PluginRunner) -> Self {
411 Self::new(runner)
412 }
413
414 async fn call_plugin(
415 &self,
416 plugin: PluginModel,
417 plugin_call_request: PluginCallRequest,
418 state: Arc<web::ThinData<AppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>>>,
419 ) -> PluginCallResult {
420 self.call_plugin(plugin, plugin_call_request, state).await
421 }
422}
423
424#[cfg(test)]
425mod tests {
426 use std::{io::Write, time::Duration};
427
428 use crate::{
429 constants::DEFAULT_PLUGIN_TIMEOUT_SECONDS,
430 jobs::MockJobProducerTrait,
431 models::PluginModel,
432 repositories::{
433 ApiKeyRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage,
434 PluginRepositoryStorage, RelayerRepositoryStorage, SignerRepositoryStorage,
435 TransactionCounterRepositoryStorage, TransactionRepositoryStorage,
436 },
437 utils::mocks::mockutils::create_mock_app_state,
438 };
439
440 use super::*;
441
#[test]
fn test_resolve_plugin_path() {
    // (input, expected): an already-prefixed path, a nested path and a bare file name.
    let cases = [
        ("plugins/examples/test.ts", "plugins/examples/test.ts"),
        ("examples/test.ts", "plugins/examples/test.ts"),
        ("test.ts", "plugins/test.ts"),
    ];

    for &(input, expected) in &cases {
        assert_eq!(
            PluginService::<MockPluginRunnerTrait>::resolve_plugin_path(input),
            expected
        );
    }
}
459
460 #[tokio::test]
461 async fn test_call_plugin() {
462 let plugin = PluginModel {
463 id: "test-plugin".to_string(),
464 path: "test-path".to_string(),
465 timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
466 emit_logs: true,
467 emit_traces: false,
468 raw_response: false,
469 allow_get_invocation: false,
470 config: None,
471 forward_logs: false,
472 };
473 let app_state =
474 create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
475
476 let mut plugin_runner = MockPluginRunnerTrait::default();
477
478 plugin_runner
479 .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
480 .returning(|_, _, _, _, _, _, _, _, _, _, _, _, _| {
481 Ok(ScriptResult {
482 logs: vec![LogEntry {
483 level: LogLevel::Log,
484 message: "test-log".to_string(),
485 }],
486 error: "test-error".to_string(),
487 return_value: "test-result".to_string(),
488 trace: Vec::new(),
489 })
490 });
491
492 let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
493 let outcome = plugin_service
494 .call_plugin(
495 plugin,
496 PluginCallRequest {
497 params: serde_json::Value::Null,
498 headers: None,
499 route: None,
500 method: Some("POST".to_string()),
501 query: None,
502 },
503 Arc::new(web::ThinData(app_state)),
504 )
505 .await;
506 match outcome {
507 PluginCallResult::Success(result) => {
508 assert_eq!(
510 result.result,
511 serde_json::Value::String("test-result".to_string())
512 );
513 assert!(result.metadata.and_then(|meta| meta.logs).is_some());
515 }
516 PluginCallResult::Handler(_) | PluginCallResult::Fatal(_) => {
517 panic!("expected success outcome")
518 }
519 }
520 }
521
// Plain synchronous test: the conversion awaits nothing, so no async runtime
// is needed.
#[test]
fn test_from_plugin_error_to_string() {
    let error = PluginError::PluginExecutionError("test-error".to_string());

    // `From<PluginError> for String` yields the error's `Display` text.
    let result: String = error.into();

    assert_eq!(result, "Plugin execution error: test-error");
}
528
#[test]
fn test_plugin_error_with_traces_handler_error() {
    let trace = |label: &str| serde_json::json!({ "trace": label });

    let error = PluginError::HandlerError(Box::new(PluginHandlerPayload {
        status: 400,
        message: "test message".to_string(),
        code: Some("TEST_CODE".to_string()),
        details: None,
        logs: None,
        traces: Some(vec![trace("1")]),
    }));

    let enriched_error = error.with_traces(vec![trace("2"), trace("3")]);

    let payload = match enriched_error {
        PluginError::HandlerError(payload) => payload,
        _ => panic!("Expected HandlerError variant"),
    };

    // Existing traces stay first; the new ones follow in order.
    let traces = payload.traces.unwrap();
    assert_eq!(traces, vec![trace("1"), trace("2"), trace("3")]);
}
558
#[test]
fn test_plugin_error_with_traces_other_variants() {
    let original = PluginError::PluginExecutionError("test".to_string());

    // Non-handler variants must pass through `with_traces` unchanged.
    let result = original.with_traces(vec![serde_json::json!({"trace": "1"})]);

    if let PluginError::PluginExecutionError(msg) = result {
        assert_eq!(msg, "test");
    } else {
        panic!("Expected PluginExecutionError variant");
    }
}
571
#[test]
fn test_derive_handler_message_with_message() {
    // A non-blank message wins regardless of logs.
    assert_eq!(
        derive_handler_message("Custom error message", None),
        "Custom error message"
    );
}
577
#[test]
fn test_derive_handler_message_with_error_log() {
    let info = LogEntry {
        level: LogLevel::Log,
        message: "info log".to_string(),
    };
    let error = LogEntry {
        level: LogLevel::Error,
        message: "error log".to_string(),
    };
    let logs = vec![info, error];

    // With a blank message, the error entry supplies the text.
    assert_eq!(derive_handler_message("", Some(&logs)), "error log");
}
593
#[test]
fn test_derive_handler_message_with_warn_log() {
    let info = LogEntry {
        level: LogLevel::Log,
        message: "info log".to_string(),
    };
    let warning = LogEntry {
        level: LogLevel::Warn,
        message: "warn log".to_string(),
    };
    let logs = vec![info, warning];

    // With a blank message, the warn entry supplies the text.
    assert_eq!(derive_handler_message("", Some(&logs)), "warn log");
}
609
#[test]
fn test_derive_handler_message_with_only_info_logs() {
    let first = LogEntry {
        level: LogLevel::Log,
        message: "first log".to_string(),
    };
    let last = LogEntry {
        level: LogLevel::Info,
        message: "last log".to_string(),
    };
    let logs = vec![first, last];

    // No error/warn entries: the final entry is used.
    assert_eq!(derive_handler_message("", Some(&logs)), "last log");
}
625
#[test]
fn test_derive_handler_message_no_logs() {
    // Blank message and no logs: the generic fallback text is returned.
    assert_eq!(derive_handler_message("", None), "Plugin execution failed");
}
631
#[test]
fn test_build_metadata_with_logs_and_traces() {
    let logs = vec![LogEntry {
        level: LogLevel::Log,
        message: "test".to_string(),
    }];
    let traces = vec![serde_json::json!({"trace": "1"})];

    let result = build_metadata(Some(logs.clone()), Some(traces.clone()));

    assert!(result.is_some());
    let PluginMetadata {
        logs: got_logs,
        traces: got_traces,
    } = result.unwrap();
    assert_eq!(got_logs.unwrap(), logs);
    assert_eq!(got_traces.unwrap(), traces);
}
647
#[test]
fn test_build_metadata_with_only_logs() {
    let logs = vec![LogEntry {
        level: LogLevel::Log,
        message: "test".to_string(),
    }];

    let result = build_metadata(Some(logs.clone()), None);

    assert!(result.is_some());
    let PluginMetadata {
        logs: got_logs,
        traces: got_traces,
    } = result.unwrap();
    assert_eq!(got_logs.unwrap(), logs);
    assert!(got_traces.is_none());
}
662
#[test]
fn test_build_metadata_with_only_traces() {
    let traces = vec![serde_json::json!({"trace": "1"})];

    let result = build_metadata(None, Some(traces.clone()));

    assert!(result.is_some());
    let PluginMetadata {
        logs: got_logs,
        traces: got_traces,
    } = result.unwrap();
    assert!(got_logs.is_none());
    assert_eq!(got_traces.unwrap(), traces);
}
674
#[test]
fn test_build_metadata_with_neither() {
    // Nothing to report means no metadata at all.
    assert!(build_metadata(None, None).is_none());
}
680
#[test]
fn test_plugin_handler_payload_append_traces_to_existing() {
    let first = serde_json::json!({"trace": "1"});
    let second = serde_json::json!({"trace": "2"});
    let mut payload = PluginHandlerPayload {
        status: 400,
        message: "test".to_string(),
        code: None,
        details: None,
        logs: None,
        traces: Some(vec![first.clone()]),
    };

    payload.append_traces(vec![second.clone()]);

    // The pre-existing trace keeps its position; the new one is appended.
    assert_eq!(payload.traces.unwrap(), vec![first, second]);
}
699
#[test]
fn test_plugin_handler_payload_append_traces_to_none() {
    let only = serde_json::json!({"trace": "1"});
    let mut payload = PluginHandlerPayload {
        status: 400,
        message: "test".to_string(),
        code: None,
        details: None,
        logs: None,
        traces: None,
    };

    payload.append_traces(vec![only.clone()]);

    // Appending to an absent list creates it with exactly the new trace.
    assert_eq!(payload.traces.unwrap(), vec![only]);
}
717
#[test]
fn test_plugin_handler_payload_into_response_with_logs_and_traces() {
    let logs = vec![LogEntry {
        level: LogLevel::Error,
        message: "error message".to_string(),
    }];
    let payload = PluginHandlerPayload {
        status: 400,
        message: String::new(),
        code: Some("ERR_CODE".to_string()),
        details: Some(serde_json::json!({"key": "value"})),
        logs: Some(logs.clone()),
        traces: Some(vec![serde_json::json!({"trace": "1"})]),
    };

    let PluginHandlerResponse {
        status,
        message,
        error,
        metadata,
    } = payload.into_response(true, true);

    assert_eq!(status, 400);
    // The blank payload message is replaced by the error log's text.
    assert_eq!(message, "error message");
    assert_eq!(error.code, Some("ERR_CODE".to_string()));
    assert!(metadata.is_some());
    let metadata = metadata.unwrap();
    assert_eq!(metadata.logs.unwrap(), logs);
    assert_eq!(metadata.traces.unwrap().len(), 1);
}
743
#[test]
fn test_plugin_handler_payload_into_response_without_logs() {
    let payload = PluginHandlerPayload {
        status: 500,
        message: "explicit message".to_string(),
        code: None,
        details: None,
        logs: Some(vec![LogEntry {
            level: LogLevel::Log,
            message: "test log".to_string(),
        }]),
        traces: None,
    };

    let PluginHandlerResponse {
        status,
        message,
        metadata,
        ..
    } = payload.into_response(false, false);

    assert_eq!(status, 500);
    assert_eq!(message, "explicit message");
    // With both emit flags off, nothing is left to put in the metadata.
    assert!(metadata.is_none());
}
765
766 #[tokio::test]
767 async fn test_call_plugin_handler_error() {
768 let plugin = PluginModel {
769 id: "test-plugin".to_string(),
770 path: "test-path".to_string(),
771 timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
772 emit_logs: true,
773 emit_traces: true,
774 raw_response: false,
775 allow_get_invocation: false,
776 config: None,
777 forward_logs: false,
778 };
779 let app_state =
780 create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
781
782 let mut plugin_runner = MockPluginRunnerTrait::default();
783
784 plugin_runner
785 .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
786 .returning(move |_, _, _, _, _, _, _, _, _, _, _, _, _| {
787 Err(PluginError::HandlerError(Box::new(PluginHandlerPayload {
788 status: 400,
789 message: "Plugin handler error".to_string(),
790 code: Some("VALIDATION_ERROR".to_string()),
791 details: Some(serde_json::json!({"field": "email"})),
792 logs: Some(vec![LogEntry {
793 level: LogLevel::Error,
794 message: "Invalid email".to_string(),
795 }]),
796 traces: Some(vec![serde_json::json!({"step": "validation"})]),
797 })))
798 });
799
800 let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
801 let outcome = plugin_service
802 .call_plugin(
803 plugin,
804 PluginCallRequest {
805 params: serde_json::Value::Null,
806 headers: None,
807 route: None,
808 method: Some("POST".to_string()),
809 query: None,
810 },
811 Arc::new(web::ThinData(app_state)),
812 )
813 .await;
814
815 match outcome {
816 PluginCallResult::Handler(response) => {
817 assert_eq!(response.status, 400);
818 assert_eq!(response.error.code, Some("VALIDATION_ERROR".to_string()));
819 assert!(response.metadata.is_some());
820 let metadata = response.metadata.unwrap();
821 assert!(metadata.logs.is_some());
822 assert!(metadata.traces.is_some());
823 }
824 _ => panic!("Expected Handler result"),
825 }
826 }
827
828 #[tokio::test]
829 async fn test_call_plugin_fatal_error() {
830 let plugin = PluginModel {
831 id: "test-plugin".to_string(),
832 path: "test-path".to_string(),
833 timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
834 emit_logs: false,
835 emit_traces: false,
836 raw_response: false,
837 allow_get_invocation: false,
838 config: None,
839 forward_logs: false,
840 };
841 let app_state =
842 create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
843
844 let mut plugin_runner = MockPluginRunnerTrait::default();
845
846 plugin_runner
847 .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
848 .returning(|_, _, _, _, _, _, _, _, _, _, _, _, _| {
849 Err(PluginError::PluginExecutionError("Fatal error".to_string()))
850 });
851
852 let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
853 let outcome = plugin_service
854 .call_plugin(
855 plugin,
856 PluginCallRequest {
857 params: serde_json::Value::Null,
858 headers: None,
859 route: None,
860 method: Some("POST".to_string()),
861 query: None,
862 },
863 Arc::new(web::ThinData(app_state)),
864 )
865 .await;
866
867 match outcome {
868 PluginCallResult::Fatal(error) => {
869 assert!(matches!(error, PluginError::PluginExecutionError(_)));
870 }
871 _ => panic!("Expected Fatal result"),
872 }
873 }
874
875 #[tokio::test]
876 async fn test_call_plugin_success_with_json_result() {
877 let plugin = PluginModel {
878 id: "test-plugin".to_string(),
879 path: "test-path".to_string(),
880 timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
881 emit_logs: true,
882 emit_traces: true,
883 raw_response: false,
884 allow_get_invocation: false,
885 config: None,
886 forward_logs: false,
887 };
888 let app_state =
889 create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
890
891 let mut plugin_runner = MockPluginRunnerTrait::default();
892
893 plugin_runner
894 .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
895 .returning(|_, _, _, _, _, _, _, _, _, _, _, _, _| {
896 Ok(ScriptResult {
897 logs: vec![LogEntry {
898 level: LogLevel::Log,
899 message: "test-log".to_string(),
900 }],
901 error: "".to_string(),
902 return_value: r#"{"result": "success"}"#.to_string(),
903 trace: vec![serde_json::json!({"step": "1"})],
904 })
905 });
906
907 let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
908 let outcome = plugin_service
909 .call_plugin(
910 plugin,
911 PluginCallRequest {
912 params: serde_json::Value::Null,
913 headers: None,
914 route: None,
915 method: Some("POST".to_string()),
916 query: None,
917 },
918 Arc::new(web::ThinData(app_state)),
919 )
920 .await;
921
922 match outcome {
923 PluginCallResult::Success(result) => {
924 assert_eq!(result.result, serde_json::json!({"result": "success"}));
926 assert!(result.metadata.is_some());
927 let metadata = result.metadata.unwrap();
928 assert!(metadata.logs.is_some());
929 assert!(metadata.traces.is_some());
930 }
931 _ => panic!("Expected Success result"),
932 }
933 }
934
/// Writer handle over a shared list of strings.
///
/// Clones share the same underlying list because the list sits behind an `Arc`.
#[derive(Clone)]
struct VecWriter {
    /// Shared, mutex-guarded list of collected strings.
    buffer: std::sync::Arc<std::sync::Mutex<Vec<String>>>,
}
939
940 impl Write for VecWriter {
941 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
942 let mut buffer = self.buffer.lock().unwrap();
943 buffer.push(String::from_utf8_lossy(buf).to_string());
944 Ok(buf.len())
945 }
946
947 fn flush(&mut self) -> std::io::Result<()> {
948 Ok(())
949 }
950 }
951
952 fn init_capturing_subscriber(
953 buffer: std::sync::Arc<std::sync::Mutex<Vec<String>>>,
954 ) -> tracing::subscriber::DefaultGuard {
955 use tracing_subscriber::filter::LevelFilter;
956 let writer = VecWriter { buffer };
957 let subscriber = tracing_subscriber::fmt()
958 .with_writer(move || writer.clone())
959 .with_ansi(false)
960 .with_target(true)
961 .without_time()
962 .with_max_level(LevelFilter::DEBUG)
963 .finish();
964 tracing::subscriber::set_default(subscriber)
965 }
966
967 #[tokio::test]
968 async fn test_forward_logs_to_tracing_when_enabled_all_levels() {
969 use std::sync::{Arc as StdArc, Mutex};
970
971 let logs_buffer: StdArc<Mutex<Vec<String>>> = StdArc::new(Mutex::new(Vec::new()));
972 let _guard = init_capturing_subscriber(logs_buffer.clone());
973
974 let plugin = PluginModel {
975 id: "test-plugin-levels".to_string(),
976 path: "test-path".to_string(),
977 timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
978 emit_logs: false,
979 emit_traces: false,
980 forward_logs: true,
981 raw_response: false,
982 allow_get_invocation: false,
983 config: None,
984 };
985 let app_state =
986 create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
987
988 let mut plugin_runner = MockPluginRunnerTrait::default();
989
990 plugin_runner
991 .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
992 .returning(|_, _, _, _, _, _, _, _, _, _, _, _, _| {
993 Ok(ScriptResult {
994 logs: vec![
995 LogEntry {
996 level: LogLevel::Error,
997 message: "err-log".to_string(),
998 },
999 LogEntry {
1000 level: LogLevel::Warn,
1001 message: "warn-log".to_string(),
1002 },
1003 LogEntry {
1004 level: LogLevel::Info,
1005 message: "info-log".to_string(),
1006 },
1007 LogEntry {
1008 level: LogLevel::Log,
1009 message: "log-log".to_string(),
1010 },
1011 LogEntry {
1012 level: LogLevel::Debug,
1013 message: "debug-log".to_string(),
1014 },
1015 LogEntry {
1016 level: LogLevel::Result,
1017 message: "result-log".to_string(),
1018 },
1019 ],
1020 error: "".to_string(),
1021 return_value: "{}".to_string(),
1022 trace: vec![],
1023 })
1024 });
1025
1026 let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
1027 let _ = plugin_service
1028 .call_plugin(
1029 plugin,
1030 PluginCallRequest {
1031 params: serde_json::json!({}),
1032 headers: None,
1033 route: None,
1034 method: Some("POST".to_string()),
1035 query: None,
1036 },
1037 Arc::new(web::ThinData(app_state)),
1038 )
1039 .await;
1040
1041 let captured = logs_buffer.lock().unwrap().join("\n");
1042
1043 assert!(captured.contains("err-log"));
1044 assert!(captured.contains("warn-log"));
1045 assert!(captured.contains("info-log"));
1046 assert!(captured.contains("log-log"));
1047 assert!(captured.contains("debug-log"));
1048 assert!(!captured.contains("result-log"));
1049 assert!(captured.contains("plugin_id=test-plugin-levels"));
1050 assert!(captured.contains("ERROR"));
1051 assert!(captured.contains("WARN"));
1052 let request_id_values: Vec<&str> = captured
1053 .match_indices("request_id=")
1054 .filter_map(|(idx, _)| {
1055 let tail = &captured[idx + "request_id=".len()..];
1056 tail.split_whitespace()
1057 .next()
1058 .map(|value| value.trim_matches(|c: char| c == ',' || c == '"' || c == '}'))
1059 })
1060 .collect();
1061 assert!(
1062 !request_id_values.is_empty(),
1063 "expected forwarded plugin logs to include request_id field, captured: {}",
1064 captured
1065 );
1066 assert!(
1067 request_id_values.iter().all(|value| !value.is_empty()),
1068 "expected non-empty request_id values, captured: {}",
1069 captured
1070 );
1071 assert!(
1072 request_id_values
1073 .iter()
1074 .all(|value| uuid::Uuid::parse_str(value).is_ok()),
1075 "expected request_id fallback to be UUID when no span request id is set, captured: {}",
1076 captured
1077 );
1078 }
1079
1080 #[tokio::test]
1081 async fn test_forward_logs_not_emitted_when_disabled() {
1082 use std::sync::{Arc as StdArc, Mutex};
1083
1084 let logs_buffer: StdArc<Mutex<Vec<String>>> = StdArc::new(Mutex::new(Vec::new()));
1085 let _guard = init_capturing_subscriber(logs_buffer.clone());
1086
1087 let plugin = PluginModel {
1088 id: "test-plugin-disabled".to_string(),
1089 path: "test-path".to_string(),
1090 timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
1091 emit_logs: false,
1092 emit_traces: false,
1093 forward_logs: false,
1094 raw_response: false,
1095 allow_get_invocation: false,
1096 config: None,
1097 };
1098 let app_state =
1099 create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
1100
1101 let mut plugin_runner = MockPluginRunnerTrait::default();
1102 plugin_runner
1103 .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
1104 .returning(|_, _, _, _, _, _, _, _, _, _, _, _, _| {
1105 Ok(ScriptResult {
1106 logs: vec![LogEntry {
1107 level: LogLevel::Warn,
1108 message: "should-not-emit".to_string(),
1109 }],
1110 error: "".to_string(),
1111 return_value: "{}".to_string(),
1112 trace: vec![],
1113 })
1114 });
1115
1116 let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
1117 let _ = plugin_service
1118 .call_plugin(
1119 plugin,
1120 PluginCallRequest {
1121 params: serde_json::json!({}),
1122 headers: None,
1123 route: None,
1124 method: Some("POST".to_string()),
1125 query: None,
1126 },
1127 Arc::new(web::ThinData(app_state)),
1128 )
1129 .await;
1130
1131 let captured = logs_buffer.lock().unwrap().join("\n");
1132 assert!(
1135 !captured.contains("should-not-emit"),
1136 "plugin logs should not be forwarded when disabled, but found: {}",
1137 captured
1138 );
1139 }
1140
1141 #[tokio::test]
1142 async fn test_forward_logs_on_handler_error() {
1143 use std::sync::{Arc as StdArc, Mutex};
1144
1145 let logs_buffer: StdArc<Mutex<Vec<String>>> = StdArc::new(Mutex::new(Vec::new()));
1146 let _guard = init_capturing_subscriber(logs_buffer.clone());
1147
1148 let plugin = PluginModel {
1149 id: "test-plugin-error".to_string(),
1150 path: "test-path".to_string(),
1151 timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
1152 emit_logs: true,
1153 emit_traces: false,
1154 forward_logs: true,
1155 raw_response: false,
1156 allow_get_invocation: false,
1157 config: None,
1158 };
1159 let app_state =
1160 create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
1161
1162 let mut plugin_runner = MockPluginRunnerTrait::default();
1163 plugin_runner
1164 .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
1165 .returning(|_, _, _, _, _, _, _, _, _, _, _, _, _| {
1166 Err(PluginError::HandlerError(Box::new(PluginHandlerPayload {
1167 status: 400,
1168 message: "handler failed".to_string(),
1169 code: None,
1170 details: None,
1171 logs: Some(vec![LogEntry {
1172 level: LogLevel::Error,
1173 message: "handler-log".to_string(),
1174 }]),
1175 traces: None,
1176 })))
1177 });
1178
1179 let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
1180 let _ = plugin_service
1181 .call_plugin(
1182 plugin,
1183 PluginCallRequest {
1184 params: serde_json::json!({}),
1185 headers: None,
1186 route: None,
1187 method: Some("POST".to_string()),
1188 query: None,
1189 },
1190 Arc::new(web::ThinData(app_state)),
1191 )
1192 .await;
1193
1194 let captured = logs_buffer.lock().unwrap().join("\n");
1195 assert!(captured.contains("handler-log"));
1196 assert!(captured.contains("plugin_id=test-plugin-error"));
1197 assert!(captured.contains("ERROR"));
1198 }
1199
1200 #[tokio::test]
1201 async fn test_call_plugin_success_with_undefined_result() {
1202 let plugin = PluginModel {
1203 id: "test-plugin".to_string(),
1204 path: "test-path".to_string(),
1205 timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
1206 emit_logs: false,
1207 emit_traces: false,
1208 raw_response: false,
1209 allow_get_invocation: false,
1210 config: None,
1211 forward_logs: false,
1212 };
1213 let app_state =
1214 create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
1215
1216 let mut plugin_runner = MockPluginRunnerTrait::default();
1217
1218 plugin_runner
1219 .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
1220 .returning(|_, _, _, _, _, _, _, _, _, _, _, _, _| {
1221 Ok(ScriptResult {
1222 logs: vec![],
1223 error: "".to_string(),
1224 return_value: "undefined".to_string(),
1225 trace: vec![],
1226 })
1227 });
1228
1229 let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
1230 let outcome = plugin_service
1231 .call_plugin(
1232 plugin,
1233 PluginCallRequest {
1234 params: serde_json::Value::Null,
1235 headers: None,
1236 route: None,
1237 method: Some("POST".to_string()),
1238 query: None,
1239 },
1240 Arc::new(web::ThinData(app_state)),
1241 )
1242 .await;
1243
1244 match outcome {
1245 PluginCallResult::Success(result) => {
1246 assert_eq!(result.result, serde_json::Value::Null);
1248 assert!(result.metadata.is_none());
1250 }
1251 _ => panic!("Expected Success result"),
1252 }
1253 }
1254
1255 #[tokio::test]
1256 async fn test_call_plugin_with_headers() {
1257 use std::sync::{Arc as StdArc, Mutex};
1258
1259 let plugin = PluginModel {
1260 id: "test-plugin".to_string(),
1261 path: "test-path".to_string(),
1262 timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
1263 emit_logs: false,
1264 emit_traces: false,
1265 raw_response: false,
1266 allow_get_invocation: false,
1267 config: None,
1268 forward_logs: false,
1269 };
1270 let app_state =
1271 create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
1272
1273 let captured_headers: StdArc<Mutex<Option<String>>> = StdArc::new(Mutex::new(None));
1275 let captured_headers_clone = captured_headers.clone();
1276
1277 let mut plugin_runner = MockPluginRunnerTrait::default();
1278
1279 plugin_runner
1280 .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
1281 .returning(move |_, _, _, _, _, _, headers_json, _, _, _, _, _, _| {
1282 *captured_headers_clone.lock().unwrap() = headers_json;
1284 Ok(ScriptResult {
1285 logs: vec![],
1286 error: "".to_string(),
1287 return_value: "{}".to_string(),
1288 trace: vec![],
1289 })
1290 });
1291
1292 let mut headers_map = std::collections::HashMap::new();
1294 headers_map.insert(
1295 "x-custom-header".to_string(),
1296 vec!["custom-value".to_string()],
1297 );
1298 headers_map.insert(
1299 "authorization".to_string(),
1300 vec!["Bearer token123".to_string()],
1301 );
1302
1303 let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
1304 let _outcome = plugin_service
1305 .call_plugin(
1306 plugin,
1307 PluginCallRequest {
1308 params: serde_json::json!({"test": "data"}),
1309 headers: Some(headers_map.clone()),
1310 route: None,
1311 method: Some("POST".to_string()),
1312 query: None,
1313 },
1314 Arc::new(web::ThinData(app_state)),
1315 )
1316 .await;
1317
1318 let captured = captured_headers.lock().unwrap();
1320 assert!(
1321 captured.is_some(),
1322 "headers_json should be passed to runner"
1323 );
1324
1325 let headers_json = captured.as_ref().unwrap();
1326 let parsed: std::collections::HashMap<String, Vec<String>> =
1327 serde_json::from_str(headers_json).expect("headers_json should be valid JSON");
1328
1329 assert_eq!(
1330 parsed.get("x-custom-header"),
1331 Some(&vec!["custom-value".to_string()])
1332 );
1333 assert_eq!(
1334 parsed.get("authorization"),
1335 Some(&vec!["Bearer token123".to_string()])
1336 );
1337 }
1338
1339 #[tokio::test]
1340 async fn test_call_plugin_without_headers() {
1341 use std::sync::{Arc as StdArc, Mutex};
1342
1343 let plugin = PluginModel {
1344 id: "test-plugin".to_string(),
1345 path: "test-path".to_string(),
1346 timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
1347 emit_logs: false,
1348 emit_traces: false,
1349 raw_response: false,
1350 allow_get_invocation: false,
1351 config: None,
1352 forward_logs: false,
1353 };
1354 let app_state =
1355 create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
1356
1357 let captured_headers: StdArc<Mutex<Option<String>>> = StdArc::new(Mutex::new(None));
1358 let captured_headers_clone = captured_headers.clone();
1359
1360 let mut plugin_runner = MockPluginRunnerTrait::default();
1361
1362 plugin_runner
1363 .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
1364 .returning(move |_, _, _, _, _, _, headers_json, _, _, _, _, _, _| {
1365 *captured_headers_clone.lock().unwrap() = headers_json;
1366 Ok(ScriptResult {
1367 logs: vec![],
1368 error: "".to_string(),
1369 return_value: "{}".to_string(),
1370 trace: vec![],
1371 })
1372 });
1373
1374 let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
1375 let _outcome = plugin_service
1376 .call_plugin(
1377 plugin,
1378 PluginCallRequest {
1379 params: serde_json::json!({}),
1380 headers: None, route: None,
1382 method: Some("POST".to_string()),
1383 query: None,
1384 },
1385 Arc::new(web::ThinData(app_state)),
1386 )
1387 .await;
1388
1389 let captured = captured_headers.lock().unwrap();
1391 assert!(
1392 captured.is_none(),
1393 "headers_json should be None when no headers provided"
1394 );
1395 }
1396}