1use chrono::{DateTime, Utc};
6use soroban_rs::xdr::{
7 Error, Hash, InnerTransactionResultResult, InvokeHostFunctionResult, Limits, OperationResult,
8 OperationResultTr, TransactionEnvelope, TransactionResultResult, WriteXdr,
9};
10use tracing::{debug, info, warn};
11
12use super::{is_final_state, StellarRelayerTransaction};
13use crate::constants::{get_stellar_max_stuck_transaction_lifetime, get_stellar_resend_timeout};
14use crate::domain::transaction::stellar::prepare::common::send_submit_transaction_job;
15use crate::domain::transaction::stellar::utils::extract_return_value_from_meta;
16use crate::domain::transaction::stellar::utils::extract_time_bounds;
17use crate::domain::transaction::util::{get_age_since_created, get_age_since_sent_or_created};
18use crate::domain::xdr_utils::parse_transaction_xdr;
19use crate::{
20 constants::STELLAR_PENDING_RECOVERY_TRIGGER_SECONDS,
21 jobs::{JobProducerTrait, StatusCheckContext, TransactionRequest},
22 models::{
23 NetworkTransactionData, RelayerRepoModel, TransactionError, TransactionRepoModel,
24 TransactionStatus, TransactionUpdateRequest,
25 },
26 repositories::{Repository, TransactionCounterTrait, TransactionRepository},
27 services::{
28 provider::StellarProviderTrait,
29 signer::{Signer, StellarSignTrait},
30 },
31};
32
33impl<R, T, J, S, P, C, D> StellarRelayerTransaction<R, T, J, S, P, C, D>
34where
35 R: Repository<RelayerRepoModel, String> + Send + Sync,
36 T: TransactionRepository + Send + Sync,
37 J: JobProducerTrait + Send + Sync,
38 S: Signer + StellarSignTrait + Send + Sync,
39 P: StellarProviderTrait + Send + Sync,
40 C: TransactionCounterTrait + Send + Sync,
41 D: crate::services::stellar_dex::StellarDexServiceTrait + Send + Sync + 'static,
42{
43 pub async fn handle_transaction_status_impl(
51 &self,
52 tx: TransactionRepoModel,
53 context: Option<StatusCheckContext>,
54 ) -> Result<TransactionRepoModel, TransactionError> {
55 debug!(
56 tx_id = %tx.id,
57 relayer_id = %tx.relayer_id,
58 status = ?tx.status,
59 "handling transaction status"
60 );
61
62 if is_final_state(&tx.status) {
64 debug!(
65 tx_id = %tx.id,
66 relayer_id = %tx.relayer_id,
67 status = ?tx.status,
68 "transaction in final state, skipping status check"
69 );
70 return Ok(tx);
71 }
72
73 if let Some(ref ctx) = context {
75 if ctx.should_force_finalize() {
76 let reason = format!(
77 "Transaction status monitoring failed after {} consecutive errors (total: {}). \
78 Last status: {:?}. Unable to determine final on-chain state.",
79 ctx.consecutive_failures, ctx.total_failures, tx.status
80 );
81 warn!(
82 tx_id = %tx.id,
83 consecutive_failures = ctx.consecutive_failures,
84 total_failures = ctx.total_failures,
85 max_consecutive = ctx.max_consecutive_failures,
86 "circuit breaker triggered, forcing transaction to failed state"
87 );
88 return self.mark_as_failed(tx, reason).await;
92 }
93 }
94
95 match self.status_core(tx.clone()).await {
96 Ok(updated_tx) => {
97 debug!(
98 tx_id = %updated_tx.id,
99 status = ?updated_tx.status,
100 "status check completed successfully"
101 );
102 Ok(updated_tx)
103 }
104 Err(error) => {
105 debug!(
106 tx_id = %tx.id,
107 error = ?error,
108 "status check encountered error"
109 );
110
111 match error {
113 TransactionError::ValidationError(ref msg) => {
114 warn!(
117 tx_id = %tx.id,
118 error = %msg,
119 "validation error detected - marking transaction as failed"
120 );
121
122 self.mark_as_failed(tx, format!("Validation error: {msg}"))
123 .await
124 }
125 _ => {
126 warn!(
129 tx_id = %tx.id,
130 error = ?error,
131 "status check failed with retriable error, will retry"
132 );
133 Err(error)
134 }
135 }
136 }
137 }
138 }
139
140 async fn status_core(
142 &self,
143 tx: TransactionRepoModel,
144 ) -> Result<TransactionRepoModel, TransactionError> {
145 if tx.status == TransactionStatus::Pending {
148 return self.handle_pending_state(tx).await;
149 }
150
151 if tx.status == TransactionStatus::Sent {
152 return self.handle_sent_state(tx).await;
153 }
154
155 let stellar_hash = match self.parse_and_validate_hash(&tx) {
156 Ok(hash) => hash,
157 Err(e) => {
158 warn!(
161 tx_id = %tx.id,
162 status = ?tx.status,
163 error = ?e,
164 "failed to parse and validate hash for submitted transaction"
165 );
166 return self
167 .mark_as_failed(tx, format!("Failed to parse and validate hash: {e}"))
168 .await;
169 }
170 };
171
172 let provider_response = match self.provider().get_transaction(&stellar_hash).await {
173 Ok(response) => response,
174 Err(e) => {
175 warn!(error = ?e, "provider get_transaction failed");
176 return Err(TransactionError::from(e));
177 }
178 };
179
180 match provider_response.status.as_str().to_uppercase().as_str() {
181 "SUCCESS" => self.handle_stellar_success(tx, provider_response).await,
182 "FAILED" => self.handle_stellar_failed(tx, provider_response).await,
183 _ => {
184 self.handle_stellar_pending(tx, provider_response.status)
185 .await
186 }
187 }
188 }
189
190 pub fn parse_and_validate_hash(
193 &self,
194 tx: &TransactionRepoModel,
195 ) -> Result<Hash, TransactionError> {
196 let stellar_network_data = tx.network_data.get_stellar_transaction_data()?;
197
198 let tx_hash_str = stellar_network_data.hash.as_deref().filter(|s| !s.is_empty()).ok_or_else(|| {
199 TransactionError::ValidationError(format!(
200 "Stellar transaction {} is missing or has an empty on-chain hash in network_data. Cannot check status.",
201 tx.id
202 ))
203 })?;
204
205 let stellar_hash: Hash = tx_hash_str.parse().map_err(|e: Error| {
206 TransactionError::UnexpectedError(format!(
207 "Failed to parse transaction hash '{}' for tx {}: {:?}. This hash may be corrupted or not a valid Stellar hash.",
208 tx_hash_str, tx.id, e
209 ))
210 })?;
211
212 Ok(stellar_hash)
213 }
214
215 pub(super) async fn mark_as_failed(
217 &self,
218 tx: TransactionRepoModel,
219 reason: String,
220 ) -> Result<TransactionRepoModel, TransactionError> {
221 warn!(tx_id = %tx.id, reason = %reason, "marking transaction as failed");
222
223 let update_request = TransactionUpdateRequest {
224 status: Some(TransactionStatus::Failed),
225 status_reason: Some(reason),
226 ..Default::default()
227 };
228
229 let failed_tx = self
230 .finalize_transaction_state(tx.id.clone(), update_request)
231 .await?;
232
233 if let Err(e) = self.enqueue_next_pending_transaction(&tx.id).await {
235 warn!(error = %e, "failed to enqueue next pending transaction after failure");
236 }
237
238 Ok(failed_tx)
239 }
240
241 pub(super) async fn mark_as_expired(
243 &self,
244 tx: TransactionRepoModel,
245 reason: String,
246 ) -> Result<TransactionRepoModel, TransactionError> {
247 info!(tx_id = %tx.id, reason = %reason, "marking transaction as expired");
248
249 let update_request = TransactionUpdateRequest {
250 status: Some(TransactionStatus::Expired),
251 status_reason: Some(reason),
252 ..Default::default()
253 };
254
255 let expired_tx = self
256 .finalize_transaction_state(tx.id.clone(), update_request)
257 .await?;
258
259 if let Err(e) = self.enqueue_next_pending_transaction(&tx.id).await {
261 warn!(tx_id = %tx.id, relayer_id = %tx.relayer_id, error = %e, "failed to enqueue next pending transaction after expiration");
262 }
263
264 Ok(expired_tx)
265 }
266
267 pub(super) fn is_transaction_expired(
269 &self,
270 tx: &TransactionRepoModel,
271 ) -> Result<bool, TransactionError> {
272 if let Some(valid_until_str) = &tx.valid_until {
273 return Ok(Self::is_valid_until_string_expired(valid_until_str));
274 }
275
276 let stellar_data = tx.network_data.get_stellar_transaction_data()?;
278 if let Some(signed_xdr) = &stellar_data.signed_envelope_xdr {
279 if let Ok(envelope) = parse_transaction_xdr(signed_xdr, true) {
280 if let Some(tb) = extract_time_bounds(&envelope) {
281 if tb.max_time.0 == 0 {
282 return Ok(false); }
284 return Ok(Utc::now().timestamp() as u64 > tb.max_time.0);
285 }
286 }
287 }
288
289 Ok(false)
290 }
291
292 fn is_valid_until_string_expired(valid_until: &str) -> bool {
294 if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(valid_until) {
295 return Utc::now() > dt.with_timezone(&Utc);
296 }
297 match valid_until.parse::<i64>() {
298 Ok(0) => false,
299 Ok(ts) => Utc::now().timestamp() > ts,
300 Err(_) => false,
301 }
302 }
303
304 pub async fn handle_stellar_success(
306 &self,
307 tx: TransactionRepoModel,
308 provider_response: soroban_rs::stellar_rpc_client::GetTransactionResponse,
309 ) -> Result<TransactionRepoModel, TransactionError> {
310 let updated_network_data =
312 tx.network_data
313 .get_stellar_transaction_data()
314 .ok()
315 .map(|mut stellar_data| {
316 if let Some(tx_result) = provider_response.result.as_ref() {
318 stellar_data = stellar_data.with_fee(tx_result.fee_charged as u32);
319 }
320
321 if let Some(result_meta) = provider_response.result_meta.as_ref() {
323 if let Some(return_value) = extract_return_value_from_meta(result_meta) {
324 let xdr_base64 = return_value.to_xdr_base64(Limits::none());
325 if let Ok(xdr_base64) = xdr_base64 {
326 stellar_data = stellar_data.with_transaction_result_xdr(xdr_base64);
327 } else {
328 warn!("Failed to serialize return value to XDR base64");
329 }
330 }
331 }
332
333 NetworkTransactionData::Stellar(stellar_data)
334 });
335
336 let update_request = TransactionUpdateRequest {
337 status: Some(TransactionStatus::Confirmed),
338 confirmed_at: Some(Utc::now().to_rfc3339()),
339 network_data: updated_network_data,
340 ..Default::default()
341 };
342
343 let confirmed_tx = self
344 .finalize_transaction_state(tx.id.clone(), update_request)
345 .await?;
346
347 self.enqueue_next_pending_transaction(&tx.id).await?;
348
349 Ok(confirmed_tx)
350 }
351
352 pub async fn handle_stellar_failed(
354 &self,
355 tx: TransactionRepoModel,
356 provider_response: soroban_rs::stellar_rpc_client::GetTransactionResponse,
357 ) -> Result<TransactionRepoModel, TransactionError> {
358 let result_code = provider_response
359 .result
360 .as_ref()
361 .map(|r| r.result.name())
362 .unwrap_or("unknown");
363
364 let (inner_result_code, op_result_code, inner_tx_hash, inner_fee_charged) =
366 match provider_response.result.as_ref().map(|r| &r.result) {
367 Some(TransactionResultResult::TxFeeBumpInnerFailed(pair)) => {
368 let inner = &pair.result.result;
369 let op = match inner {
370 InnerTransactionResultResult::TxFailed(ops) => {
371 first_failing_op(ops.as_slice())
372 }
373 _ => None,
374 };
375 (
376 Some(inner.name()),
377 op,
378 Some(hex::encode(pair.transaction_hash.0)),
379 pair.result.fee_charged,
380 )
381 }
382 Some(TransactionResultResult::TxFailed(ops)) => {
383 (None, first_failing_op(ops.as_slice()), None, 0)
384 }
385 _ => (None, None, None, 0),
386 };
387
388 let fee_charged = provider_response.result.as_ref().map(|r| r.fee_charged);
389 let fee_bid = provider_response.envelope.as_ref().map(extract_fee_bid);
390
391 warn!(
392 tx_id = %tx.id,
393 result_code,
394 inner_result_code = inner_result_code.unwrap_or("n/a"),
395 op_result_code = op_result_code.unwrap_or("n/a"),
396 inner_tx_hash = inner_tx_hash.as_deref().unwrap_or("n/a"),
397 inner_fee_charged,
398 fee_charged = ?fee_charged,
399 fee_bid = ?fee_bid,
400 "stellar transaction failed"
401 );
402
403 let status_reason = format!(
404 "Transaction failed on-chain. Provider status: FAILED. Specific XDR reason: {result_code}."
405 );
406
407 let update_request = TransactionUpdateRequest {
408 status: Some(TransactionStatus::Failed),
409 status_reason: Some(status_reason),
410 ..Default::default()
411 };
412
413 let updated_tx = self
414 .finalize_transaction_state(tx.id.clone(), update_request)
415 .await?;
416
417 self.enqueue_next_pending_transaction(&tx.id).await?;
418
419 Ok(updated_tx)
420 }
421
422 async fn check_expiration_and_max_lifetime(
425 &self,
426 tx: TransactionRepoModel,
427 failed_reason: String,
428 ) -> Option<Result<TransactionRepoModel, TransactionError>> {
429 let age = match get_age_since_created(&tx) {
430 Ok(age) => age,
431 Err(e) => return Some(Err(e)),
432 };
433
434 if let Ok(true) = self.is_transaction_expired(&tx) {
436 info!(tx_id = %tx.id, valid_until = ?tx.valid_until, "Transaction has expired");
437 return Some(
438 self.mark_as_expired(tx, "Transaction time_bounds expired".to_string())
439 .await,
440 );
441 }
442
443 if age > get_stellar_max_stuck_transaction_lifetime() {
445 warn!(tx_id = %tx.id, age_minutes = age.num_minutes(),
446 "Transaction exceeded max lifetime, marking as Failed");
447 return Some(self.mark_as_failed(tx, failed_reason).await);
448 }
449
450 None
451 }
452
453 async fn handle_sent_state(
456 &self,
457 tx: TransactionRepoModel,
458 ) -> Result<TransactionRepoModel, TransactionError> {
459 if let Some(result) = self
461 .check_expiration_and_max_lifetime(
462 tx.clone(),
463 "Transaction stuck in Sent status for too long".to_string(),
464 )
465 .await
466 {
467 return result;
468 }
469
470 let age = get_age_since_sent_or_created(&tx)?;
472 if age > get_stellar_resend_timeout() {
473 info!(tx_id = %tx.id, age_seconds = age.num_seconds(),
474 "re-enqueueing submit job for stuck Sent transaction");
475 send_submit_transaction_job(self.job_producer(), &tx, None).await?;
476 }
477
478 Ok(tx)
479 }
480
481 async fn handle_pending_state(
484 &self,
485 tx: TransactionRepoModel,
486 ) -> Result<TransactionRepoModel, TransactionError> {
487 if let Some(result) = self
489 .check_expiration_and_max_lifetime(
490 tx.clone(),
491 "Transaction stuck in Pending status for too long".to_string(),
492 )
493 .await
494 {
495 return result;
496 }
497
498 let age = self.get_time_since_created_at(&tx)?;
500
501 if age.num_seconds() >= STELLAR_PENDING_RECOVERY_TRIGGER_SECONDS {
504 info!(
505 tx_id = %tx.id,
506 age_seconds = age.num_seconds(),
507 "pending transaction without hash may be stuck, scheduling recovery job"
508 );
509
510 let transaction_request = TransactionRequest::new(tx.id.clone(), tx.relayer_id.clone());
511 if let Err(e) = self
512 .job_producer()
513 .produce_transaction_request_job(transaction_request, None)
514 .await
515 {
516 warn!(
517 tx_id = %tx.id,
518 error = %e,
519 "failed to schedule recovery job for pending transaction"
520 );
521 }
522 } else {
523 debug!(
524 tx_id = %tx.id,
525 age_seconds = age.num_seconds(),
526 "pending transaction without hash too young for recovery check"
527 );
528 }
529
530 Ok(tx)
531 }
532
533 fn get_time_since_created_at(
536 &self,
537 tx: &TransactionRepoModel,
538 ) -> Result<chrono::Duration, TransactionError> {
539 match DateTime::parse_from_rfc3339(&tx.created_at) {
540 Ok(dt) => Ok(Utc::now().signed_duration_since(dt.with_timezone(&Utc))),
541 Err(e) => {
542 warn!(tx_id = %tx.id, ts = %tx.created_at, error = %e, "failed to parse created_at timestamp");
543 Err(TransactionError::UnexpectedError(format!(
544 "Invalid created_at timestamp for transaction {}: {}",
545 tx.id, e
546 )))
547 }
548 }
549 }
550
551 pub async fn handle_stellar_pending(
553 &self,
554 tx: TransactionRepoModel,
555 original_status_str: String,
556 ) -> Result<TransactionRepoModel, TransactionError> {
557 debug!(
558 tx_id = %tx.id,
559 relayer_id = %tx.relayer_id,
560 status = %original_status_str,
561 "stellar transaction status is still pending, will retry check later"
562 );
563
564 if tx.status == TransactionStatus::Submitted {
566 if let Some(result) = self
567 .check_expiration_and_max_lifetime(
568 tx.clone(),
569 "Transaction stuck in Submitted status for too long".to_string(),
570 )
571 .await
572 {
573 return result;
574 }
575 }
576
577 Ok(tx)
578 }
579}
580
581fn extract_fee_bid(envelope: &TransactionEnvelope) -> i64 {
586 match envelope {
587 TransactionEnvelope::TxFeeBump(fb) => fb.tx.fee,
588 TransactionEnvelope::Tx(v1) => v1.tx.fee as i64,
589 TransactionEnvelope::TxV0(v0) => v0.tx.fee as i64,
590 }
591}
592
593fn first_failing_op(ops: &[OperationResult]) -> Option<&'static str> {
598 let op = ops.iter().find(|op| match op {
599 OperationResult::OpInner(tr) => match tr {
600 OperationResultTr::InvokeHostFunction(r) => {
601 !matches!(r, InvokeHostFunctionResult::Success(_))
602 }
603 OperationResultTr::ExtendFootprintTtl(r) => r.name() != "Success",
604 OperationResultTr::RestoreFootprint(r) => r.name() != "Success",
605 _ => false,
606 },
607 _ => true,
608 })?;
609 match op {
610 OperationResult::OpInner(tr) => match tr {
611 OperationResultTr::InvokeHostFunction(r) => Some(r.name()),
612 OperationResultTr::ExtendFootprintTtl(r) => Some(r.name()),
613 OperationResultTr::RestoreFootprint(r) => Some(r.name()),
614 _ => Some(tr.name()),
615 },
616 _ => Some(op.name()),
617 }
618}
619
620#[cfg(test)]
621mod tests {
622 use super::*;
623 use crate::models::{NetworkTransactionData, RepositoryError};
624 use crate::repositories::PaginatedResult;
625 use chrono::Duration;
626 use mockall::predicate::eq;
627 use soroban_rs::stellar_rpc_client::GetTransactionResponse;
628
629 use crate::domain::transaction::stellar::test_helpers::*;
630
631 fn dummy_get_transaction_response(status: &str) -> GetTransactionResponse {
632 GetTransactionResponse {
633 status: status.to_string(),
634 ledger: None,
635 envelope: None,
636 result: None,
637 result_meta: None,
638 events: soroban_rs::stellar_rpc_client::GetTransactionEvents {
639 contract_events: vec![],
640 diagnostic_events: vec![],
641 transaction_events: vec![],
642 },
643 }
644 }
645
646 fn dummy_get_transaction_response_with_result_meta(
647 status: &str,
648 has_return_value: bool,
649 ) -> GetTransactionResponse {
650 use soroban_rs::xdr::{ScVal, SorobanTransactionMeta, TransactionMeta, TransactionMetaV3};
651
652 let result_meta = if has_return_value {
653 let return_value = ScVal::I32(42);
655 Some(TransactionMeta::V3(TransactionMetaV3 {
656 ext: soroban_rs::xdr::ExtensionPoint::V0,
657 tx_changes_before: soroban_rs::xdr::LedgerEntryChanges::default(),
658 operations: soroban_rs::xdr::VecM::default(),
659 tx_changes_after: soroban_rs::xdr::LedgerEntryChanges::default(),
660 soroban_meta: Some(SorobanTransactionMeta {
661 ext: soroban_rs::xdr::SorobanTransactionMetaExt::V0,
662 return_value,
663 events: soroban_rs::xdr::VecM::default(),
664 diagnostic_events: soroban_rs::xdr::VecM::default(),
665 }),
666 }))
667 } else {
668 None
669 };
670
671 GetTransactionResponse {
672 status: status.to_string(),
673 ledger: None,
674 envelope: None,
675 result: None,
676 result_meta,
677 events: soroban_rs::stellar_rpc_client::GetTransactionEvents {
678 contract_events: vec![],
679 diagnostic_events: vec![],
680 transaction_events: vec![],
681 },
682 }
683 }
684
685 mod handle_transaction_status_tests {
686 use crate::services::provider::ProviderError;
687
688 use super::*;
689
        // A Submitted transaction whose provider status is SUCCESS must end up
        // Confirmed, and the oldest Pending transaction of the same relayer
        // must get a transaction-request job.
        #[tokio::test]
        async fn handle_transaction_status_confirmed_triggers_next() {
            let relayer = create_test_relayer();
            let mut mocks = default_test_mocks();

            // Template transaction; it is moved into the partial_update mock
            // below and used there to build the "updated" record.
            let mut tx_to_handle = create_test_transaction(&relayer.id);
            tx_to_handle.id = "tx-confirm-this".to_string();
            tx_to_handle.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
            let tx_hash_bytes = [1u8; 32];
            let tx_hash_hex = hex::encode(tx_hash_bytes);
            if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx_to_handle.network_data
            {
                stellar_data.hash = Some(tx_hash_hex.clone());
            } else {
                panic!("Expected Stellar network data for tx_to_handle");
            }
            tx_to_handle.status = TransactionStatus::Submitted;

            let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);

            // Provider is queried exactly once, with the parsed hash, and
            // answers SUCCESS.
            mocks
                .provider
                .expect_get_transaction()
                .with(eq(expected_stellar_hash.clone()))
                .times(1)
                .returning(move |_| {
                    Box::pin(async { Ok(dummy_get_transaction_response("SUCCESS")) })
                });

            // Exactly one repository update, to Confirmed with confirmed_at set.
            mocks
                .tx_repo
                .expect_partial_update()
                .withf(move |id, update| {
                    id == "tx-confirm-this"
                        && update.status == Some(TransactionStatus::Confirmed)
                        && update.confirmed_at.is_some()
                })
                .times(1)
                .returning(move |id, update| {
                    let mut updated_tx = tx_to_handle.clone();
                    updated_tx.id = id;
                    updated_tx.status = update.status.unwrap();
                    updated_tx.confirmed_at = update.confirmed_at;
                    Ok(updated_tx)
                });

            // Exactly one notification job is expected.
            mocks
                .job_producer
                .expect_produce_send_notification_job()
                .times(1)
                .returning(|_, _| Box::pin(async { Ok(()) }));

            // The lookup for the next transaction: Pending only, first page of
            // size one, oldest first. It returns one pending transaction.
            let mut oldest_pending_tx = create_test_transaction(&relayer.id);
            oldest_pending_tx.id = "tx-oldest-pending".to_string();
            oldest_pending_tx.status = TransactionStatus::Pending;
            let captured_oldest_pending_tx = oldest_pending_tx.clone();
            let relayer_id_clone = relayer.id.clone();
            mocks
                .tx_repo
                .expect_find_by_status_paginated()
                .withf(move |relayer_id, statuses, query, oldest_first| {
                    *relayer_id == relayer_id_clone
                        && statuses == [TransactionStatus::Pending]
                        && query.page == 1
                        && query.per_page == 1
                        && *oldest_first
                })
                .times(1)
                .returning(move |_, _, _, _| {
                    Ok(PaginatedResult {
                        items: vec![captured_oldest_pending_tx.clone()],
                        total: 1,
                        page: 1,
                        per_page: 1,
                    })
                });

            // That pending transaction must get exactly one request job.
            mocks
                .job_producer
                .expect_produce_transaction_request_job()
                .withf(move |job, _delay| job.transaction_id == "tx-oldest-pending")
                .times(1)
                .returning(|_, _| Box::pin(async { Ok(()) }));

            // The template above was moved into a mock closure, so the input
            // transaction is rebuilt here with the same id, hash and status.
            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
            let mut initial_tx_for_handling = create_test_transaction(&relayer.id);
            initial_tx_for_handling.id = "tx-confirm-this".to_string();
            initial_tx_for_handling.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
            if let NetworkTransactionData::Stellar(ref mut stellar_data) =
                initial_tx_for_handling.network_data
            {
                stellar_data.hash = Some(hex::encode(tx_hash_bytes));
            } else {
                panic!("Expected Stellar network data for initial_tx_for_handling");
            }
            initial_tx_for_handling.status = TransactionStatus::Submitted;

            let result = handler
                .handle_transaction_status_impl(initial_tx_for_handling, None)
                .await;

            assert!(result.is_ok());
            let handled_tx = result.unwrap();
            assert_eq!(handled_tx.id, "tx-confirm-this");
            assert_eq!(handled_tx.status, TransactionStatus::Confirmed);
            assert!(handled_tx.confirmed_at.is_some());
        }
802
        // A Submitted transaction whose provider status is neither SUCCESS nor
        // FAILED must be returned as-is, with no repository update and no
        // notification.
        #[tokio::test]
        async fn handle_transaction_status_still_pending() {
            let relayer = create_test_relayer();
            let mut mocks = default_test_mocks();

            let mut tx_to_handle = create_test_transaction(&relayer.id);
            tx_to_handle.id = "tx-pending-check".to_string();
            // Created one minute ago.
            // NOTE(review): presumably well inside the max-lifetime limit so the
            // lifetime guard does not fire - confirm against the constant.
            tx_to_handle.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
            let tx_hash_bytes = [2u8; 32];
            if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx_to_handle.network_data
            {
                stellar_data.hash = Some(hex::encode(tx_hash_bytes));
            } else {
                panic!("Expected Stellar network data");
            }
            tx_to_handle.status = TransactionStatus::Submitted;
            let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);

            // Provider is queried once and answers PENDING.
            mocks
                .provider
                .expect_get_transaction()
                .with(eq(expected_stellar_hash.clone()))
                .times(1)
                .returning(move |_| {
                    Box::pin(async { Ok(dummy_get_transaction_response("PENDING")) })
                });

            // No state change may be persisted.
            mocks.tx_repo.expect_partial_update().never();

            // No notification may be sent.
            mocks
                .job_producer
                .expect_produce_send_notification_job()
                .never();

            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
            let original_tx_clone = tx_to_handle.clone();

            let result = handler
                .handle_transaction_status_impl(tx_to_handle, None)
                .await;

            assert!(result.is_ok());
            let returned_tx = result.unwrap();
            // The transaction comes back with the same id and status, and
            // without a confirmation timestamp.
            assert_eq!(returned_tx.id, original_tx_clone.id);
            assert_eq!(returned_tx.status, original_tx_clone.status);
            assert!(returned_tx.confirmed_at.is_none());
        }
