1use crate::{
10 jobs::{
11 Job, NotificationSend, Queue, RelayerHealthCheck, TransactionRequest, TransactionSend,
12 TransactionStatusCheck,
13 },
14 models::RelayerError,
15 observability::request_id::get_request_id,
16};
17use apalis::prelude::Storage;
18use apalis_redis::RedisError;
19use async_trait::async_trait;
20use serde::Serialize;
21use thiserror::Error;
22use tracing::{debug, error};
23
24use super::{JobType, TokenSwapRequest};
25
26#[cfg(test)]
27use mockall::automock;
28
29#[derive(Debug, Error, Serialize, Clone)]
30pub enum JobProducerError {
31 #[error("Queue error: {0}")]
32 QueueError(String),
33}
34
35impl From<RedisError> for JobProducerError {
36 fn from(err: RedisError) -> Self {
37 error!(error = %err, "Redis error during job production");
38 JobProducerError::QueueError(err.to_string())
39 }
40}
41
42impl From<JobProducerError> for RelayerError {
43 fn from(err: JobProducerError) -> Self {
44 RelayerError::QueueError(err.to_string())
45 }
46}
47
48#[derive(Debug, Clone)]
51pub struct JobProducer {
52 queue: Queue,
53}
54
55#[async_trait]
56#[cfg_attr(test, automock)]
57pub trait JobProducerTrait: Send + Sync {
58 async fn produce_transaction_request_job(
59 &self,
60 transaction_process_job: TransactionRequest,
61 scheduled_on: Option<i64>,
62 ) -> Result<(), JobProducerError>;
63
64 async fn produce_submit_transaction_job(
65 &self,
66 transaction_submit_job: TransactionSend,
67 scheduled_on: Option<i64>,
68 ) -> Result<(), JobProducerError>;
69
70 async fn produce_check_transaction_status_job(
71 &self,
72 transaction_status_check_job: TransactionStatusCheck,
73 scheduled_on: Option<i64>,
74 ) -> Result<(), JobProducerError>;
75
76 async fn produce_send_notification_job(
77 &self,
78 notification_send_job: NotificationSend,
79 scheduled_on: Option<i64>,
80 ) -> Result<(), JobProducerError>;
81
82 async fn produce_token_swap_request_job(
83 &self,
84 swap_request_job: TokenSwapRequest,
85 scheduled_on: Option<i64>,
86 ) -> Result<(), JobProducerError>;
87
88 async fn produce_relayer_health_check_job(
89 &self,
90 relayer_health_check_job: RelayerHealthCheck,
91 scheduled_on: Option<i64>,
92 ) -> Result<(), JobProducerError>;
93
94 async fn get_queue(&self) -> Result<Queue, JobProducerError>;
95}
96
97impl JobProducer {
98 pub fn new(queue: Queue) -> Self {
99 Self { queue }
100 }
101}
102
103#[async_trait]
104impl JobProducerTrait for JobProducer {
105 async fn get_queue(&self) -> Result<Queue, JobProducerError> {
106 Ok(self.queue.clone())
107 }
108
109 async fn produce_transaction_request_job(
110 &self,
111 transaction_process_job: TransactionRequest,
112 scheduled_on: Option<i64>,
113 ) -> Result<(), JobProducerError> {
114 debug!(
115 "Producing transaction request job: {:?}",
116 transaction_process_job
117 );
118 let mut storage = self.queue.transaction_request_queue.clone();
120 let job = Job::new(JobType::TransactionRequest, transaction_process_job)
121 .with_request_id(get_request_id());
122 let job_id = job.message_id.clone();
123 let request_id = job.request_id.clone();
124 let tx_id = job.data.transaction_id.clone();
125 let relayer_id = job.data.relayer_id.clone();
126
127 match scheduled_on {
128 Some(scheduled_on) => {
129 storage.schedule(job, scheduled_on).await?;
130 }
131 None => {
132 storage.push(job).await?;
133 }
134 }
135 debug!(
136 job_type = %JobType::TransactionRequest,
137 job_id = %job_id,
138 request_id = ?request_id,
139 tx_id = %tx_id,
140 relayer_id = %relayer_id,
141 scheduled_on = ?scheduled_on,
142 "transaction request job produced"
143 );
144
145 Ok(())
146 }
147
148 async fn produce_submit_transaction_job(
149 &self,
150 transaction_submit_job: TransactionSend,
151 scheduled_on: Option<i64>,
152 ) -> Result<(), JobProducerError> {
153 let mut storage = self.queue.transaction_submission_queue.clone();
154 let job = Job::new(JobType::TransactionSend, transaction_submit_job)
155 .with_request_id(get_request_id());
156 let job_id = job.message_id.clone();
157 let request_id = job.request_id.clone();
158 let tx_id = job.data.transaction_id.clone();
159 let relayer_id = job.data.relayer_id.clone();
160 let command = job.data.command.clone();
161
162 match scheduled_on {
163 Some(on) => {
164 storage.schedule(job, on).await?;
165 }
166 None => {
167 storage.push(job).await?;
168 }
169 }
170 debug!(
171 job_type = %JobType::TransactionSend,
172 job_id = %job_id,
173 request_id = ?request_id,
174 tx_id = %tx_id,
175 relayer_id = %relayer_id,
176 command = ?command,
177 scheduled_on = ?scheduled_on,
178 "transaction submission job produced"
179 );
180
181 Ok(())
182 }
183
184 async fn produce_check_transaction_status_job(
185 &self,
186 transaction_status_check_job: TransactionStatusCheck,
187 scheduled_on: Option<i64>,
188 ) -> Result<(), JobProducerError> {
189 let job = Job::new(
190 JobType::TransactionStatusCheck,
191 transaction_status_check_job.clone(),
192 )
193 .with_request_id(get_request_id());
194 let job_id = job.message_id.clone();
195 let request_id = job.request_id.clone();
196 let tx_id = job.data.transaction_id.clone();
197 let relayer_id = job.data.relayer_id.clone();
198
199 use crate::models::NetworkType;
202 let mut storage = match transaction_status_check_job.network_type {
203 Some(NetworkType::Evm) => self.queue.transaction_status_queue_evm.clone(),
204 Some(NetworkType::Stellar) => self.queue.transaction_status_queue_stellar.clone(),
205 _ => self.queue.transaction_status_queue.clone(), };
207
208 match scheduled_on {
209 Some(on) => {
210 storage.schedule(job, on).await?;
211 }
212 None => {
213 storage.push(job).await?;
214 }
215 }
216 debug!(
217 job_type = %JobType::TransactionStatusCheck,
218 job_id = %job_id,
219 request_id = ?request_id,
220 tx_id = %tx_id,
221 relayer_id = %relayer_id,
222 network_type = ?transaction_status_check_job.network_type,
223 scheduled_on = ?scheduled_on,
224 "Transaction Status Check job produced successfully"
225 );
226 Ok(())
227 }
228
229 async fn produce_send_notification_job(
230 &self,
231 notification_send_job: NotificationSend,
232 scheduled_on: Option<i64>,
233 ) -> Result<(), JobProducerError> {
234 let mut storage = self.queue.notification_queue.clone();
235 let job = Job::new(JobType::NotificationSend, notification_send_job)
236 .with_request_id(get_request_id());
237 let job_id = job.message_id.clone();
238 let request_id = job.request_id.clone();
239 let notification_id = job.data.notification_id.clone();
240
241 match scheduled_on {
242 Some(on) => {
243 storage.schedule(job, on).await?;
244 }
245 None => {
246 storage.push(job).await?;
247 }
248 }
249
250 debug!(
251 job_type = %JobType::NotificationSend,
252 job_id = %job_id,
253 request_id = ?request_id,
254 notification_id = %notification_id,
255 scheduled_on = ?scheduled_on,
256 "notification send job produced"
257 );
258 Ok(())
259 }
260
261 async fn produce_token_swap_request_job(
262 &self,
263 swap_request_job: TokenSwapRequest,
264 scheduled_on: Option<i64>,
265 ) -> Result<(), JobProducerError> {
266 let mut storage = self.queue.token_swap_request_queue.clone();
267 let job =
268 Job::new(JobType::TokenSwapRequest, swap_request_job).with_request_id(get_request_id());
269 let job_id = job.message_id.clone();
270 let request_id = job.request_id.clone();
271 let relayer_id = job.data.relayer_id.clone();
272
273 match scheduled_on {
274 Some(on) => {
275 storage.schedule(job, on).await?;
276 }
277 None => {
278 storage.push(job).await?;
279 }
280 }
281
282 debug!(
283 job_type = %JobType::TokenSwapRequest,
284 job_id = %job_id,
285 request_id = ?request_id,
286 relayer_id = %relayer_id,
287 scheduled_on = ?scheduled_on,
288 "token swap job produced"
289 );
290 Ok(())
291 }
292
293 async fn produce_relayer_health_check_job(
294 &self,
295 relayer_health_check_job: RelayerHealthCheck,
296 scheduled_on: Option<i64>,
297 ) -> Result<(), JobProducerError> {
298 let mut storage = self.queue.relayer_health_check_queue.clone();
299 let job = Job::new(
300 JobType::RelayerHealthCheck,
301 relayer_health_check_job.clone(),
302 )
303 .with_request_id(get_request_id());
304 let job_id = job.message_id.clone();
305 let request_id = job.request_id.clone();
306 let relayer_id = job.data.relayer_id.clone();
307
308 match scheduled_on {
309 Some(scheduled_on) => {
310 storage.schedule(job, scheduled_on).await?;
311 }
312 None => {
313 storage.push(job).await?;
314 }
315 }
316
317 debug!(
318 job_type = %JobType::RelayerHealthCheck,
319 job_id = %job_id,
320 request_id = ?request_id,
321 relayer_id = %relayer_id,
322 scheduled_on = ?scheduled_on,
323 "relayer health check job produced"
324 );
325 Ok(())
326 }
327}
328
329#[cfg(test)]
330mod tests {
331 use super::*;
332 use crate::models::{
333 EvmTransactionResponse, TransactionResponse, TransactionStatus, WebhookNotification,
334 WebhookPayload, U256,
335 };
336 use crate::utils::calculate_scheduled_timestamp;
337 use tokio::sync::Mutex;
338
339 #[derive(Clone, Debug)]
340 struct TestRedisStorage<T> {
342 pub push_called: bool,
343 pub schedule_called: bool,
344 _phantom: std::marker::PhantomData<T>,
345 }
346
347 impl<T> TestRedisStorage<T> {
348 fn new() -> Self {
349 Self {
350 push_called: false,
351 schedule_called: false,
352 _phantom: std::marker::PhantomData,
353 }
354 }
355
356 async fn push(&mut self, _job: T) -> Result<(), JobProducerError> {
357 self.push_called = true;
358 Ok(())
359 }
360
361 async fn schedule(&mut self, _job: T, _timestamp: i64) -> Result<(), JobProducerError> {
362 self.schedule_called = true;
363 Ok(())
364 }
365 }
366
367 #[derive(Clone, Debug)]
369 struct TestQueue {
370 pub transaction_request_queue: TestRedisStorage<Job<TransactionRequest>>,
371 pub transaction_submission_queue: TestRedisStorage<Job<TransactionSend>>,
372 pub transaction_status_queue: TestRedisStorage<Job<TransactionStatusCheck>>,
373 pub transaction_status_queue_evm: TestRedisStorage<Job<TransactionStatusCheck>>,
374 pub transaction_status_queue_stellar: TestRedisStorage<Job<TransactionStatusCheck>>,
375 pub notification_queue: TestRedisStorage<Job<NotificationSend>>,
376 pub token_swap_request_queue: TestRedisStorage<Job<TokenSwapRequest>>,
377 pub relayer_health_check_queue: TestRedisStorage<Job<RelayerHealthCheck>>,
378 }
379
380 impl TestQueue {
381 fn new() -> Self {
382 Self {
383 transaction_request_queue: TestRedisStorage::new(),
384 transaction_submission_queue: TestRedisStorage::new(),
385 transaction_status_queue: TestRedisStorage::new(),
386 transaction_status_queue_evm: TestRedisStorage::new(),
387 transaction_status_queue_stellar: TestRedisStorage::new(),
388 notification_queue: TestRedisStorage::new(),
389 token_swap_request_queue: TestRedisStorage::new(),
390 relayer_health_check_queue: TestRedisStorage::new(),
391 }
392 }
393 }
394
395 struct TestJobProducer {
397 queue: Mutex<TestQueue>,
398 }
399
400 impl Clone for TestJobProducer {
401 fn clone(&self) -> Self {
402 let queue = self
403 .queue
404 .try_lock()
405 .expect("Failed to lock queue for cloning")
406 .clone();
407 Self {
408 queue: Mutex::new(queue),
409 }
410 }
411 }
412
413 impl TestJobProducer {
414 fn new() -> Self {
415 Self {
416 queue: Mutex::new(TestQueue::new()),
417 }
418 }
419
420 async fn get_queue(&self) -> TestQueue {
421 self.queue.lock().await.clone()
422 }
423 }
424
425 #[async_trait]
426 impl JobProducerTrait for TestJobProducer {
427 async fn get_queue(&self) -> Result<Queue, JobProducerError> {
428 unimplemented!("get_queue not used in tests")
429 }
430
431 async fn produce_transaction_request_job(
432 &self,
433 transaction_process_job: TransactionRequest,
434 scheduled_on: Option<i64>,
435 ) -> Result<(), JobProducerError> {
436 let mut queue = self.queue.lock().await;
437 let job = Job::new(JobType::TransactionRequest, transaction_process_job);
438
439 match scheduled_on {
440 Some(scheduled_on) => {
441 queue
442 .transaction_request_queue
443 .schedule(job, scheduled_on)
444 .await?;
445 }
446 None => {
447 queue.transaction_request_queue.push(job).await?;
448 }
449 }
450
451 Ok(())
452 }
453
454 async fn produce_submit_transaction_job(
455 &self,
456 transaction_submit_job: TransactionSend,
457 scheduled_on: Option<i64>,
458 ) -> Result<(), JobProducerError> {
459 let mut queue = self.queue.lock().await;
460 let job = Job::new(JobType::TransactionSend, transaction_submit_job);
461
462 match scheduled_on {
463 Some(on) => {
464 queue.transaction_submission_queue.schedule(job, on).await?;
465 }
466 None => {
467 queue.transaction_submission_queue.push(job).await?;
468 }
469 }
470
471 Ok(())
472 }
473
474 async fn produce_check_transaction_status_job(
475 &self,
476 transaction_status_check_job: TransactionStatusCheck,
477 scheduled_on: Option<i64>,
478 ) -> Result<(), JobProducerError> {
479 let mut queue = self.queue.lock().await;
480 let job = Job::new(
481 JobType::TransactionStatusCheck,
482 transaction_status_check_job.clone(),
483 );
484
485 use crate::models::NetworkType;
487 let status_queue = match transaction_status_check_job.network_type {
488 Some(NetworkType::Evm) => &mut queue.transaction_status_queue_evm,
489 Some(NetworkType::Stellar) => &mut queue.transaction_status_queue_stellar,
490 Some(NetworkType::Solana) => &mut queue.transaction_status_queue, None => &mut queue.transaction_status_queue, };
493
494 match scheduled_on {
495 Some(on) => {
496 status_queue.schedule(job, on).await?;
497 }
498 None => {
499 status_queue.push(job).await?;
500 }
501 }
502
503 Ok(())
504 }
505
506 async fn produce_send_notification_job(
507 &self,
508 notification_send_job: NotificationSend,
509 scheduled_on: Option<i64>,
510 ) -> Result<(), JobProducerError> {
511 let mut queue = self.queue.lock().await;
512 let job = Job::new(JobType::NotificationSend, notification_send_job);
513
514 match scheduled_on {
515 Some(on) => {
516 queue.notification_queue.schedule(job, on).await?;
517 }
518 None => {
519 queue.notification_queue.push(job).await?;
520 }
521 }
522
523 Ok(())
524 }
525
526 async fn produce_token_swap_request_job(
527 &self,
528 swap_request_job: TokenSwapRequest,
529 scheduled_on: Option<i64>,
530 ) -> Result<(), JobProducerError> {
531 let mut queue = self.queue.lock().await;
532 let job = Job::new(JobType::TokenSwapRequest, swap_request_job);
533
534 match scheduled_on {
535 Some(on) => {
536 queue.token_swap_request_queue.schedule(job, on).await?;
537 }
538 None => {
539 queue.token_swap_request_queue.push(job).await?;
540 }
541 }
542
543 Ok(())
544 }
545
546 async fn produce_relayer_health_check_job(
547 &self,
548 relayer_health_check_job: RelayerHealthCheck,
549 scheduled_on: Option<i64>,
550 ) -> Result<(), JobProducerError> {
551 let mut queue = self.queue.lock().await;
552 let job = Job::new(JobType::RelayerHealthCheck, relayer_health_check_job);
553
554 match scheduled_on {
555 Some(scheduled_on) => {
556 queue
557 .relayer_health_check_queue
558 .schedule(job, scheduled_on)
559 .await?;
560 }
561 None => {
562 queue.relayer_health_check_queue.push(job).await?;
563 }
564 }
565
566 Ok(())
567 }
568 }
569
570 #[tokio::test]
571 async fn test_job_producer_operations() {
572 let producer = TestJobProducer::new();
573
574 let request = TransactionRequest::new("tx123", "relayer-1");
576 let result = producer
577 .produce_transaction_request_job(request, None)
578 .await;
579 assert!(result.is_ok());
580
581 let queue = producer.get_queue().await;
582 assert!(queue.transaction_request_queue.push_called);
583
584 let producer = TestJobProducer::new();
586 let request = TransactionRequest::new("tx123", "relayer-1");
587 let scheduled_timestamp = calculate_scheduled_timestamp(10); let result = producer
589 .produce_transaction_request_job(request, Some(scheduled_timestamp))
590 .await;
591 assert!(result.is_ok());
592
593 let queue = producer.get_queue().await;
594 assert!(queue.transaction_request_queue.schedule_called);
595 }
596
597 #[tokio::test]
598 async fn test_submit_transaction_job() {
599 let producer = TestJobProducer::new();
600
601 let submit_job = TransactionSend::submit("tx123", "relayer-1");
603 let result = producer
604 .produce_submit_transaction_job(submit_job, None)
605 .await;
606 assert!(result.is_ok());
607
608 let queue = producer.get_queue().await;
609 assert!(queue.transaction_submission_queue.push_called);
610 }
611
612 #[tokio::test]
613 async fn test_check_status_job() {
614 use crate::models::NetworkType;
615 let producer = TestJobProducer::new();
616
617 let status_job = TransactionStatusCheck::new("tx123", "relayer-1", NetworkType::Evm);
619 let result = producer
620 .produce_check_transaction_status_job(status_job, None)
621 .await;
622 assert!(result.is_ok());
623
624 let queue = producer.get_queue().await;
625 assert!(queue.transaction_status_queue_evm.push_called);
626 }
627
628 #[tokio::test]
629 async fn test_notification_job() {
630 let producer = TestJobProducer::new();
631
632 let notification = WebhookNotification::new(
634 "test_event".to_string(),
635 WebhookPayload::Transaction(TransactionResponse::Evm(Box::new(
636 EvmTransactionResponse {
637 id: "tx123".to_string(),
638 hash: Some("0x123".to_string()),
639 status: TransactionStatus::Confirmed,
640 status_reason: None,
641 created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
642 sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
643 confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
644 gas_price: Some(1000000000),
645 gas_limit: Some(21000),
646 nonce: Some(1),
647 value: U256::from(1000000000000000000_u64),
648 from: "0xabc".to_string(),
649 to: Some("0xdef".to_string()),
650 relayer_id: "relayer-1".to_string(),
651 data: None,
652 max_fee_per_gas: None,
653 max_priority_fee_per_gas: None,
654 signature: None,
655 speed: None,
656 },
657 ))),
658 );
659 let job = NotificationSend::new("notification-1".to_string(), notification);
660
661 let result = producer.produce_send_notification_job(job, None).await;
662 assert!(result.is_ok());
663
664 let queue = producer.get_queue().await;
665 assert!(queue.notification_queue.push_called);
666 }
667
668 #[tokio::test]
669 async fn test_relayer_health_check_job() {
670 let producer = TestJobProducer::new();
671
672 let health_check = RelayerHealthCheck::new("relayer-1".to_string());
674 let result = producer
675 .produce_relayer_health_check_job(health_check, None)
676 .await;
677 assert!(result.is_ok());
678
679 let queue = producer.get_queue().await;
680 assert!(queue.relayer_health_check_queue.push_called);
681
682 let producer = TestJobProducer::new();
684 let health_check = RelayerHealthCheck::new("relayer-1".to_string());
685 let scheduled_timestamp = calculate_scheduled_timestamp(60);
686 let result = producer
687 .produce_relayer_health_check_job(health_check, Some(scheduled_timestamp))
688 .await;
689 assert!(result.is_ok());
690
691 let queue = producer.get_queue().await;
692 assert!(queue.relayer_health_check_queue.schedule_called);
693 }
694
695 #[test]
696 fn test_job_producer_error_conversion() {
697 let job_error = JobProducerError::QueueError("Test error".to_string());
699 let relayer_error: RelayerError = job_error.into();
700
701 match relayer_error {
702 RelayerError::QueueError(msg) => {
703 assert_eq!(msg, "Queue error: Test error");
704 }
705 _ => panic!("Unexpected error type"),
706 }
707 }
708
709 #[tokio::test]
710 async fn test_get_queue() {
711 let producer = TestJobProducer::new();
712
713 let queue = producer.get_queue().await;
715
716 assert!(!queue.transaction_request_queue.push_called);
718 assert!(!queue.transaction_request_queue.schedule_called);
719 assert!(!queue.transaction_submission_queue.push_called);
720 assert!(!queue.notification_queue.push_called);
721 assert!(!queue.token_swap_request_queue.push_called);
722 assert!(!queue.relayer_health_check_queue.push_called);
723 }
724
725 #[tokio::test]
726 async fn test_produce_relayer_health_check_job_immediate() {
727 let producer = TestJobProducer::new();
728
729 let health_check = RelayerHealthCheck::new("relayer-1".to_string());
731 let result = producer
732 .produce_relayer_health_check_job(health_check, None)
733 .await;
734
735 assert!(result.is_ok());
737
738 let queue = producer.get_queue().await;
740 assert!(queue.relayer_health_check_queue.push_called);
741 assert!(!queue.relayer_health_check_queue.schedule_called);
742
743 assert!(!queue.transaction_request_queue.push_called);
745 assert!(!queue.transaction_submission_queue.push_called);
746 assert!(!queue.transaction_status_queue.push_called);
747 assert!(!queue.notification_queue.push_called);
748 assert!(!queue.token_swap_request_queue.push_called);
749 }
750
751 #[tokio::test]
752 async fn test_produce_relayer_health_check_job_scheduled() {
753 let producer = TestJobProducer::new();
754
755 let health_check = RelayerHealthCheck::new("relayer-2".to_string());
757 let scheduled_timestamp = calculate_scheduled_timestamp(300); let result = producer
759 .produce_relayer_health_check_job(health_check, Some(scheduled_timestamp))
760 .await;
761
762 assert!(result.is_ok());
764
765 let queue = producer.get_queue().await;
767 assert!(queue.relayer_health_check_queue.schedule_called);
768 assert!(!queue.relayer_health_check_queue.push_called);
769
770 assert!(!queue.transaction_request_queue.push_called);
772 assert!(!queue.transaction_submission_queue.push_called);
773 assert!(!queue.transaction_status_queue.push_called);
774 assert!(!queue.notification_queue.push_called);
775 assert!(!queue.token_swap_request_queue.push_called);
776 }
777
778 #[tokio::test]
779 async fn test_produce_relayer_health_check_job_multiple_relayers() {
780 let producer = TestJobProducer::new();
781
782 let relayer_ids = vec!["relayer-1", "relayer-2", "relayer-3"];
784
785 for relayer_id in &relayer_ids {
786 let health_check = RelayerHealthCheck::new(relayer_id.to_string());
787 let result = producer
788 .produce_relayer_health_check_job(health_check, None)
789 .await;
790 assert!(result.is_ok());
791 }
792
793 let queue = producer.get_queue().await;
795 assert!(queue.relayer_health_check_queue.push_called);
796 }
797
798 #[tokio::test]
799 async fn test_status_check_routes_to_evm_queue() {
800 use crate::models::NetworkType;
801 let producer = TestJobProducer::new();
802
803 let status_job = TransactionStatusCheck::new("tx-evm", "relayer-1", NetworkType::Evm);
804 let result = producer
805 .produce_check_transaction_status_job(status_job, None)
806 .await;
807
808 assert!(result.is_ok());
809 let queue = producer.get_queue().await;
810 assert!(queue.transaction_status_queue_evm.push_called);
811 assert!(!queue.transaction_status_queue_stellar.push_called);
812 assert!(!queue.transaction_status_queue.push_called);
813 }
814
815 #[tokio::test]
816 async fn test_status_check_routes_to_stellar_queue() {
817 use crate::models::NetworkType;
818 let producer = TestJobProducer::new();
819
820 let status_job =
821 TransactionStatusCheck::new("tx-stellar", "relayer-2", NetworkType::Stellar);
822 let result = producer
823 .produce_check_transaction_status_job(status_job, None)
824 .await;
825
826 assert!(result.is_ok());
827 let queue = producer.get_queue().await;
828 assert!(queue.transaction_status_queue_stellar.push_called);
829 assert!(!queue.transaction_status_queue_evm.push_called);
830 assert!(!queue.transaction_status_queue.push_called);
831 }
832
833 #[tokio::test]
834 async fn test_status_check_routes_to_default_queue_for_solana() {
835 use crate::models::NetworkType;
836 let producer = TestJobProducer::new();
837
838 let status_job = TransactionStatusCheck::new("tx-solana", "relayer-3", NetworkType::Solana);
839 let result = producer
840 .produce_check_transaction_status_job(status_job, None)
841 .await;
842
843 assert!(result.is_ok());
844 let queue = producer.get_queue().await;
845 assert!(queue.transaction_status_queue.push_called);
846 assert!(!queue.transaction_status_queue_evm.push_called);
847 assert!(!queue.transaction_status_queue_stellar.push_called);
848 }
849
850 #[tokio::test]
851 async fn test_status_check_scheduled_evm() {
852 use crate::models::NetworkType;
853 let producer = TestJobProducer::new();
854
855 let status_job =
856 TransactionStatusCheck::new("tx-evm-scheduled", "relayer-1", NetworkType::Evm);
857 let scheduled_timestamp = calculate_scheduled_timestamp(30);
858 let result = producer
859 .produce_check_transaction_status_job(status_job, Some(scheduled_timestamp))
860 .await;
861
862 assert!(result.is_ok());
863 let queue = producer.get_queue().await;
864 assert!(queue.transaction_status_queue_evm.schedule_called);
865 assert!(!queue.transaction_status_queue_evm.push_called);
866 }
867
868 #[tokio::test]
869 async fn test_submit_transaction_scheduled() {
870 let producer = TestJobProducer::new();
871
872 let submit_job = TransactionSend::submit("tx-scheduled", "relayer-1");
873 let scheduled_timestamp = calculate_scheduled_timestamp(15);
874 let result = producer
875 .produce_submit_transaction_job(submit_job, Some(scheduled_timestamp))
876 .await;
877
878 assert!(result.is_ok());
879 let queue = producer.get_queue().await;
880 assert!(queue.transaction_submission_queue.schedule_called);
881 assert!(!queue.transaction_submission_queue.push_called);
882 }
883
884 #[tokio::test]
885 async fn test_notification_job_scheduled() {
886 let producer = TestJobProducer::new();
887
888 let notification = WebhookNotification::new(
889 "test_scheduled_event".to_string(),
890 WebhookPayload::Transaction(TransactionResponse::Evm(Box::new(
891 EvmTransactionResponse {
892 id: "tx-notify-scheduled".to_string(),
893 hash: Some("0xabc123".to_string()),
894 status: TransactionStatus::Confirmed,
895 status_reason: None,
896 created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
897 sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
898 confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
899 gas_price: Some(1000000000),
900 gas_limit: Some(21000),
901 nonce: Some(1),
902 value: U256::from(1000000000000000000_u64),
903 from: "0xabc".to_string(),
904 to: Some("0xdef".to_string()),
905 relayer_id: "relayer-1".to_string(),
906 data: None,
907 max_fee_per_gas: None,
908 max_priority_fee_per_gas: None,
909 signature: None,
910 speed: None,
911 },
912 ))),
913 );
914 let job = NotificationSend::new("notification-scheduled".to_string(), notification);
915
916 let scheduled_timestamp = calculate_scheduled_timestamp(5);
917 let result = producer
918 .produce_send_notification_job(job, Some(scheduled_timestamp))
919 .await;
920
921 assert!(result.is_ok());
922 let queue = producer.get_queue().await;
923 assert!(queue.notification_queue.schedule_called);
924 assert!(!queue.notification_queue.push_called);
925 }
926
927 #[tokio::test]
928 async fn test_solana_swap_job_immediate() {
929 let producer = TestJobProducer::new();
930
931 let swap_job = TokenSwapRequest::new("relayer-solana".to_string());
932 let result = producer
933 .produce_token_swap_request_job(swap_job, None)
934 .await;
935
936 assert!(result.is_ok());
937 let queue = producer.get_queue().await;
938 assert!(queue.token_swap_request_queue.push_called);
939 assert!(!queue.token_swap_request_queue.schedule_called);
940 }
941
942 #[tokio::test]
943 async fn test_token_swap_job_scheduled() {
944 let producer = TestJobProducer::new();
945
946 let swap_job = TokenSwapRequest::new("relayer-solana".to_string());
947 let scheduled_timestamp = calculate_scheduled_timestamp(20);
948 let result = producer
949 .produce_token_swap_request_job(swap_job, Some(scheduled_timestamp))
950 .await;
951
952 assert!(result.is_ok());
953 let queue = producer.get_queue().await;
954 assert!(queue.token_swap_request_queue.schedule_called);
955 assert!(!queue.token_swap_request_queue.push_called);
956 }
957
958 #[tokio::test]
959 async fn test_transaction_send_cancel_job() {
960 let producer = TestJobProducer::new();
961
962 let cancel_job = TransactionSend::cancel("tx-cancel", "relayer-1", "user requested");
963 let result = producer
964 .produce_submit_transaction_job(cancel_job, None)
965 .await;
966
967 assert!(result.is_ok());
968 let queue = producer.get_queue().await;
969 assert!(queue.transaction_submission_queue.push_called);
970 }
971
972 #[tokio::test]
973 async fn test_transaction_send_resubmit_job() {
974 let producer = TestJobProducer::new();
975
976 let resubmit_job = TransactionSend::resubmit("tx-resubmit", "relayer-1");
977 let result = producer
978 .produce_submit_transaction_job(resubmit_job, None)
979 .await;
980
981 assert!(result.is_ok());
982 let queue = producer.get_queue().await;
983 assert!(queue.transaction_submission_queue.push_called);
984 }
985
986 #[tokio::test]
987 async fn test_transaction_send_resend_job() {
988 let producer = TestJobProducer::new();
989
990 let resend_job = TransactionSend::resend("tx-resend", "relayer-1");
991 let result = producer
992 .produce_submit_transaction_job(resend_job, None)
993 .await;
994
995 assert!(result.is_ok());
996 let queue = producer.get_queue().await;
997 assert!(queue.transaction_submission_queue.push_called);
998 }
999
1000 #[tokio::test]
1001 async fn test_multiple_jobs_different_queues() {
1002 let producer = TestJobProducer::new();
1003
1004 let request = TransactionRequest::new("tx1", "relayer-1");
1006 producer
1007 .produce_transaction_request_job(request, None)
1008 .await
1009 .unwrap();
1010
1011 let submit = TransactionSend::submit("tx2", "relayer-1");
1012 producer
1013 .produce_submit_transaction_job(submit, None)
1014 .await
1015 .unwrap();
1016
1017 use crate::models::NetworkType;
1018 let status = TransactionStatusCheck::new("tx3", "relayer-1", NetworkType::Evm);
1019 producer
1020 .produce_check_transaction_status_job(status, None)
1021 .await
1022 .unwrap();
1023
1024 let queue = producer.get_queue().await;
1026 assert!(queue.transaction_request_queue.push_called);
1027 assert!(queue.transaction_submission_queue.push_called);
1028 assert!(queue.transaction_status_queue_evm.push_called);
1029 }
1030
1031 #[test]
1032 fn test_job_producer_clone() {
1033 let producer = TestJobProducer::new();
1034 let cloned_producer = producer.clone();
1035
1036 assert!(std::ptr::addr_of!(producer) != std::ptr::addr_of!(cloned_producer));
1039 }
1040
1041 #[tokio::test]
1042 async fn test_transaction_request_with_metadata() {
1043 let producer = TestJobProducer::new();
1044
1045 let mut metadata = std::collections::HashMap::new();
1046 metadata.insert("retry_count".to_string(), "3".to_string());
1047
1048 let request = TransactionRequest::new("tx-meta", "relayer-1").with_metadata(metadata);
1049
1050 let result = producer
1051 .produce_transaction_request_job(request, None)
1052 .await;
1053
1054 assert!(result.is_ok());
1055 let queue = producer.get_queue().await;
1056 assert!(queue.transaction_request_queue.push_called);
1057 }
1058
1059 #[tokio::test]
1060 async fn test_status_check_with_metadata() {
1061 use crate::models::NetworkType;
1062 let producer = TestJobProducer::new();
1063
1064 let mut metadata = std::collections::HashMap::new();
1065 metadata.insert("attempt".to_string(), "2".to_string());
1066
1067 let status =
1068 TransactionStatusCheck::new("tx-status-meta", "relayer-1", NetworkType::Stellar)
1069 .with_metadata(metadata);
1070
1071 let result = producer
1072 .produce_check_transaction_status_job(status, None)
1073 .await;
1074
1075 assert!(result.is_ok());
1076 let queue = producer.get_queue().await;
1077 assert!(queue.transaction_status_queue_stellar.push_called);
1078 }
1079
1080 #[tokio::test]
1081 async fn test_scheduled_jobs_with_different_delays() {
1082 let producer = TestJobProducer::new();
1083
1084 let delays = vec![1, 10, 60, 300, 3600]; for (idx, delay) in delays.iter().enumerate() {
1088 let request = TransactionRequest::new(format!("tx-delay-{}", idx), "relayer-1");
1089 let timestamp = calculate_scheduled_timestamp(*delay);
1090
1091 let result = producer
1092 .produce_transaction_request_job(request, Some(timestamp))
1093 .await;
1094
1095 assert!(
1096 result.is_ok(),
1097 "Failed to schedule job with delay {}",
1098 delay
1099 );
1100 }
1101 }
1102
1103 #[test]
1104 fn test_job_producer_error_display() {
1105 let error = JobProducerError::QueueError("Test queue error".to_string());
1106 let error_string = error.to_string();
1107
1108 assert!(error_string.contains("Queue error"));
1109 assert!(error_string.contains("Test queue error"));
1110 }
1111
1112 #[test]
1113 fn test_job_producer_error_to_relayer_error() {
1114 let job_error = JobProducerError::QueueError("Connection failed".to_string());
1116 let relayer_error: RelayerError = job_error.into();
1117
1118 match relayer_error {
1119 RelayerError::QueueError(msg) => {
1120 assert_eq!(msg, "Queue error: Connection failed");
1121 }
1122 _ => panic!("Expected QueueError variant"),
1123 }
1124 }
1125}