openzeppelin_relayer/jobs/
job_producer.rs

//! Job producer module for enqueueing jobs to Redis queues.
//!
//! Provides functionality for producing various types of jobs:
//! - Transaction processing jobs
//! - Transaction submission jobs
//! - Status monitoring jobs
//! - Notification jobs
//! - Token swap request jobs
//! - Relayer health check jobs
8
9use crate::{
10    jobs::{
11        Job, NotificationSend, Queue, RelayerHealthCheck, TransactionRequest, TransactionSend,
12        TransactionStatusCheck,
13    },
14    models::RelayerError,
15    observability::request_id::get_request_id,
16};
17use apalis::prelude::Storage;
18use apalis_redis::RedisError;
19use async_trait::async_trait;
20use serde::Serialize;
21use thiserror::Error;
22use tracing::{debug, error};
23
24use super::{JobType, TokenSwapRequest};
25
26#[cfg(test)]
27use mockall::automock;
28
29#[derive(Debug, Error, Serialize, Clone)]
30pub enum JobProducerError {
31    #[error("Queue error: {0}")]
32    QueueError(String),
33}
34
35impl From<RedisError> for JobProducerError {
36    fn from(err: RedisError) -> Self {
37        error!(error = %err, "Redis error during job production");
38        JobProducerError::QueueError(err.to_string())
39    }
40}
41
42impl From<JobProducerError> for RelayerError {
43    fn from(err: JobProducerError) -> Self {
44        RelayerError::QueueError(err.to_string())
45    }
46}
47
48/// Job producer that enqueues jobs to Redis-backed queues.
49///
50#[derive(Debug, Clone)]
51pub struct JobProducer {
52    queue: Queue,
53}
54
55#[async_trait]
56#[cfg_attr(test, automock)]
57pub trait JobProducerTrait: Send + Sync {
58    async fn produce_transaction_request_job(
59        &self,
60        transaction_process_job: TransactionRequest,
61        scheduled_on: Option<i64>,
62    ) -> Result<(), JobProducerError>;
63
64    async fn produce_submit_transaction_job(
65        &self,
66        transaction_submit_job: TransactionSend,
67        scheduled_on: Option<i64>,
68    ) -> Result<(), JobProducerError>;
69
70    async fn produce_check_transaction_status_job(
71        &self,
72        transaction_status_check_job: TransactionStatusCheck,
73        scheduled_on: Option<i64>,
74    ) -> Result<(), JobProducerError>;
75
76    async fn produce_send_notification_job(
77        &self,
78        notification_send_job: NotificationSend,
79        scheduled_on: Option<i64>,
80    ) -> Result<(), JobProducerError>;
81
82    async fn produce_token_swap_request_job(
83        &self,
84        swap_request_job: TokenSwapRequest,
85        scheduled_on: Option<i64>,
86    ) -> Result<(), JobProducerError>;
87
88    async fn produce_relayer_health_check_job(
89        &self,
90        relayer_health_check_job: RelayerHealthCheck,
91        scheduled_on: Option<i64>,
92    ) -> Result<(), JobProducerError>;
93
94    async fn get_queue(&self) -> Result<Queue, JobProducerError>;
95}
96
97impl JobProducer {
98    pub fn new(queue: Queue) -> Self {
99        Self { queue }
100    }
101}
102
103#[async_trait]
104impl JobProducerTrait for JobProducer {
105    async fn get_queue(&self) -> Result<Queue, JobProducerError> {
106        Ok(self.queue.clone())
107    }
108
109    async fn produce_transaction_request_job(
110        &self,
111        transaction_process_job: TransactionRequest,
112        scheduled_on: Option<i64>,
113    ) -> Result<(), JobProducerError> {
114        debug!(
115            "Producing transaction request job: {:?}",
116            transaction_process_job
117        );
118        // Clone the specific storage - this is cheap (Arc clone internally)
119        let mut storage = self.queue.transaction_request_queue.clone();
120        let job = Job::new(JobType::TransactionRequest, transaction_process_job)
121            .with_request_id(get_request_id());
122        let job_id = job.message_id.clone();
123        let request_id = job.request_id.clone();
124        let tx_id = job.data.transaction_id.clone();
125        let relayer_id = job.data.relayer_id.clone();
126
127        match scheduled_on {
128            Some(scheduled_on) => {
129                storage.schedule(job, scheduled_on).await?;
130            }
131            None => {
132                storage.push(job).await?;
133            }
134        }
135        debug!(
136            job_type = %JobType::TransactionRequest,
137            job_id = %job_id,
138            request_id = ?request_id,
139            tx_id = %tx_id,
140            relayer_id = %relayer_id,
141            scheduled_on = ?scheduled_on,
142            "transaction request job produced"
143        );
144
145        Ok(())
146    }
147
148    async fn produce_submit_transaction_job(
149        &self,
150        transaction_submit_job: TransactionSend,
151        scheduled_on: Option<i64>,
152    ) -> Result<(), JobProducerError> {
153        let mut storage = self.queue.transaction_submission_queue.clone();
154        let job = Job::new(JobType::TransactionSend, transaction_submit_job)
155            .with_request_id(get_request_id());
156        let job_id = job.message_id.clone();
157        let request_id = job.request_id.clone();
158        let tx_id = job.data.transaction_id.clone();
159        let relayer_id = job.data.relayer_id.clone();
160        let command = job.data.command.clone();
161
162        match scheduled_on {
163            Some(on) => {
164                storage.schedule(job, on).await?;
165            }
166            None => {
167                storage.push(job).await?;
168            }
169        }
170        debug!(
171            job_type = %JobType::TransactionSend,
172            job_id = %job_id,
173            request_id = ?request_id,
174            tx_id = %tx_id,
175            relayer_id = %relayer_id,
176            command = ?command,
177            scheduled_on = ?scheduled_on,
178            "transaction submission job produced"
179        );
180
181        Ok(())
182    }
183
184    async fn produce_check_transaction_status_job(
185        &self,
186        transaction_status_check_job: TransactionStatusCheck,
187        scheduled_on: Option<i64>,
188    ) -> Result<(), JobProducerError> {
189        let job = Job::new(
190            JobType::TransactionStatusCheck,
191            transaction_status_check_job.clone(),
192        )
193        .with_request_id(get_request_id());
194        let job_id = job.message_id.clone();
195        let request_id = job.request_id.clone();
196        let tx_id = job.data.transaction_id.clone();
197        let relayer_id = job.data.relayer_id.clone();
198
199        // Route to the appropriate queue based on network type
200        // Clone the specific storage to avoid lock contention
201        use crate::models::NetworkType;
202        let mut storage = match transaction_status_check_job.network_type {
203            Some(NetworkType::Evm) => self.queue.transaction_status_queue_evm.clone(),
204            Some(NetworkType::Stellar) => self.queue.transaction_status_queue_stellar.clone(),
205            _ => self.queue.transaction_status_queue.clone(), // Generic queue or legacy messages without network_type
206        };
207
208        match scheduled_on {
209            Some(on) => {
210                storage.schedule(job, on).await?;
211            }
212            None => {
213                storage.push(job).await?;
214            }
215        }
216        debug!(
217            job_type = %JobType::TransactionStatusCheck,
218            job_id = %job_id,
219            request_id = ?request_id,
220            tx_id = %tx_id,
221            relayer_id = %relayer_id,
222            network_type = ?transaction_status_check_job.network_type,
223            scheduled_on = ?scheduled_on,
224            "Transaction Status Check job produced successfully"
225        );
226        Ok(())
227    }
228
229    async fn produce_send_notification_job(
230        &self,
231        notification_send_job: NotificationSend,
232        scheduled_on: Option<i64>,
233    ) -> Result<(), JobProducerError> {
234        let mut storage = self.queue.notification_queue.clone();
235        let job = Job::new(JobType::NotificationSend, notification_send_job)
236            .with_request_id(get_request_id());
237        let job_id = job.message_id.clone();
238        let request_id = job.request_id.clone();
239        let notification_id = job.data.notification_id.clone();
240
241        match scheduled_on {
242            Some(on) => {
243                storage.schedule(job, on).await?;
244            }
245            None => {
246                storage.push(job).await?;
247            }
248        }
249
250        debug!(
251            job_type = %JobType::NotificationSend,
252            job_id = %job_id,
253            request_id = ?request_id,
254            notification_id = %notification_id,
255            scheduled_on = ?scheduled_on,
256            "notification send job produced"
257        );
258        Ok(())
259    }
260
261    async fn produce_token_swap_request_job(
262        &self,
263        swap_request_job: TokenSwapRequest,
264        scheduled_on: Option<i64>,
265    ) -> Result<(), JobProducerError> {
266        let mut storage = self.queue.token_swap_request_queue.clone();
267        let job =
268            Job::new(JobType::TokenSwapRequest, swap_request_job).with_request_id(get_request_id());
269        let job_id = job.message_id.clone();
270        let request_id = job.request_id.clone();
271        let relayer_id = job.data.relayer_id.clone();
272
273        match scheduled_on {
274            Some(on) => {
275                storage.schedule(job, on).await?;
276            }
277            None => {
278                storage.push(job).await?;
279            }
280        }
281
282        debug!(
283            job_type = %JobType::TokenSwapRequest,
284            job_id = %job_id,
285            request_id = ?request_id,
286            relayer_id = %relayer_id,
287            scheduled_on = ?scheduled_on,
288            "token swap job produced"
289        );
290        Ok(())
291    }
292
293    async fn produce_relayer_health_check_job(
294        &self,
295        relayer_health_check_job: RelayerHealthCheck,
296        scheduled_on: Option<i64>,
297    ) -> Result<(), JobProducerError> {
298        let mut storage = self.queue.relayer_health_check_queue.clone();
299        let job = Job::new(
300            JobType::RelayerHealthCheck,
301            relayer_health_check_job.clone(),
302        )
303        .with_request_id(get_request_id());
304        let job_id = job.message_id.clone();
305        let request_id = job.request_id.clone();
306        let relayer_id = job.data.relayer_id.clone();
307
308        match scheduled_on {
309            Some(scheduled_on) => {
310                storage.schedule(job, scheduled_on).await?;
311            }
312            None => {
313                storage.push(job).await?;
314            }
315        }
316
317        debug!(
318            job_type = %JobType::RelayerHealthCheck,
319            job_id = %job_id,
320            request_id = ?request_id,
321            relayer_id = %relayer_id,
322            scheduled_on = ?scheduled_on,
323            "relayer health check job produced"
324        );
325        Ok(())
326    }
327}
328
329#[cfg(test)]
330mod tests {
331    use super::*;
332    use crate::models::{
333        EvmTransactionResponse, TransactionResponse, TransactionStatus, WebhookNotification,
334        WebhookPayload, U256,
335    };
336    use crate::utils::calculate_scheduled_timestamp;
337    use tokio::sync::Mutex;
338
339    #[derive(Clone, Debug)]
340    // Define a simplified queue for testing without using complex mocks
341    struct TestRedisStorage<T> {
342        pub push_called: bool,
343        pub schedule_called: bool,
344        _phantom: std::marker::PhantomData<T>,
345    }
346
347    impl<T> TestRedisStorage<T> {
348        fn new() -> Self {
349            Self {
350                push_called: false,
351                schedule_called: false,
352                _phantom: std::marker::PhantomData,
353            }
354        }
355
356        async fn push(&mut self, _job: T) -> Result<(), JobProducerError> {
357            self.push_called = true;
358            Ok(())
359        }
360
361        async fn schedule(&mut self, _job: T, _timestamp: i64) -> Result<(), JobProducerError> {
362            self.schedule_called = true;
363            Ok(())
364        }
365    }
366
367    // A test version of the Queue
368    #[derive(Clone, Debug)]
369    struct TestQueue {
370        pub transaction_request_queue: TestRedisStorage<Job<TransactionRequest>>,
371        pub transaction_submission_queue: TestRedisStorage<Job<TransactionSend>>,
372        pub transaction_status_queue: TestRedisStorage<Job<TransactionStatusCheck>>,
373        pub transaction_status_queue_evm: TestRedisStorage<Job<TransactionStatusCheck>>,
374        pub transaction_status_queue_stellar: TestRedisStorage<Job<TransactionStatusCheck>>,
375        pub notification_queue: TestRedisStorage<Job<NotificationSend>>,
376        pub token_swap_request_queue: TestRedisStorage<Job<TokenSwapRequest>>,
377        pub relayer_health_check_queue: TestRedisStorage<Job<RelayerHealthCheck>>,
378    }
379
380    impl TestQueue {
381        fn new() -> Self {
382            Self {
383                transaction_request_queue: TestRedisStorage::new(),
384                transaction_submission_queue: TestRedisStorage::new(),
385                transaction_status_queue: TestRedisStorage::new(),
386                transaction_status_queue_evm: TestRedisStorage::new(),
387                transaction_status_queue_stellar: TestRedisStorage::new(),
388                notification_queue: TestRedisStorage::new(),
389                token_swap_request_queue: TestRedisStorage::new(),
390                relayer_health_check_queue: TestRedisStorage::new(),
391            }
392        }
393    }
394
395    // A test version of JobProducer
396    struct TestJobProducer {
397        queue: Mutex<TestQueue>,
398    }
399
400    impl Clone for TestJobProducer {
401        fn clone(&self) -> Self {
402            let queue = self
403                .queue
404                .try_lock()
405                .expect("Failed to lock queue for cloning")
406                .clone();
407            Self {
408                queue: Mutex::new(queue),
409            }
410        }
411    }
412
413    impl TestJobProducer {
414        fn new() -> Self {
415            Self {
416                queue: Mutex::new(TestQueue::new()),
417            }
418        }
419
420        async fn get_queue(&self) -> TestQueue {
421            self.queue.lock().await.clone()
422        }
423    }
424
425    #[async_trait]
426    impl JobProducerTrait for TestJobProducer {
427        async fn get_queue(&self) -> Result<Queue, JobProducerError> {
428            unimplemented!("get_queue not used in tests")
429        }
430
431        async fn produce_transaction_request_job(
432            &self,
433            transaction_process_job: TransactionRequest,
434            scheduled_on: Option<i64>,
435        ) -> Result<(), JobProducerError> {
436            let mut queue = self.queue.lock().await;
437            let job = Job::new(JobType::TransactionRequest, transaction_process_job);
438
439            match scheduled_on {
440                Some(scheduled_on) => {
441                    queue
442                        .transaction_request_queue
443                        .schedule(job, scheduled_on)
444                        .await?;
445                }
446                None => {
447                    queue.transaction_request_queue.push(job).await?;
448                }
449            }
450
451            Ok(())
452        }
453
454        async fn produce_submit_transaction_job(
455            &self,
456            transaction_submit_job: TransactionSend,
457            scheduled_on: Option<i64>,
458        ) -> Result<(), JobProducerError> {
459            let mut queue = self.queue.lock().await;
460            let job = Job::new(JobType::TransactionSend, transaction_submit_job);
461
462            match scheduled_on {
463                Some(on) => {
464                    queue.transaction_submission_queue.schedule(job, on).await?;
465                }
466                None => {
467                    queue.transaction_submission_queue.push(job).await?;
468                }
469            }
470
471            Ok(())
472        }
473
474        async fn produce_check_transaction_status_job(
475            &self,
476            transaction_status_check_job: TransactionStatusCheck,
477            scheduled_on: Option<i64>,
478        ) -> Result<(), JobProducerError> {
479            let mut queue = self.queue.lock().await;
480            let job = Job::new(
481                JobType::TransactionStatusCheck,
482                transaction_status_check_job.clone(),
483            );
484
485            // Route to the appropriate queue based on network type
486            use crate::models::NetworkType;
487            let status_queue = match transaction_status_check_job.network_type {
488                Some(NetworkType::Evm) => &mut queue.transaction_status_queue_evm,
489                Some(NetworkType::Stellar) => &mut queue.transaction_status_queue_stellar,
490                Some(NetworkType::Solana) => &mut queue.transaction_status_queue, // Use default queue
491                None => &mut queue.transaction_status_queue, // Legacy messages without network_type
492            };
493
494            match scheduled_on {
495                Some(on) => {
496                    status_queue.schedule(job, on).await?;
497                }
498                None => {
499                    status_queue.push(job).await?;
500                }
501            }
502
503            Ok(())
504        }
505
506        async fn produce_send_notification_job(
507            &self,
508            notification_send_job: NotificationSend,
509            scheduled_on: Option<i64>,
510        ) -> Result<(), JobProducerError> {
511            let mut queue = self.queue.lock().await;
512            let job = Job::new(JobType::NotificationSend, notification_send_job);
513
514            match scheduled_on {
515                Some(on) => {
516                    queue.notification_queue.schedule(job, on).await?;
517                }
518                None => {
519                    queue.notification_queue.push(job).await?;
520                }
521            }
522
523            Ok(())
524        }
525
526        async fn produce_token_swap_request_job(
527            &self,
528            swap_request_job: TokenSwapRequest,
529            scheduled_on: Option<i64>,
530        ) -> Result<(), JobProducerError> {
531            let mut queue = self.queue.lock().await;
532            let job = Job::new(JobType::TokenSwapRequest, swap_request_job);
533
534            match scheduled_on {
535                Some(on) => {
536                    queue.token_swap_request_queue.schedule(job, on).await?;
537                }
538                None => {
539                    queue.token_swap_request_queue.push(job).await?;
540                }
541            }
542
543            Ok(())
544        }
545
546        async fn produce_relayer_health_check_job(
547            &self,
548            relayer_health_check_job: RelayerHealthCheck,
549            scheduled_on: Option<i64>,
550        ) -> Result<(), JobProducerError> {
551            let mut queue = self.queue.lock().await;
552            let job = Job::new(JobType::RelayerHealthCheck, relayer_health_check_job);
553
554            match scheduled_on {
555                Some(scheduled_on) => {
556                    queue
557                        .relayer_health_check_queue
558                        .schedule(job, scheduled_on)
559                        .await?;
560                }
561                None => {
562                    queue.relayer_health_check_queue.push(job).await?;
563                }
564            }
565
566            Ok(())
567        }
568    }
569
570    #[tokio::test]
571    async fn test_job_producer_operations() {
572        let producer = TestJobProducer::new();
573
574        // Test transaction request job
575        let request = TransactionRequest::new("tx123", "relayer-1");
576        let result = producer
577            .produce_transaction_request_job(request, None)
578            .await;
579        assert!(result.is_ok());
580
581        let queue = producer.get_queue().await;
582        assert!(queue.transaction_request_queue.push_called);
583
584        // Test scheduled job
585        let producer = TestJobProducer::new();
586        let request = TransactionRequest::new("tx123", "relayer-1");
587        let scheduled_timestamp = calculate_scheduled_timestamp(10); // Schedule for 10 seconds from now
588        let result = producer
589            .produce_transaction_request_job(request, Some(scheduled_timestamp))
590            .await;
591        assert!(result.is_ok());
592
593        let queue = producer.get_queue().await;
594        assert!(queue.transaction_request_queue.schedule_called);
595    }
596
597    #[tokio::test]
598    async fn test_submit_transaction_job() {
599        let producer = TestJobProducer::new();
600
601        // Test submit transaction job
602        let submit_job = TransactionSend::submit("tx123", "relayer-1");
603        let result = producer
604            .produce_submit_transaction_job(submit_job, None)
605            .await;
606        assert!(result.is_ok());
607
608        let queue = producer.get_queue().await;
609        assert!(queue.transaction_submission_queue.push_called);
610    }
611
612    #[tokio::test]
613    async fn test_check_status_job() {
614        use crate::models::NetworkType;
615        let producer = TestJobProducer::new();
616
617        // Test status check job for EVM
618        let status_job = TransactionStatusCheck::new("tx123", "relayer-1", NetworkType::Evm);
619        let result = producer
620            .produce_check_transaction_status_job(status_job, None)
621            .await;
622        assert!(result.is_ok());
623
624        let queue = producer.get_queue().await;
625        assert!(queue.transaction_status_queue_evm.push_called);
626    }
627
628    #[tokio::test]
629    async fn test_notification_job() {
630        let producer = TestJobProducer::new();
631
632        // Create a simple notification for testing
633        let notification = WebhookNotification::new(
634            "test_event".to_string(),
635            WebhookPayload::Transaction(TransactionResponse::Evm(Box::new(
636                EvmTransactionResponse {
637                    id: "tx123".to_string(),
638                    hash: Some("0x123".to_string()),
639                    status: TransactionStatus::Confirmed,
640                    status_reason: None,
641                    created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
642                    sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
643                    confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
644                    gas_price: Some(1000000000),
645                    gas_limit: Some(21000),
646                    nonce: Some(1),
647                    value: U256::from(1000000000000000000_u64),
648                    from: "0xabc".to_string(),
649                    to: Some("0xdef".to_string()),
650                    relayer_id: "relayer-1".to_string(),
651                    data: None,
652                    max_fee_per_gas: None,
653                    max_priority_fee_per_gas: None,
654                    signature: None,
655                    speed: None,
656                },
657            ))),
658        );
659        let job = NotificationSend::new("notification-1".to_string(), notification);
660
661        let result = producer.produce_send_notification_job(job, None).await;
662        assert!(result.is_ok());
663
664        let queue = producer.get_queue().await;
665        assert!(queue.notification_queue.push_called);
666    }
667
668    #[tokio::test]
669    async fn test_relayer_health_check_job() {
670        let producer = TestJobProducer::new();
671
672        // Test immediate health check job
673        let health_check = RelayerHealthCheck::new("relayer-1".to_string());
674        let result = producer
675            .produce_relayer_health_check_job(health_check, None)
676            .await;
677        assert!(result.is_ok());
678
679        let queue = producer.get_queue().await;
680        assert!(queue.relayer_health_check_queue.push_called);
681
682        // Test scheduled health check job
683        let producer = TestJobProducer::new();
684        let health_check = RelayerHealthCheck::new("relayer-1".to_string());
685        let scheduled_timestamp = calculate_scheduled_timestamp(60);
686        let result = producer
687            .produce_relayer_health_check_job(health_check, Some(scheduled_timestamp))
688            .await;
689        assert!(result.is_ok());
690
691        let queue = producer.get_queue().await;
692        assert!(queue.relayer_health_check_queue.schedule_called);
693    }
694
695    #[test]
696    fn test_job_producer_error_conversion() {
697        // Test error conversion preserves original error message
698        let job_error = JobProducerError::QueueError("Test error".to_string());
699        let relayer_error: RelayerError = job_error.into();
700
701        match relayer_error {
702            RelayerError::QueueError(msg) => {
703                assert_eq!(msg, "Queue error: Test error");
704            }
705            _ => panic!("Unexpected error type"),
706        }
707    }
708
709    #[tokio::test]
710    async fn test_get_queue() {
711        let producer = TestJobProducer::new();
712
713        // Get the queue
714        let queue = producer.get_queue().await;
715
716        // Verify the queue is valid and has the expected structure
717        assert!(!queue.transaction_request_queue.push_called);
718        assert!(!queue.transaction_request_queue.schedule_called);
719        assert!(!queue.transaction_submission_queue.push_called);
720        assert!(!queue.notification_queue.push_called);
721        assert!(!queue.token_swap_request_queue.push_called);
722        assert!(!queue.relayer_health_check_queue.push_called);
723    }
724
725    #[tokio::test]
726    async fn test_produce_relayer_health_check_job_immediate() {
727        let producer = TestJobProducer::new();
728
729        // Test immediate health check job (no scheduling)
730        let health_check = RelayerHealthCheck::new("relayer-1".to_string());
731        let result = producer
732            .produce_relayer_health_check_job(health_check, None)
733            .await;
734
735        // Should succeed
736        assert!(result.is_ok());
737
738        // Verify the job was pushed (not scheduled)
739        let queue = producer.get_queue().await;
740        assert!(queue.relayer_health_check_queue.push_called);
741        assert!(!queue.relayer_health_check_queue.schedule_called);
742
743        // Other queues should not be affected
744        assert!(!queue.transaction_request_queue.push_called);
745        assert!(!queue.transaction_submission_queue.push_called);
746        assert!(!queue.transaction_status_queue.push_called);
747        assert!(!queue.notification_queue.push_called);
748        assert!(!queue.token_swap_request_queue.push_called);
749    }
750
751    #[tokio::test]
752    async fn test_produce_relayer_health_check_job_scheduled() {
753        let producer = TestJobProducer::new();
754
755        // Test scheduled health check job
756        let health_check = RelayerHealthCheck::new("relayer-2".to_string());
757        let scheduled_timestamp = calculate_scheduled_timestamp(300); // 5 minutes from now
758        let result = producer
759            .produce_relayer_health_check_job(health_check, Some(scheduled_timestamp))
760            .await;
761
762        // Should succeed
763        assert!(result.is_ok());
764
765        // Verify the job was scheduled (not pushed)
766        let queue = producer.get_queue().await;
767        assert!(queue.relayer_health_check_queue.schedule_called);
768        assert!(!queue.relayer_health_check_queue.push_called);
769
770        // Other queues should not be affected
771        assert!(!queue.transaction_request_queue.push_called);
772        assert!(!queue.transaction_submission_queue.push_called);
773        assert!(!queue.transaction_status_queue.push_called);
774        assert!(!queue.notification_queue.push_called);
775        assert!(!queue.token_swap_request_queue.push_called);
776    }
777
778    #[tokio::test]
779    async fn test_produce_relayer_health_check_job_multiple_relayers() {
780        let producer = TestJobProducer::new();
781
782        // Produce health check jobs for multiple relayers
783        let relayer_ids = vec!["relayer-1", "relayer-2", "relayer-3"];
784
785        for relayer_id in &relayer_ids {
786            let health_check = RelayerHealthCheck::new(relayer_id.to_string());
787            let result = producer
788                .produce_relayer_health_check_job(health_check, None)
789                .await;
790            assert!(result.is_ok());
791        }
792
793        // Verify jobs were produced
794        let queue = producer.get_queue().await;
795        assert!(queue.relayer_health_check_queue.push_called);
796    }
797
798    #[tokio::test]
799    async fn test_status_check_routes_to_evm_queue() {
800        use crate::models::NetworkType;
801        let producer = TestJobProducer::new();
802
803        let status_job = TransactionStatusCheck::new("tx-evm", "relayer-1", NetworkType::Evm);
804        let result = producer
805            .produce_check_transaction_status_job(status_job, None)
806            .await;
807
808        assert!(result.is_ok());
809        let queue = producer.get_queue().await;
810        assert!(queue.transaction_status_queue_evm.push_called);
811        assert!(!queue.transaction_status_queue_stellar.push_called);
812        assert!(!queue.transaction_status_queue.push_called);
813    }
814
815    #[tokio::test]
816    async fn test_status_check_routes_to_stellar_queue() {
817        use crate::models::NetworkType;
818        let producer = TestJobProducer::new();
819
820        let status_job =
821            TransactionStatusCheck::new("tx-stellar", "relayer-2", NetworkType::Stellar);
822        let result = producer
823            .produce_check_transaction_status_job(status_job, None)
824            .await;
825
826        assert!(result.is_ok());
827        let queue = producer.get_queue().await;
828        assert!(queue.transaction_status_queue_stellar.push_called);
829        assert!(!queue.transaction_status_queue_evm.push_called);
830        assert!(!queue.transaction_status_queue.push_called);
831    }
832
833    #[tokio::test]
834    async fn test_status_check_routes_to_default_queue_for_solana() {
835        use crate::models::NetworkType;
836        let producer = TestJobProducer::new();
837
838        let status_job = TransactionStatusCheck::new("tx-solana", "relayer-3", NetworkType::Solana);
839        let result = producer
840            .produce_check_transaction_status_job(status_job, None)
841            .await;
842
843        assert!(result.is_ok());
844        let queue = producer.get_queue().await;
845        assert!(queue.transaction_status_queue.push_called);
846        assert!(!queue.transaction_status_queue_evm.push_called);
847        assert!(!queue.transaction_status_queue_stellar.push_called);
848    }
849
850    #[tokio::test]
851    async fn test_status_check_scheduled_evm() {
852        use crate::models::NetworkType;
853        let producer = TestJobProducer::new();
854
855        let status_job =
856            TransactionStatusCheck::new("tx-evm-scheduled", "relayer-1", NetworkType::Evm);
857        let scheduled_timestamp = calculate_scheduled_timestamp(30);
858        let result = producer
859            .produce_check_transaction_status_job(status_job, Some(scheduled_timestamp))
860            .await;
861
862        assert!(result.is_ok());
863        let queue = producer.get_queue().await;
864        assert!(queue.transaction_status_queue_evm.schedule_called);
865        assert!(!queue.transaction_status_queue_evm.push_called);
866    }
867
868    #[tokio::test]
869    async fn test_submit_transaction_scheduled() {
870        let producer = TestJobProducer::new();
871
872        let submit_job = TransactionSend::submit("tx-scheduled", "relayer-1");
873        let scheduled_timestamp = calculate_scheduled_timestamp(15);
874        let result = producer
875            .produce_submit_transaction_job(submit_job, Some(scheduled_timestamp))
876            .await;
877
878        assert!(result.is_ok());
879        let queue = producer.get_queue().await;
880        assert!(queue.transaction_submission_queue.schedule_called);
881        assert!(!queue.transaction_submission_queue.push_called);
882    }
883
884    #[tokio::test]
885    async fn test_notification_job_scheduled() {
886        let producer = TestJobProducer::new();
887
888        let notification = WebhookNotification::new(
889            "test_scheduled_event".to_string(),
890            WebhookPayload::Transaction(TransactionResponse::Evm(Box::new(
891                EvmTransactionResponse {
892                    id: "tx-notify-scheduled".to_string(),
893                    hash: Some("0xabc123".to_string()),
894                    status: TransactionStatus::Confirmed,
895                    status_reason: None,
896                    created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
897                    sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
898                    confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
899                    gas_price: Some(1000000000),
900                    gas_limit: Some(21000),
901                    nonce: Some(1),
902                    value: U256::from(1000000000000000000_u64),
903                    from: "0xabc".to_string(),
904                    to: Some("0xdef".to_string()),
905                    relayer_id: "relayer-1".to_string(),
906                    data: None,
907                    max_fee_per_gas: None,
908                    max_priority_fee_per_gas: None,
909                    signature: None,
910                    speed: None,
911                },
912            ))),
913        );
914        let job = NotificationSend::new("notification-scheduled".to_string(), notification);
915
916        let scheduled_timestamp = calculate_scheduled_timestamp(5);
917        let result = producer
918            .produce_send_notification_job(job, Some(scheduled_timestamp))
919            .await;
920
921        assert!(result.is_ok());
922        let queue = producer.get_queue().await;
923        assert!(queue.notification_queue.schedule_called);
924        assert!(!queue.notification_queue.push_called);
925    }
926
927    #[tokio::test]
928    async fn test_solana_swap_job_immediate() {
929        let producer = TestJobProducer::new();
930
931        let swap_job = TokenSwapRequest::new("relayer-solana".to_string());
932        let result = producer
933            .produce_token_swap_request_job(swap_job, None)
934            .await;
935
936        assert!(result.is_ok());
937        let queue = producer.get_queue().await;
938        assert!(queue.token_swap_request_queue.push_called);
939        assert!(!queue.token_swap_request_queue.schedule_called);
940    }
941
942    #[tokio::test]
943    async fn test_token_swap_job_scheduled() {
944        let producer = TestJobProducer::new();
945
946        let swap_job = TokenSwapRequest::new("relayer-solana".to_string());
947        let scheduled_timestamp = calculate_scheduled_timestamp(20);
948        let result = producer
949            .produce_token_swap_request_job(swap_job, Some(scheduled_timestamp))
950            .await;
951
952        assert!(result.is_ok());
953        let queue = producer.get_queue().await;
954        assert!(queue.token_swap_request_queue.schedule_called);
955        assert!(!queue.token_swap_request_queue.push_called);
956    }
957
958    #[tokio::test]
959    async fn test_transaction_send_cancel_job() {
960        let producer = TestJobProducer::new();
961
962        let cancel_job = TransactionSend::cancel("tx-cancel", "relayer-1", "user requested");
963        let result = producer
964            .produce_submit_transaction_job(cancel_job, None)
965            .await;
966
967        assert!(result.is_ok());
968        let queue = producer.get_queue().await;
969        assert!(queue.transaction_submission_queue.push_called);
970    }
971
972    #[tokio::test]
973    async fn test_transaction_send_resubmit_job() {
974        let producer = TestJobProducer::new();
975
976        let resubmit_job = TransactionSend::resubmit("tx-resubmit", "relayer-1");
977        let result = producer
978            .produce_submit_transaction_job(resubmit_job, None)
979            .await;
980
981        assert!(result.is_ok());
982        let queue = producer.get_queue().await;
983        assert!(queue.transaction_submission_queue.push_called);
984    }
985
986    #[tokio::test]
987    async fn test_transaction_send_resend_job() {
988        let producer = TestJobProducer::new();
989
990        let resend_job = TransactionSend::resend("tx-resend", "relayer-1");
991        let result = producer
992            .produce_submit_transaction_job(resend_job, None)
993            .await;
994
995        assert!(result.is_ok());
996        let queue = producer.get_queue().await;
997        assert!(queue.transaction_submission_queue.push_called);
998    }
999
1000    #[tokio::test]
1001    async fn test_multiple_jobs_different_queues() {
1002        let producer = TestJobProducer::new();
1003
1004        // Produce different types of jobs
1005        let request = TransactionRequest::new("tx1", "relayer-1");
1006        producer
1007            .produce_transaction_request_job(request, None)
1008            .await
1009            .unwrap();
1010
1011        let submit = TransactionSend::submit("tx2", "relayer-1");
1012        producer
1013            .produce_submit_transaction_job(submit, None)
1014            .await
1015            .unwrap();
1016
1017        use crate::models::NetworkType;
1018        let status = TransactionStatusCheck::new("tx3", "relayer-1", NetworkType::Evm);
1019        producer
1020            .produce_check_transaction_status_job(status, None)
1021            .await
1022            .unwrap();
1023
1024        // Verify all queues were used
1025        let queue = producer.get_queue().await;
1026        assert!(queue.transaction_request_queue.push_called);
1027        assert!(queue.transaction_submission_queue.push_called);
1028        assert!(queue.transaction_status_queue_evm.push_called);
1029    }
1030
1031    #[test]
1032    fn test_job_producer_clone() {
1033        let producer = TestJobProducer::new();
1034        let cloned_producer = producer.clone();
1035
1036        // Both should be valid instances
1037        // The clone creates a new Mutex with a cloned Queue
1038        assert!(std::ptr::addr_of!(producer) != std::ptr::addr_of!(cloned_producer));
1039    }
1040
1041    #[tokio::test]
1042    async fn test_transaction_request_with_metadata() {
1043        let producer = TestJobProducer::new();
1044
1045        let mut metadata = std::collections::HashMap::new();
1046        metadata.insert("retry_count".to_string(), "3".to_string());
1047
1048        let request = TransactionRequest::new("tx-meta", "relayer-1").with_metadata(metadata);
1049
1050        let result = producer
1051            .produce_transaction_request_job(request, None)
1052            .await;
1053
1054        assert!(result.is_ok());
1055        let queue = producer.get_queue().await;
1056        assert!(queue.transaction_request_queue.push_called);
1057    }
1058
1059    #[tokio::test]
1060    async fn test_status_check_with_metadata() {
1061        use crate::models::NetworkType;
1062        let producer = TestJobProducer::new();
1063
1064        let mut metadata = std::collections::HashMap::new();
1065        metadata.insert("attempt".to_string(), "2".to_string());
1066
1067        let status =
1068            TransactionStatusCheck::new("tx-status-meta", "relayer-1", NetworkType::Stellar)
1069                .with_metadata(metadata);
1070
1071        let result = producer
1072            .produce_check_transaction_status_job(status, None)
1073            .await;
1074
1075        assert!(result.is_ok());
1076        let queue = producer.get_queue().await;
1077        assert!(queue.transaction_status_queue_stellar.push_called);
1078    }
1079
1080    #[tokio::test]
1081    async fn test_scheduled_jobs_with_different_delays() {
1082        let producer = TestJobProducer::new();
1083
1084        // Test with various scheduling delays
1085        let delays = vec![1, 10, 60, 300, 3600]; // 1s, 10s, 1m, 5m, 1h
1086
1087        for (idx, delay) in delays.iter().enumerate() {
1088            let request = TransactionRequest::new(format!("tx-delay-{}", idx), "relayer-1");
1089            let timestamp = calculate_scheduled_timestamp(*delay);
1090
1091            let result = producer
1092                .produce_transaction_request_job(request, Some(timestamp))
1093                .await;
1094
1095            assert!(
1096                result.is_ok(),
1097                "Failed to schedule job with delay {}",
1098                delay
1099            );
1100        }
1101    }
1102
1103    #[test]
1104    fn test_job_producer_error_display() {
1105        let error = JobProducerError::QueueError("Test queue error".to_string());
1106        let error_string = error.to_string();
1107
1108        assert!(error_string.contains("Queue error"));
1109        assert!(error_string.contains("Test queue error"));
1110    }
1111
1112    #[test]
1113    fn test_job_producer_error_to_relayer_error() {
1114        // Test error conversion preserves original error message
1115        let job_error = JobProducerError::QueueError("Connection failed".to_string());
1116        let relayer_error: RelayerError = job_error.into();
1117
1118        match relayer_error {
1119            RelayerError::QueueError(msg) => {
1120                assert_eq!(msg, "Queue error: Connection failed");
1121            }
1122            _ => panic!("Expected QueueError variant"),
1123        }
1124    }
1125}