855
        // A Submitted transaction whose provider status is FAILED must end up
        // Failed with the on-chain failure reason; with no pending
        // transactions queued, no follow-up jobs are produced.
        #[tokio::test]
        async fn handle_transaction_status_failed() {
            let relayer = create_test_relayer();
            let mut mocks = default_test_mocks();

            // Only `tx_hash_bytes` from this setup is used later; the handler
            // input is rebuilt further down.
            let mut tx_to_handle = create_test_transaction(&relayer.id);
            tx_to_handle.id = "tx-fail-this".to_string();
            tx_to_handle.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
            let tx_hash_bytes = [3u8; 32];
            if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx_to_handle.network_data
            {
                stellar_data.hash = Some(hex::encode(tx_hash_bytes));
            } else {
                panic!("Expected Stellar network data");
            }
            tx_to_handle.status = TransactionStatus::Submitted;

            let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);

            // Provider is queried once and answers FAILED. The dummy response
            // has no `result`, so the reason code is expected to be "unknown".
            mocks
                .provider
                .expect_get_transaction()
                .with(eq(expected_stellar_hash.clone()))
                .times(1)
                .returning(move |_| {
                    Box::pin(async { Ok(dummy_get_transaction_response("FAILED")) })
                });

            // Exactly one repository update; the mock echoes back the status
            // and status_reason it was given.
            let relayer_id_for_mock = relayer.id.clone();
            mocks
                .tx_repo
                .expect_partial_update()
                .times(1)
                .returning(move |id, update| {
                    let mut updated_tx = create_test_transaction(&relayer_id_for_mock);
                    updated_tx.id = id;
                    updated_tx.status = update.status.unwrap();
                    updated_tx.status_reason = update.status_reason.clone();
                    Ok::<_, RepositoryError>(updated_tx)
                });

            // Exactly one notification job is expected.
            mocks
                .job_producer
                .expect_produce_send_notification_job()
                .times(1)
                .returning(|_, _| Box::pin(async { Ok(()) }));

            // The lookup for the next pending transaction returns nothing.
            let relayer_id_clone = relayer.id.clone();
            mocks
                .tx_repo
                .expect_find_by_status_paginated()
                .withf(move |relayer_id, statuses, query, oldest_first| {
                    *relayer_id == relayer_id_clone
                        && statuses == [TransactionStatus::Pending]
                        && query.page == 1
                        && query.per_page == 1
                        && *oldest_first
                })
                .times(1)
                .returning(move |_, _, _, _| {
                    Ok(PaginatedResult {
                        items: vec![],
                        total: 0,
                        page: 1,
                        per_page: 1,
                    })
                });

            // With nothing pending, no request job and no status-check job may
            // be produced.
            mocks
                .job_producer
                .expect_produce_transaction_request_job()
                .never();
            mocks
                .job_producer
                .expect_produce_check_transaction_status_job()
                .never();

            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
            let mut initial_tx_for_handling = create_test_transaction(&relayer.id);
            initial_tx_for_handling.id = "tx-fail-this".to_string();
            initial_tx_for_handling.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
            if let NetworkTransactionData::Stellar(ref mut stellar_data) =
                initial_tx_for_handling.network_data
            {
                stellar_data.hash = Some(hex::encode(tx_hash_bytes));
            } else {
                panic!("Expected Stellar network data");
            }
            initial_tx_for_handling.status = TransactionStatus::Submitted;

            let result = handler
                .handle_transaction_status_impl(initial_tx_for_handling, None)
                .await;

            assert!(result.is_ok());
            let handled_tx = result.unwrap();
            assert_eq!(handled_tx.id, "tx-fail-this");
            assert_eq!(handled_tx.status, TransactionStatus::Failed);
            assert!(handled_tx.status_reason.is_some());
            assert_eq!(
                handled_tx.status_reason.unwrap(),
                "Transaction failed on-chain. Provider status: FAILED. Specific XDR reason: unknown."
            );
        }
967
968 #[tokio::test]
969 async fn handle_transaction_status_provider_error() {
970 let relayer = create_test_relayer();
971 let mut mocks = default_test_mocks();
972
973 let mut tx_to_handle = create_test_transaction(&relayer.id);
974 tx_to_handle.id = "tx-provider-error".to_string();
975 tx_to_handle.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
976 let tx_hash_bytes = [4u8; 32];
977 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx_to_handle.network_data
978 {
979 stellar_data.hash = Some(hex::encode(tx_hash_bytes));
980 } else {
981 panic!("Expected Stellar network data");
982 }
983 tx_to_handle.status = TransactionStatus::Submitted;
984
985 let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);
986
987 mocks
989 .provider
990 .expect_get_transaction()
991 .with(eq(expected_stellar_hash.clone()))
992 .times(1)
993 .returning(move |_| {
994 Box::pin(async { Err(ProviderError::Other("RPC boom".to_string())) })
995 });
996
997 mocks.tx_repo.expect_partial_update().never();
999
1000 mocks
1002 .job_producer
1003 .expect_produce_send_notification_job()
1004 .never();
1005 mocks
1007 .job_producer
1008 .expect_produce_transaction_request_job()
1009 .never();
1010
1011 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1012
1013 let result = handler
1014 .handle_transaction_status_impl(tx_to_handle, None)
1015 .await;
1016
1017 assert!(result.is_err());
1019 matches!(result.unwrap_err(), TransactionError::UnderlyingProvider(_));
1020 }
1021
        // A Submitted transaction for which no hash is set by this test must be
        // marked Failed without the provider ever being queried.
        // NOTE(review): relies on create_test_transaction producing network
        // data without a usable hash - confirm in the test helpers.
        #[tokio::test]
        async fn handle_transaction_status_no_hashes() {
            let relayer = create_test_relayer();
            let mut mocks = default_test_mocks();

            let mut tx_to_handle = create_test_transaction(&relayer.id);
            tx_to_handle.id = "tx-no-hashes".to_string();
            tx_to_handle.status = TransactionStatus::Submitted;
            tx_to_handle.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();

            // Hash validation fails first, so the provider must not be called.
            mocks.provider.expect_get_transaction().never();

            // Exactly one repository update; the mock echoes back the status
            // and status_reason it was given.
            mocks
                .tx_repo
                .expect_partial_update()
                .times(1)
                .returning(|_, update| {
                    let mut updated_tx = create_test_transaction("test-relayer");
                    updated_tx.status = update.status.unwrap_or(updated_tx.status);
                    updated_tx.status_reason = update.status_reason.clone();
                    Ok(updated_tx)
                });

            // Exactly one notification job is expected.
            mocks
                .job_producer
                .expect_produce_send_notification_job()
                .times(1)
                .returning(|_, _| Box::pin(async { Ok(()) }));

            // The lookup for the next pending transaction returns nothing.
            let relayer_id_clone = relayer.id.clone();
            mocks
                .tx_repo
                .expect_find_by_status_paginated()
                .withf(move |relayer_id, statuses, query, oldest_first| {
                    *relayer_id == relayer_id_clone
                        && statuses == [TransactionStatus::Pending]
                        && query.page == 1
                        && query.per_page == 1
                        && *oldest_first
                })
                .times(1)
                .returning(move |_, _, _, _| {
                    Ok(PaginatedResult {
                        items: vec![],
                        total: 0,
                        page: 1,
                        per_page: 1,
                    })
                });

            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
            let result = handler
                .handle_transaction_status_impl(tx_to_handle, None)
                .await;

            // The handler reports success because the failure was recorded on
            // the transaction rather than returned as an error.
            assert!(result.is_ok(), "Expected Ok result");
            let updated_tx = result.unwrap();
            assert_eq!(updated_tx.status, TransactionStatus::Failed);
            assert!(
                updated_tx
                    .status_reason
                    .as_ref()
                    .unwrap()
                    .contains("Failed to parse and validate hash"),
                "Expected hash validation error in status_reason, got: {:?}",
                updated_tx.status_reason
            );
        }
1095
        // When the provider reports FAILED for a Submitted transaction, the
        // counter's decrement must never be called.
        #[tokio::test]
        async fn test_on_chain_failure_does_not_decrement_sequence() {
            let relayer = create_test_relayer();
            let mut mocks = default_test_mocks();

            let mut tx_to_handle = create_test_transaction(&relayer.id);
            tx_to_handle.id = "tx-on-chain-fail".to_string();
            tx_to_handle.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
            let tx_hash_bytes = [4u8; 32];
            // Give the transaction both a hash and a sequence number.
            if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx_to_handle.network_data
            {
                stellar_data.hash = Some(hex::encode(tx_hash_bytes));
                stellar_data.sequence_number = Some(100);
            }
            tx_to_handle.status = TransactionStatus::Submitted;

            let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);

            // Provider is queried once and answers FAILED.
            mocks
                .provider
                .expect_get_transaction()
                .with(eq(expected_stellar_hash.clone()))
                .times(1)
                .returning(move |_| {
                    Box::pin(async { Ok(dummy_get_transaction_response("FAILED")) })
                });

            // The point of the test: the counter must not be decremented.
            mocks.counter.expect_decrement().never();

            // Exactly one repository update; the mock echoes back the status
            // and status_reason it was given.
            mocks
                .tx_repo
                .expect_partial_update()
                .times(1)
                .returning(move |id, update| {
                    let mut updated_tx = create_test_transaction("test");
                    updated_tx.id = id;
                    updated_tx.status = update.status.unwrap();
                    updated_tx.status_reason = update.status_reason.clone();
                    Ok::<_, RepositoryError>(updated_tx)
                });

            // Exactly one notification job is expected.
            mocks
                .job_producer
                .expect_produce_send_notification_job()
                .times(1)
                .returning(|_, _| Box::pin(async { Ok(()) }));

            // Any lookup for pending transactions returns an empty page.
            mocks
                .tx_repo
                .expect_find_by_status_paginated()
                .returning(move |_, _, _, _| {
                    Ok(PaginatedResult {
                        items: vec![],
                        total: 0,
                        page: 1,
                        per_page: 1,
                    })
                });

            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
            let initial_tx = tx_to_handle.clone();

            let result = handler
                .handle_transaction_status_impl(initial_tx, None)
                .await;

            assert!(result.is_ok());
            let handled_tx = result.unwrap();
            assert_eq!(handled_tx.id, "tx-on-chain-fail");
            assert_eq!(handled_tx.status, TransactionStatus::Failed);
        }
1172
1173 #[tokio::test]
1174 async fn test_on_chain_success_does_not_decrement_sequence() {
1175 let relayer = create_test_relayer();
1176 let mut mocks = default_test_mocks();
1177
1178 let mut tx_to_handle = create_test_transaction(&relayer.id);
1179 tx_to_handle.id = "tx-on-chain-success".to_string();
1180 tx_to_handle.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
1181 let tx_hash_bytes = [5u8; 32];
1182 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx_to_handle.network_data
1183 {
1184 stellar_data.hash = Some(hex::encode(tx_hash_bytes));
1185 stellar_data.sequence_number = Some(101); }
1187 tx_to_handle.status = TransactionStatus::Submitted;
1188
1189 let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);
1190
1191 mocks
1193 .provider
1194 .expect_get_transaction()
1195 .with(eq(expected_stellar_hash.clone()))
1196 .times(1)
1197 .returning(move |_| {
1198 Box::pin(async { Ok(dummy_get_transaction_response("SUCCESS")) })
1199 });
1200
1201 mocks.counter.expect_decrement().never();
1203
1204 mocks
1206 .tx_repo
1207 .expect_partial_update()
1208 .withf(move |id, update| {
1209 id == "tx-on-chain-success"
1210 && update.status == Some(TransactionStatus::Confirmed)
1211 && update.confirmed_at.is_some()
1212 })
1213 .times(1)
1214 .returning(move |id, update| {
1215 let mut updated_tx = create_test_transaction("test");
1216 updated_tx.id = id;
1217 updated_tx.status = update.status.unwrap();
1218 updated_tx.confirmed_at = update.confirmed_at;
1219 Ok(updated_tx)
1220 });
1221
1222 mocks
1224 .job_producer
1225 .expect_produce_send_notification_job()
1226 .times(1)
1227 .returning(|_, _| Box::pin(async { Ok(()) }));
1228
1229 mocks
1231 .tx_repo
1232 .expect_find_by_status_paginated()
1233 .returning(move |_, _, _, _| {
1234 Ok(PaginatedResult {
1235 items: vec![],
1236 total: 0,
1237 page: 1,
1238 per_page: 1,
1239 })
1240 });
1241
1242 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1243 let initial_tx = tx_to_handle.clone();
1244
1245 let result = handler
1246 .handle_transaction_status_impl(initial_tx, None)
1247 .await;
1248
1249 assert!(result.is_ok());
1250 let handled_tx = result.unwrap();
1251 assert_eq!(handled_tx.id, "tx-on-chain-success");
1252 assert_eq!(handled_tx.status, TransactionStatus::Confirmed);
1253 }
1254
1255 #[tokio::test]
1256 async fn test_handle_transaction_status_with_xdr_error_requeues() {
1257 let relayer = create_test_relayer();
1259 let mut mocks = default_test_mocks();
1260
1261 let mut tx_to_handle = create_test_transaction(&relayer.id);
1262 tx_to_handle.id = "tx-xdr-error-requeue".to_string();
1263 tx_to_handle.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
1264 let tx_hash_bytes = [8u8; 32];
1265 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx_to_handle.network_data
1266 {
1267 stellar_data.hash = Some(hex::encode(tx_hash_bytes));
1268 }
1269 tx_to_handle.status = TransactionStatus::Submitted;
1270
1271 let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);
1272
1273 mocks
1275 .provider
1276 .expect_get_transaction()
1277 .with(eq(expected_stellar_hash.clone()))
1278 .times(1)
1279 .returning(move |_| {
1280 Box::pin(async { Err(ProviderError::Other("Network timeout".to_string())) })
1281 });
1282
1283 mocks.tx_repo.expect_partial_update().never();
1285 mocks
1286 .job_producer
1287 .expect_produce_send_notification_job()
1288 .never();
1289
1290 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1291
1292 let result = handler
1293 .handle_transaction_status_impl(tx_to_handle, None)
1294 .await;
1295
1296 assert!(result.is_err());
1298 matches!(result.unwrap_err(), TransactionError::UnderlyingProvider(_));
1299 }
1300
1301 #[tokio::test]
1302 async fn handle_transaction_status_extracts_transaction_result_xdr() {
1303 let relayer = create_test_relayer();
1304 let mut mocks = default_test_mocks();
1305
1306 let mut tx_to_handle = create_test_transaction(&relayer.id);
1307 tx_to_handle.id = "tx-with-result".to_string();
1308 tx_to_handle.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
1309 let tx_hash_bytes = [9u8; 32];
1310 let tx_hash_hex = hex::encode(tx_hash_bytes);
1311 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx_to_handle.network_data
1312 {
1313 stellar_data.hash = Some(tx_hash_hex.clone());
1314 } else {
1315 panic!("Expected Stellar network data");
1316 }
1317 tx_to_handle.status = TransactionStatus::Submitted;
1318
1319 let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);
1320
1321 mocks
1323 .provider
1324 .expect_get_transaction()
1325 .with(eq(expected_stellar_hash.clone()))
1326 .times(1)
1327 .returning(move |_| {
1328 Box::pin(async {
1329 Ok(dummy_get_transaction_response_with_result_meta(
1330 "SUCCESS", true,
1331 ))
1332 })
1333 });
1334
1335 let tx_to_handle_clone = tx_to_handle.clone();
1337 mocks
1338 .tx_repo
1339 .expect_partial_update()
1340 .withf(move |id, update| {
1341 id == "tx-with-result"
1342 && update.status == Some(TransactionStatus::Confirmed)
1343 && update.confirmed_at.is_some()
1344 && update.network_data.as_ref().is_some_and(|and| {
1345 if let NetworkTransactionData::Stellar(stellar_data) = and {
1346 stellar_data.transaction_result_xdr.is_some()
1348 } else {
1349 false
1350 }
1351 })
1352 })
1353 .times(1)
1354 .returning(move |id, update| {
1355 let mut updated_tx = tx_to_handle_clone.clone();
1356 updated_tx.id = id;
1357 updated_tx.status = update.status.unwrap();
1358 updated_tx.confirmed_at = update.confirmed_at;
1359 if let Some(network_data) = update.network_data {
1360 updated_tx.network_data = network_data;
1361 }
1362 Ok(updated_tx)
1363 });
1364
1365 mocks
1367 .job_producer
1368 .expect_produce_send_notification_job()
1369 .times(1)
1370 .returning(|_, _| Box::pin(async { Ok(()) }));
1371
1372 mocks
1374 .tx_repo
1375 .expect_find_by_status_paginated()
1376 .returning(move |_, _, _, _| {
1377 Ok(PaginatedResult {
1378 items: vec![],
1379 total: 0,
1380 page: 1,
1381 per_page: 1,
1382 })
1383 });
1384
1385 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1386 let result = handler
1387 .handle_transaction_status_impl(tx_to_handle, None)
1388 .await;
1389
1390 assert!(result.is_ok());
1391 let handled_tx = result.unwrap();
1392 assert_eq!(handled_tx.id, "tx-with-result");
1393 assert_eq!(handled_tx.status, TransactionStatus::Confirmed);
1394
1395 if let NetworkTransactionData::Stellar(stellar_data) = handled_tx.network_data {
1397 assert!(
1398 stellar_data.transaction_result_xdr.is_some(),
1399 "transaction_result_xdr should be stored when result_meta contains return_value"
1400 );
1401 } else {
1402 panic!("Expected Stellar network data");
1403 }
1404 }
1405
1406 #[tokio::test]
1407 async fn handle_transaction_status_no_result_meta_does_not_store_xdr() {
1408 let relayer = create_test_relayer();
1409 let mut mocks = default_test_mocks();
1410
1411 let mut tx_to_handle = create_test_transaction(&relayer.id);
1412 tx_to_handle.id = "tx-no-result-meta".to_string();
1413 tx_to_handle.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
1414 let tx_hash_bytes = [10u8; 32];
1415 let tx_hash_hex = hex::encode(tx_hash_bytes);
1416 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx_to_handle.network_data
1417 {
1418 stellar_data.hash = Some(tx_hash_hex.clone());
1419 } else {
1420 panic!("Expected Stellar network data");
1421 }
1422 tx_to_handle.status = TransactionStatus::Submitted;
1423
1424 let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);
1425
1426 mocks
1428 .provider
1429 .expect_get_transaction()
1430 .with(eq(expected_stellar_hash.clone()))
1431 .times(1)
1432 .returning(move |_| {
1433 Box::pin(async {
1434 Ok(dummy_get_transaction_response_with_result_meta(
1435 "SUCCESS", false,
1436 ))
1437 })
1438 });
1439
1440 let tx_to_handle_clone = tx_to_handle.clone();
1442 mocks
1443 .tx_repo
1444 .expect_partial_update()
1445 .times(1)
1446 .returning(move |id, update| {
1447 let mut updated_tx = tx_to_handle_clone.clone();
1448 updated_tx.id = id;
1449 updated_tx.status = update.status.unwrap();
1450 updated_tx.confirmed_at = update.confirmed_at;
1451 if let Some(network_data) = update.network_data {
1452 updated_tx.network_data = network_data;
1453 }
1454 Ok(updated_tx)
1455 });
1456
1457 mocks
1459 .job_producer
1460 .expect_produce_send_notification_job()
1461 .times(1)
1462 .returning(|_, _| Box::pin(async { Ok(()) }));
1463
1464 mocks
1466 .tx_repo
1467 .expect_find_by_status_paginated()
1468 .returning(move |_, _, _, _| {
1469 Ok(PaginatedResult {
1470 items: vec![],
1471 total: 0,
1472 page: 1,
1473 per_page: 1,
1474 })
1475 });
1476
1477 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1478 let result = handler
1479 .handle_transaction_status_impl(tx_to_handle, None)
1480 .await;
1481
1482 assert!(result.is_ok());
1483 let handled_tx = result.unwrap();
1484
1485 if let NetworkTransactionData::Stellar(stellar_data) = handled_tx.network_data {
1487 assert!(
1488 stellar_data.transaction_result_xdr.is_none(),
1489 "transaction_result_xdr should be None when result_meta is missing"
1490 );
1491 } else {
1492 panic!("Expected Stellar network data");
1493 }
1494 }
1495
1496 #[tokio::test]
1497 async fn test_sent_transaction_not_stuck_yet_returns_ok() {
1498 let relayer = create_test_relayer();
1500 let mut mocks = default_test_mocks();
1501
1502 let mut tx = create_test_transaction(&relayer.id);
1503 tx.id = "tx-sent-not-stuck".to_string();
1504 tx.status = TransactionStatus::Sent;
1505 tx.created_at = Utc::now().to_rfc3339();
1507 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx.network_data {
1509 stellar_data.hash = None;
1510 }
1511
1512 mocks.provider.expect_get_transaction().never();
1514 mocks.tx_repo.expect_partial_update().never();
1515 mocks
1516 .job_producer
1517 .expect_produce_submit_transaction_job()
1518 .never();
1519
1520 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1521 let result = handler
1522 .handle_transaction_status_impl(tx.clone(), None)
1523 .await;
1524
1525 assert!(result.is_ok());
1526 let returned_tx = result.unwrap();
1527 assert_eq!(returned_tx.id, tx.id);
1529 assert_eq!(returned_tx.status, TransactionStatus::Sent);
1530 }
1531
1532 #[tokio::test]
1533 async fn test_stuck_sent_transaction_reenqueues_submit_job() {
1534 let relayer = create_test_relayer();
1537 let mut mocks = default_test_mocks();
1538
1539 let mut tx = create_test_transaction(&relayer.id);
1540 tx.id = "tx-stuck-with-xdr".to_string();
1541 tx.status = TransactionStatus::Sent;
1542 tx.created_at = (Utc::now() - Duration::minutes(10)).to_rfc3339();
1544 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx.network_data {
1546 stellar_data.hash = None;
1547 stellar_data.signed_envelope_xdr = Some("AAAA...signed...".to_string());
1548 }
1549
1550 mocks
1552 .job_producer
1553 .expect_produce_submit_transaction_job()
1554 .times(1)
1555 .returning(|_, _| Box::pin(async { Ok(()) }));
1556
1557 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1558 let result = handler
1559 .handle_transaction_status_impl(tx.clone(), None)
1560 .await;
1561
1562 assert!(result.is_ok());
1563 let returned_tx = result.unwrap();
1564 assert_eq!(returned_tx.status, TransactionStatus::Sent);
1566 }
1567
1568 #[tokio::test]
1569 async fn test_stuck_sent_transaction_expired_marks_expired() {
1570 let relayer = create_test_relayer();
1572 let mut mocks = default_test_mocks();
1573
1574 let mut tx = create_test_transaction(&relayer.id);
1575 tx.id = "tx-expired".to_string();
1576 tx.status = TransactionStatus::Sent;
1577 tx.created_at = (Utc::now() - Duration::minutes(10)).to_rfc3339();
1579 tx.valid_until = Some((Utc::now() - Duration::minutes(5)).to_rfc3339());
1581 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx.network_data {
1582 stellar_data.hash = None;
1583 stellar_data.signed_envelope_xdr = Some("AAAA...signed...".to_string());
1584 }
1585
1586 mocks
1588 .tx_repo
1589 .expect_partial_update()
1590 .withf(|_id, update| update.status == Some(TransactionStatus::Expired))
1591 .times(1)
1592 .returning(|id, update| {
1593 let mut updated = create_test_transaction("test");
1594 updated.id = id;
1595 updated.status = update.status.unwrap();
1596 updated.status_reason = update.status_reason.clone();
1597 Ok(updated)
1598 });
1599
1600 mocks
1602 .job_producer
1603 .expect_produce_submit_transaction_job()
1604 .never();
1605
1606 mocks
1608 .job_producer
1609 .expect_produce_send_notification_job()
1610 .times(1)
1611 .returning(|_, _| Box::pin(async { Ok(()) }));
1612
1613 mocks
1615 .tx_repo
1616 .expect_find_by_status_paginated()
1617 .returning(move |_, _, _, _| {
1618 Ok(PaginatedResult {
1619 items: vec![],
1620 total: 0,
1621 page: 1,
1622 per_page: 1,
1623 })
1624 });
1625
1626 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1627 let result = handler.handle_transaction_status_impl(tx, None).await;
1628
1629 assert!(result.is_ok());
1630 let expired_tx = result.unwrap();
1631 assert_eq!(expired_tx.status, TransactionStatus::Expired);
1632 assert!(expired_tx
1633 .status_reason
1634 .as_ref()
1635 .unwrap()
1636 .contains("expired"));
1637 }
1638
1639 #[tokio::test]
1640 async fn test_stuck_sent_transaction_max_lifetime_marks_failed() {
1641 let relayer = create_test_relayer();
1643 let mut mocks = default_test_mocks();
1644
1645 let mut tx = create_test_transaction(&relayer.id);
1646 tx.id = "tx-max-lifetime".to_string();
1647 tx.status = TransactionStatus::Sent;
1648 tx.created_at = (Utc::now() - Duration::minutes(35)).to_rfc3339();
1650 tx.valid_until = None;
1652 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx.network_data {
1653 stellar_data.hash = None;
1654 stellar_data.signed_envelope_xdr = Some("AAAA...signed...".to_string());
1655 }
1656
1657 mocks
1659 .tx_repo
1660 .expect_partial_update()
1661 .withf(|_id, update| update.status == Some(TransactionStatus::Failed))
1662 .times(1)
1663 .returning(|id, update| {
1664 let mut updated = create_test_transaction("test");
1665 updated.id = id;
1666 updated.status = update.status.unwrap();
1667 updated.status_reason = update.status_reason.clone();
1668 Ok(updated)
1669 });
1670
1671 mocks
1673 .job_producer
1674 .expect_produce_submit_transaction_job()
1675 .never();
1676
1677 mocks
1679 .job_producer
1680 .expect_produce_send_notification_job()
1681 .times(1)
1682 .returning(|_, _| Box::pin(async { Ok(()) }));
1683
1684 mocks
1686 .tx_repo
1687 .expect_find_by_status_paginated()
1688 .returning(|_, _, _, _| {
1689 Ok(PaginatedResult {
1690 items: vec![],
1691 total: 0,
1692 page: 1,
1693 per_page: 1,
1694 })
1695 });
1696
1697 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1698 let result = handler.handle_transaction_status_impl(tx, None).await;
1699
1700 assert!(result.is_ok());
1701 let failed_tx = result.unwrap();
1702 assert_eq!(failed_tx.status, TransactionStatus::Failed);
1703 assert!(failed_tx
1705 .status_reason
1706 .as_ref()
1707 .unwrap()
1708 .contains("stuck in Sent status for too long"));
1709 }
1710 }
1711
1712 mod handle_pending_state_tests {
1713 use super::*;
1714 use crate::constants::get_stellar_max_stuck_transaction_lifetime;
1715 use crate::constants::STELLAR_PENDING_RECOVERY_TRIGGER_SECONDS;
1716
1717 #[tokio::test]
1718 async fn test_pending_exceeds_max_lifetime_marks_failed() {
1719 let relayer = create_test_relayer();
1720 let mut mocks = default_test_mocks();
1721
1722 let mut tx = create_test_transaction(&relayer.id);
1723 tx.id = "tx-pending-old".to_string();
1724 tx.status = TransactionStatus::Pending;
1725 tx.created_at =
1727 (Utc::now() - get_stellar_max_stuck_transaction_lifetime() - Duration::minutes(1))
1728 .to_rfc3339();
1729
1730 mocks
1732 .tx_repo
1733 .expect_partial_update()
1734 .withf(|_id, update| update.status == Some(TransactionStatus::Failed))
1735 .times(1)
1736 .returning(|id, update| {
1737 let mut updated = create_test_transaction("test");
1738 updated.id = id;
1739 updated.status = update.status.unwrap();
1740 updated.status_reason = update.status_reason.clone();
1741 Ok(updated)
1742 });
1743
1744 mocks
1746 .job_producer
1747 .expect_produce_send_notification_job()
1748 .times(1)
1749 .returning(|_, _| Box::pin(async { Ok(()) }));
1750
1751 mocks
1753 .tx_repo
1754 .expect_find_by_status_paginated()
1755 .returning(move |_, _, _, _| {
1756 Ok(PaginatedResult {
1757 items: vec![],
1758 total: 0,
1759 page: 1,
1760 per_page: 1,
1761 })
1762 });
1763
1764 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1765 let result = handler.handle_transaction_status_impl(tx, None).await;
1766
1767 assert!(result.is_ok());
1768 let failed_tx = result.unwrap();
1769 assert_eq!(failed_tx.status, TransactionStatus::Failed);
1770 assert!(failed_tx
1771 .status_reason
1772 .as_ref()
1773 .unwrap()
1774 .contains("stuck in Pending status for too long"));
1775 }
1776
1777 #[tokio::test]
1778 async fn test_pending_triggers_recovery_job_when_old_enough() {
1779 let relayer = create_test_relayer();
1780 let mut mocks = default_test_mocks();
1781
1782 let mut tx = create_test_transaction(&relayer.id);
1783 tx.id = "tx-pending-recovery".to_string();
1784 tx.status = TransactionStatus::Pending;
1785 tx.created_at = (Utc::now()
1787 - Duration::seconds(STELLAR_PENDING_RECOVERY_TRIGGER_SECONDS + 5))
1788 .to_rfc3339();
1789
1790 mocks
1792 .job_producer
1793 .expect_produce_transaction_request_job()
1794 .times(1)
1795 .returning(|_, _| Box::pin(async { Ok(()) }));
1796
1797 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1798 let result = handler.handle_transaction_status_impl(tx, None).await;
1799
1800 assert!(result.is_ok());
1801 let tx_result = result.unwrap();
1802 assert_eq!(tx_result.status, TransactionStatus::Pending);
1803 }
1804
1805 #[tokio::test]
1806 async fn test_pending_too_young_does_not_schedule_recovery() {
1807 let relayer = create_test_relayer();
1808 let mut mocks = default_test_mocks();
1809
1810 let mut tx = create_test_transaction(&relayer.id);
1811 tx.id = "tx-pending-young".to_string();
1812 tx.status = TransactionStatus::Pending;
1813 tx.created_at = (Utc::now()
1815 - Duration::seconds(STELLAR_PENDING_RECOVERY_TRIGGER_SECONDS - 5))
1816 .to_rfc3339();
1817
1818 mocks
1820 .job_producer
1821 .expect_produce_transaction_request_job()
1822 .never();
1823
1824 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1825 let result = handler.handle_transaction_status_impl(tx, None).await;
1826
1827 assert!(result.is_ok());
1828 let tx_result = result.unwrap();
1829 assert_eq!(tx_result.status, TransactionStatus::Pending);
1830 }
1831
1832 #[tokio::test]
1833 async fn test_sent_without_hash_handles_stuck_recovery() {
1834 use crate::constants::get_stellar_resend_timeout;
1835
1836 let relayer = create_test_relayer();
1837 let mut mocks = default_test_mocks();
1838
1839 let mut tx = create_test_transaction(&relayer.id);
1840 tx.id = "tx-sent-no-hash".to_string();
1841 tx.status = TransactionStatus::Sent;
1842 tx.created_at =
1844 (Utc::now() - get_stellar_resend_timeout() - Duration::seconds(1)).to_rfc3339();
1845 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx.network_data {
1846 stellar_data.hash = None; }
1848
1849 mocks
1851 .job_producer
1852 .expect_produce_submit_transaction_job()
1853 .times(1)
1854 .returning(|_, _| Box::pin(async { Ok(()) }));
1855
1856 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1857 let result = handler.handle_transaction_status_impl(tx, None).await;
1858
1859 assert!(result.is_ok());
1860 let tx_result = result.unwrap();
1861 assert_eq!(tx_result.status, TransactionStatus::Sent);
1862 }
1863
1864 #[tokio::test]
1865 async fn test_submitted_without_hash_marks_failed() {
1866 let relayer = create_test_relayer();
1867 let mut mocks = default_test_mocks();
1868
1869 let mut tx = create_test_transaction(&relayer.id);
1870 tx.id = "tx-submitted-no-hash".to_string();
1871 tx.status = TransactionStatus::Submitted;
1872 tx.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
1873 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx.network_data {
1874 stellar_data.hash = None; }
1876
1877 mocks
1879 .tx_repo
1880 .expect_partial_update()
1881 .withf(|_id, update| update.status == Some(TransactionStatus::Failed))
1882 .times(1)
1883 .returning(|id, update| {
1884 let mut updated = create_test_transaction("test");
1885 updated.id = id;
1886 updated.status = update.status.unwrap();
1887 updated.status_reason = update.status_reason.clone();
1888 Ok(updated)
1889 });
1890
1891 mocks
1893 .job_producer
1894 .expect_produce_send_notification_job()
1895 .times(1)
1896 .returning(|_, _| Box::pin(async { Ok(()) }));
1897
1898 mocks
1900 .tx_repo
1901 .expect_find_by_status_paginated()
1902 .returning(move |_, _, _, _| {
1903 Ok(PaginatedResult {
1904 items: vec![],
1905 total: 0,
1906 page: 1,
1907 per_page: 1,
1908 })
1909 });
1910
1911 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1912 let result = handler.handle_transaction_status_impl(tx, None).await;
1913
1914 assert!(result.is_ok());
1915 let failed_tx = result.unwrap();
1916 assert_eq!(failed_tx.status, TransactionStatus::Failed);
1917 assert!(failed_tx
1918 .status_reason
1919 .as_ref()
1920 .unwrap()
1921 .contains("Failed to parse and validate hash"));
1922 }
1923
1924 #[tokio::test]
1925 async fn test_submitted_exceeds_max_lifetime_marks_failed() {
1926 let relayer = create_test_relayer();
1927 let mut mocks = default_test_mocks();
1928
1929 let mut tx = create_test_transaction(&relayer.id);
1930 tx.id = "tx-submitted-old".to_string();
1931 tx.status = TransactionStatus::Submitted;
1932 tx.created_at =
1934 (Utc::now() - get_stellar_max_stuck_transaction_lifetime() - Duration::minutes(1))
1935 .to_rfc3339();
1936 let tx_hash_bytes = [6u8; 32];
1938 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx.network_data {
1939 stellar_data.hash = Some(hex::encode(tx_hash_bytes));
1940 }
1941
1942 let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);
1943
1944 mocks
1946 .provider
1947 .expect_get_transaction()
1948 .with(eq(expected_stellar_hash.clone()))
1949 .times(1)
1950 .returning(move |_| {
1951 Box::pin(async { Ok(dummy_get_transaction_response("PENDING")) })
1952 });
1953
1954 mocks
1956 .tx_repo
1957 .expect_partial_update()
1958 .withf(|_id, update| update.status == Some(TransactionStatus::Failed))
1959 .times(1)
1960 .returning(|id, update| {
1961 let mut updated = create_test_transaction("test");
1962 updated.id = id;
1963 updated.status = update.status.unwrap();
1964 updated.status_reason = update.status_reason.clone();
1965 Ok(updated)
1966 });
1967
1968 mocks
1970 .job_producer
1971 .expect_produce_send_notification_job()
1972 .times(1)
1973 .returning(|_, _| Box::pin(async { Ok(()) }));
1974
1975 mocks
1977 .tx_repo
1978 .expect_find_by_status_paginated()
1979 .returning(move |_, _, _, _| {
1980 Ok(PaginatedResult {
1981 items: vec![],
1982 total: 0,
1983 page: 1,
1984 per_page: 1,
1985 })
1986 });
1987
1988 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1989 let result = handler.handle_transaction_status_impl(tx, None).await;
1990
1991 assert!(result.is_ok());
1992 let failed_tx = result.unwrap();
1993 assert_eq!(failed_tx.status, TransactionStatus::Failed);
1994 assert!(failed_tx
1995 .status_reason
1996 .as_ref()
1997 .unwrap()
1998 .contains("stuck in Submitted status for too long"));
1999 }
2000
2001 #[tokio::test]
2002 async fn test_submitted_expired_marks_expired() {
2003 let relayer = create_test_relayer();
2004 let mut mocks = default_test_mocks();
2005
2006 let mut tx = create_test_transaction(&relayer.id);
2007 tx.id = "tx-submitted-expired".to_string();
2008 tx.status = TransactionStatus::Submitted;
2009 tx.created_at = (Utc::now() - Duration::minutes(10)).to_rfc3339();
2010 tx.valid_until = Some((Utc::now() - Duration::minutes(5)).to_rfc3339());
2012 let tx_hash_bytes = [7u8; 32];
2014 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx.network_data {
2015 stellar_data.hash = Some(hex::encode(tx_hash_bytes));
2016 }
2017
2018 let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);
2019
2020 mocks
2022 .provider
2023 .expect_get_transaction()
2024 .with(eq(expected_stellar_hash.clone()))
2025 .times(1)
2026 .returning(move |_| {
2027 Box::pin(async { Ok(dummy_get_transaction_response("PENDING")) })
2028 });
2029
2030 mocks
2032 .tx_repo
2033 .expect_partial_update()
2034 .withf(|_id, update| update.status == Some(TransactionStatus::Expired))
2035 .times(1)
2036 .returning(|id, update| {
2037 let mut updated = create_test_transaction("test");
2038 updated.id = id;
2039 updated.status = update.status.unwrap();
2040 updated.status_reason = update.status_reason.clone();
2041 Ok(updated)
2042 });
2043
2044 mocks
2046 .job_producer
2047 .expect_produce_send_notification_job()
2048 .times(1)
2049 .returning(|_, _| Box::pin(async { Ok(()) }));
2050
2051 mocks
2053 .tx_repo
2054 .expect_find_by_status_paginated()
2055 .returning(move |_, _, _, _| {
2056 Ok(PaginatedResult {
2057 items: vec![],
2058 total: 0,
2059 page: 1,
2060 per_page: 1,
2061 })
2062 });
2063
2064 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
2065 let result = handler.handle_transaction_status_impl(tx, None).await;
2066
2067 assert!(result.is_ok());
2068 let expired_tx = result.unwrap();
2069 assert_eq!(expired_tx.status, TransactionStatus::Expired);
2070 assert!(expired_tx
2071 .status_reason
2072 .as_ref()
2073 .unwrap()
2074 .contains("expired"));
2075 }
2076 }
2077
2078 mod is_valid_until_expired_tests {
2079 use super::*;
2080 use crate::{
2081 jobs::MockJobProducerTrait,
2082 repositories::{
2083 MockRelayerRepository, MockTransactionCounterTrait, MockTransactionRepository,
2084 },
2085 services::{
2086 provider::MockStellarProviderTrait, stellar_dex::MockStellarDexServiceTrait,
2087 },
2088 };
2089 use chrono::{Duration, Utc};
2090
2091 type TestHandler = StellarRelayerTransaction<
2093 MockRelayerRepository,
2094 MockTransactionRepository,
2095 MockJobProducerTrait,
2096 MockStellarCombinedSigner,
2097 MockStellarProviderTrait,
2098 MockTransactionCounterTrait,
2099 MockStellarDexServiceTrait,
2100 >;
2101
2102 #[test]
2103 fn test_rfc3339_expired() {
2104 let past = (Utc::now() - Duration::hours(1)).to_rfc3339();
2105 assert!(TestHandler::is_valid_until_string_expired(&past));
2106 }
2107
2108 #[test]
2109 fn test_rfc3339_not_expired() {
2110 let future = (Utc::now() + Duration::hours(1)).to_rfc3339();
2111 assert!(!TestHandler::is_valid_until_string_expired(&future));
2112 }
2113
2114 #[test]
2115 fn test_numeric_timestamp_expired() {
2116 let past_timestamp = (Utc::now() - Duration::hours(1)).timestamp().to_string();
2117 assert!(TestHandler::is_valid_until_string_expired(&past_timestamp));
2118 }
2119
2120 #[test]
2121 fn test_numeric_timestamp_not_expired() {
2122 let future_timestamp = (Utc::now() + Duration::hours(1)).timestamp().to_string();
2123 assert!(!TestHandler::is_valid_until_string_expired(
2124 &future_timestamp
2125 ));
2126 }
2127
2128 #[test]
2129 fn test_zero_timestamp_unbounded() {
2130 assert!(!TestHandler::is_valid_until_string_expired("0"));
2132 }
2133
2134 #[test]
2135 fn test_invalid_format_not_expired() {
2136 assert!(!TestHandler::is_valid_until_string_expired("not-a-date"));
2138 }
2139 }
2140
2141 mod circuit_breaker_tests {
2143 use super::*;
2144 use crate::jobs::StatusCheckContext;
2145 use crate::models::NetworkType;
2146
2147 fn create_triggered_context() -> StatusCheckContext {
2149 StatusCheckContext::new(
2150 110, 150, 160, 100, 300, NetworkType::Stellar,
2156 )
2157 }
2158
2159 fn create_safe_context() -> StatusCheckContext {
2161 StatusCheckContext::new(
2162 10, 20, 25, 100, 300, NetworkType::Stellar,
2168 )
2169 }
2170
2171 fn create_total_triggered_context() -> StatusCheckContext {
2173 StatusCheckContext::new(
2174 20, 310, 350, 100, 300, NetworkType::Stellar,
2180 )
2181 }
2182
2183 #[tokio::test]
2184 async fn test_circuit_breaker_submitted_marks_as_failed() {
2185 let relayer = create_test_relayer();
2186 let mut mocks = default_test_mocks();
2187
2188 let mut tx_to_handle = create_test_transaction(&relayer.id);
2189 tx_to_handle.status = TransactionStatus::Submitted;
2190 tx_to_handle.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
2191
2192 mocks
2194 .tx_repo
2195 .expect_partial_update()
2196 .withf(|_, update| update.status == Some(TransactionStatus::Failed))
2197 .times(1)
2198 .returning(|_, update| {
2199 let mut updated_tx = create_test_transaction("test-relayer");
2200 updated_tx.status = update.status.unwrap_or(updated_tx.status);
2201 updated_tx.status_reason = update.status_reason.clone();
2202 Ok(updated_tx)
2203 });
2204
2205 mocks
2207 .job_producer
2208 .expect_produce_send_notification_job()
2209 .returning(|_, _| Box::pin(async { Ok(()) }));
2210
2211 mocks
2213 .tx_repo
2214 .expect_find_by_status_paginated()
2215 .returning(|_, _, _, _| {
2216 Ok(PaginatedResult {
2217 items: vec![],
2218 total: 0,
2219 page: 1,
2220 per_page: 1,
2221 })
2222 });
2223
2224 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
2225 let ctx = create_triggered_context();
2226
2227 let result = handler
2228 .handle_transaction_status_impl(tx_to_handle, Some(ctx))
2229 .await;
2230
2231 assert!(result.is_ok());
2232 let tx = result.unwrap();
2233 assert_eq!(tx.status, TransactionStatus::Failed);
2234 assert!(tx.status_reason.is_some());
2235 assert!(tx.status_reason.unwrap().contains("consecutive errors"));
2236 }
2237
2238 #[tokio::test]
2239 async fn test_circuit_breaker_pending_marks_as_failed() {
2240 let relayer = create_test_relayer();
2241 let mut mocks = default_test_mocks();
2242
2243 let mut tx_to_handle = create_test_transaction(&relayer.id);
2244 tx_to_handle.status = TransactionStatus::Pending;
2245 tx_to_handle.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
2246
2247 mocks
2249 .tx_repo
2250 .expect_partial_update()
2251 .withf(|_, update| update.status == Some(TransactionStatus::Failed))
2252 .times(1)
2253 .returning(|_, update| {
2254 let mut updated_tx = create_test_transaction("test-relayer");
2255 updated_tx.status = update.status.unwrap_or(updated_tx.status);
2256 updated_tx.status_reason = update.status_reason.clone();
2257 Ok(updated_tx)
2258 });
2259
2260 mocks
2261 .job_producer
2262 .expect_produce_send_notification_job()
2263 .returning(|_, _| Box::pin(async { Ok(()) }));
2264
2265 mocks
2266 .tx_repo
2267 .expect_find_by_status_paginated()
2268 .returning(|_, _, _, _| {
2269 Ok(PaginatedResult {
2270 items: vec![],
2271 total: 0,
2272 page: 1,
2273 per_page: 1,
2274 })
2275 });
2276
2277 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
2278 let ctx = create_triggered_context();
2279
2280 let result = handler
2281 .handle_transaction_status_impl(tx_to_handle, Some(ctx))
2282 .await;
2283
2284 assert!(result.is_ok());
2285 let tx = result.unwrap();
2286 assert_eq!(tx.status, TransactionStatus::Failed);
2287 }
2288
2289 #[tokio::test]
2290 async fn test_circuit_breaker_total_failures_triggers() {
2291 let relayer = create_test_relayer();
2292 let mut mocks = default_test_mocks();
2293
2294 let mut tx_to_handle = create_test_transaction(&relayer.id);
2295 tx_to_handle.status = TransactionStatus::Submitted;
2296 tx_to_handle.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
2297
2298 mocks
2299 .tx_repo
2300 .expect_partial_update()
2301 .withf(|_, update| update.status == Some(TransactionStatus::Failed))
2302 .times(1)
2303 .returning(|_, update| {
2304 let mut updated_tx = create_test_transaction("test-relayer");
2305 updated_tx.status = update.status.unwrap_or(updated_tx.status);
2306 updated_tx.status_reason = update.status_reason.clone();
2307 Ok(updated_tx)
2308 });
2309
2310 mocks
2311 .job_producer
2312 .expect_produce_send_notification_job()
2313 .returning(|_, _| Box::pin(async { Ok(()) }));
2314
2315 mocks
2316 .tx_repo
2317 .expect_find_by_status_paginated()
2318 .returning(|_, _, _, _| {
2319 Ok(PaginatedResult {
2320 items: vec![],
2321 total: 0,
2322 page: 1,
2323 per_page: 1,
2324 })
2325 });
2326
2327 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
2328 let ctx = create_total_triggered_context();
2330
2331 let result = handler
2332 .handle_transaction_status_impl(tx_to_handle, Some(ctx))
2333 .await;
2334
2335 assert!(result.is_ok());
2336 let tx = result.unwrap();
2337 assert_eq!(tx.status, TransactionStatus::Failed);
2338 }
2339
2340 #[tokio::test]
2341 async fn test_circuit_breaker_below_threshold_continues() {
2342 let relayer = create_test_relayer();
2343 let mut mocks = default_test_mocks();
2344
2345 let mut tx_to_handle = create_test_transaction(&relayer.id);
2346 tx_to_handle.status = TransactionStatus::Submitted;
2347 tx_to_handle.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
2348 let tx_hash_bytes = [1u8; 32];
2349 let tx_hash_hex = hex::encode(tx_hash_bytes);
2350 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx_to_handle.network_data
2351 {
2352 stellar_data.hash = Some(tx_hash_hex.clone());
2353 }
2354
2355 mocks
2357 .provider
2358 .expect_get_transaction()
2359 .returning(|_| Box::pin(async { Ok(dummy_get_transaction_response("SUCCESS")) }));
2360
2361 mocks
2362 .tx_repo
2363 .expect_partial_update()
2364 .returning(|_, update| {
2365 let mut updated_tx = create_test_transaction("test-relayer");
2366 updated_tx.status = update.status.unwrap_or(updated_tx.status);
2367 Ok(updated_tx)
2368 });
2369
2370 mocks
2371 .job_producer
2372 .expect_produce_send_notification_job()
2373 .returning(|_, _| Box::pin(async { Ok(()) }));
2374
2375 mocks
2376 .tx_repo
2377 .expect_find_by_status_paginated()
2378 .returning(|_, _, _, _| {
2379 Ok(PaginatedResult {
2380 items: vec![],
2381 total: 0,
2382 page: 1,
2383 per_page: 1,
2384 })
2385 });
2386
2387 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
2388 let ctx = create_safe_context();
2389
2390 let result = handler
2391 .handle_transaction_status_impl(tx_to_handle, Some(ctx))
2392 .await;
2393
2394 assert!(result.is_ok());
2395 let tx = result.unwrap();
2396 assert_eq!(tx.status, TransactionStatus::Confirmed);
2398 }
2399
2400 #[tokio::test]
2401 async fn test_circuit_breaker_final_state_early_return() {
2402 let relayer = create_test_relayer();
2403 let mocks = default_test_mocks();
2404
2405 let mut tx_to_handle = create_test_transaction(&relayer.id);
2407 tx_to_handle.status = TransactionStatus::Confirmed;
2408
2409 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
2410 let ctx = create_triggered_context();
2411
2412 let result = handler
2414 .handle_transaction_status_impl(tx_to_handle.clone(), Some(ctx))
2415 .await;
2416
2417 assert!(result.is_ok());
2418 assert_eq!(result.unwrap().id, tx_to_handle.id);
2419 }
2420
2421 #[tokio::test]
2422 async fn test_circuit_breaker_no_context_continues() {
2423 let relayer = create_test_relayer();
2424 let mut mocks = default_test_mocks();
2425
2426 let mut tx_to_handle = create_test_transaction(&relayer.id);
2427 tx_to_handle.status = TransactionStatus::Submitted;
2428 tx_to_handle.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
2429 let tx_hash_bytes = [1u8; 32];
2430 let tx_hash_hex = hex::encode(tx_hash_bytes);
2431 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx_to_handle.network_data
2432 {
2433 stellar_data.hash = Some(tx_hash_hex.clone());
2434 }
2435
2436 mocks
2438 .provider
2439 .expect_get_transaction()
2440 .returning(|_| Box::pin(async { Ok(dummy_get_transaction_response("SUCCESS")) }));
2441
2442 mocks
2443 .tx_repo
2444 .expect_partial_update()
2445 .returning(|_, update| {
2446 let mut updated_tx = create_test_transaction("test-relayer");
2447 updated_tx.status = update.status.unwrap_or(updated_tx.status);
2448 Ok(updated_tx)
2449 });
2450
2451 mocks
2452 .job_producer
2453 .expect_produce_send_notification_job()
2454 .returning(|_, _| Box::pin(async { Ok(()) }));
2455
2456 mocks
2457 .tx_repo
2458 .expect_find_by_status_paginated()
2459 .returning(|_, _, _, _| {
2460 Ok(PaginatedResult {
2461 items: vec![],
2462 total: 0,
2463 page: 1,
2464 per_page: 1,
2465 })
2466 });
2467
2468 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
2469
2470 let result = handler
2472 .handle_transaction_status_impl(tx_to_handle, None)
2473 .await;
2474
2475 assert!(result.is_ok());
2476 let tx = result.unwrap();
2477 assert_eq!(tx.status, TransactionStatus::Confirmed);
2478 }
2479 }
2480
2481 mod failure_detail_helper_tests {
2482 use super::*;
2483 use soroban_rs::xdr::{InvokeHostFunctionResult, OperationResult, OperationResultTr, VecM};
2484
2485 #[test]
2486 fn first_failing_op_finds_trapped() {
2487 let ops: VecM<OperationResult> = vec![OperationResult::OpInner(
2488 OperationResultTr::InvokeHostFunction(InvokeHostFunctionResult::Trapped),
2489 )]
2490 .try_into()
2491 .unwrap();
2492 assert_eq!(first_failing_op(ops.as_slice()), Some("Trapped"));
2493 }
2494
2495 #[test]
2496 fn first_failing_op_skips_success() {
2497 let ops: VecM<OperationResult> = vec![
2498 OperationResult::OpInner(OperationResultTr::InvokeHostFunction(
2499 InvokeHostFunctionResult::Success(soroban_rs::xdr::Hash([0u8; 32])),
2500 )),
2501 OperationResult::OpInner(OperationResultTr::InvokeHostFunction(
2502 InvokeHostFunctionResult::ResourceLimitExceeded,
2503 )),
2504 ]
2505 .try_into()
2506 .unwrap();
2507 assert_eq!(
2508 first_failing_op(ops.as_slice()),
2509 Some("ResourceLimitExceeded")
2510 );
2511 }
2512
2513 #[test]
2514 fn first_failing_op_all_success_returns_none() {
2515 let ops: VecM<OperationResult> = vec![OperationResult::OpInner(
2516 OperationResultTr::InvokeHostFunction(InvokeHostFunctionResult::Success(
2517 soroban_rs::xdr::Hash([0u8; 32]),
2518 )),
2519 )]
2520 .try_into()
2521 .unwrap();
2522 assert_eq!(first_failing_op(ops.as_slice()), None);
2523 }
2524
2525 #[test]
2526 fn first_failing_op_empty_returns_none() {
2527 assert_eq!(first_failing_op(&[]), None);
2528 }
2529
2530 #[test]
2531 fn first_failing_op_op_bad_auth() {
2532 let ops: VecM<OperationResult> = vec![OperationResult::OpBadAuth].try_into().unwrap();
2533 assert_eq!(first_failing_op(ops.as_slice()), Some("OpBadAuth"));
2534 }
2535 }
2536}