openzeppelin_relayer/domain/transaction/stellar/
submit.rs

1//! This module contains the submission-related functionality for Stellar transactions.
2//! It includes methods for submitting transactions with robust error handling,
3//! ensuring proper transaction state management on failure.
4
5use chrono::Utc;
6use tracing::{info, warn};
7
8use super::{is_final_state, utils::is_bad_sequence_error, StellarRelayerTransaction};
9use crate::{
10    jobs::JobProducerTrait,
11    models::{
12        NetworkTransactionData, RelayerRepoModel, TransactionError, TransactionRepoModel,
13        TransactionStatus, TransactionUpdateRequest,
14    },
15    repositories::{Repository, TransactionCounterTrait, TransactionRepository},
16    services::{
17        provider::StellarProviderTrait,
18        signer::{Signer, StellarSignTrait},
19    },
20};
21
22impl<R, T, J, S, P, C, D> StellarRelayerTransaction<R, T, J, S, P, C, D>
23where
24    R: Repository<RelayerRepoModel, String> + Send + Sync,
25    T: TransactionRepository + Send + Sync,
26    J: JobProducerTrait + Send + Sync,
27    S: Signer + StellarSignTrait + Send + Sync,
28    P: StellarProviderTrait + Send + Sync,
29    C: TransactionCounterTrait + Send + Sync,
30    D: crate::services::stellar_dex::StellarDexServiceTrait + Send + Sync + 'static,
31{
32    /// Main submission method with robust error handling.
33    /// Unlike prepare, submit doesn't claim lanes but still needs proper error handling.
34    pub async fn submit_transaction_impl(
35        &self,
36        tx: TransactionRepoModel,
37    ) -> Result<TransactionRepoModel, TransactionError> {
38        info!(
39            tx_id = %tx.id,
40            relayer_id = %tx.relayer_id,
41            status = ?tx.status,
42            "submitting stellar transaction"
43        );
44
45        // Defensive check: if transaction is in a final state or unexpected state, don't retry
46        if is_final_state(&tx.status) {
47            warn!(
48                tx_id = %tx.id,
49                relayer_id = %tx.relayer_id,
50                status = ?tx.status,
51                "transaction already in final state, skipping submission"
52            );
53            return Ok(tx);
54        }
55
56        // Check if transaction has expired before attempting submission
57        if self.is_transaction_expired(&tx)? {
58            info!(
59                tx_id = %tx.id,
60                relayer_id = %tx.relayer_id,
61                valid_until = ?tx.valid_until,
62                "transaction has expired, marking as Expired"
63            );
64            return self
65                .mark_as_expired(tx, "Transaction time_bounds expired".to_string())
66                .await;
67        }
68
69        // Call core submission logic with error handling
70        match self.submit_core(tx.clone()).await {
71            Ok(submitted_tx) => Ok(submitted_tx),
72            Err(error) => {
73                // Handle submission failure - mark as failed and send notification
74                self.handle_submit_failure(tx, error).await
75            }
76        }
77    }
78
79    /// Core submission logic - pure business logic without error handling concerns.
80    ///
81    /// Uses `send_transaction_with_status` to get full status information from the RPC.
82    /// Handles status codes:
83    /// - PENDING: Transaction accepted for processing
84    /// - DUPLICATE: Transaction already submitted (treat as success)
85    /// - TRY_AGAIN_LATER: Transaction not queued, mark as failed
86    /// - ERROR: Transaction validation failed, mark as failed
87    async fn submit_core(
88        &self,
89        tx: TransactionRepoModel,
90    ) -> Result<TransactionRepoModel, TransactionError> {
91        let stellar_data = tx.network_data.get_stellar_transaction_data()?;
92        let tx_envelope = stellar_data
93            .get_envelope_for_submission()
94            .map_err(TransactionError::from)?;
95
96        // Use send_transaction_with_status to get full status information
97        let response = self
98            .provider()
99            .send_transaction_with_status(&tx_envelope)
100            .await
101            .map_err(TransactionError::from)?;
102
103        // Handle status codes from the RPC response
104        match response.status.as_str() {
105            "PENDING" | "DUPLICATE" => {
106                // Success - transaction is accepted or already exists
107                if response.status == "DUPLICATE" {
108                    info!(
109                        tx_id = %tx.id,
110                        relayer_id = %tx.relayer_id,
111                        hash = %response.hash,
112                        "transaction already submitted (DUPLICATE status)"
113                    );
114                }
115
116                let tx_hash_hex = response.hash.clone();
117                let updated_stellar_data = stellar_data.with_hash(tx_hash_hex.clone());
118
119                let mut hashes = tx.hashes.clone();
120                if !hashes.contains(&tx_hash_hex) {
121                    hashes.push(tx_hash_hex);
122                }
123
124                let update_req = TransactionUpdateRequest {
125                    status: Some(TransactionStatus::Submitted),
126                    sent_at: Some(Utc::now().to_rfc3339()),
127                    network_data: Some(NetworkTransactionData::Stellar(updated_stellar_data)),
128                    hashes: Some(hashes),
129                    ..Default::default()
130                };
131
132                let updated_tx = self
133                    .transaction_repository()
134                    .partial_update(tx.id.clone(), update_req)
135                    .await?;
136
137                // Send notification
138                self.send_transaction_update_notification(&updated_tx).await;
139
140                Ok(updated_tx)
141            }
142            "TRY_AGAIN_LATER" => {
143                // Transaction not queued - per acceptance criteria, mark as failed
144                Err(TransactionError::UnexpectedError(
145                    "Transaction not queued: TRY_AGAIN_LATER".to_string(),
146                ))
147            }
148            "ERROR" => {
149                // Transaction validation failed
150                let error_detail = response
151                    .error_result_xdr
152                    .unwrap_or_else(|| "No error details provided".to_string());
153                Err(TransactionError::UnexpectedError(format!(
154                    "Transaction submission error: {error_detail}"
155                )))
156            }
157            unknown => {
158                // Unknown status - treat as error
159                warn!(
160                    tx_id = %tx.id,
161                    relayer_id = %tx.relayer_id,
162                    status = %unknown,
163                    "received unknown transaction status from RPC"
164                );
165                Err(TransactionError::UnexpectedError(format!(
166                    "Unknown transaction status: {unknown}"
167                )))
168            }
169        }
170    }
171
172    /// Handles submission failures with comprehensive cleanup and error reporting.
173    /// For bad sequence errors, resets the transaction and re-enqueues it for retry.
174    async fn handle_submit_failure(
175        &self,
176        tx: TransactionRepoModel,
177        error: TransactionError,
178    ) -> Result<TransactionRepoModel, TransactionError> {
179        let error_reason = format!("Submission failed: {error}");
180        let tx_id = tx.id.clone();
181        let relayer_id = tx.relayer_id.clone();
182        warn!(
183            tx_id = %tx_id,
184            relayer_id = %relayer_id,
185            reason = %error_reason,
186            "transaction submission failed"
187        );
188
189        if is_bad_sequence_error(&error_reason) {
190            // For bad sequence errors, sync sequence from chain first
191            if let Ok(stellar_data) = tx.network_data.get_stellar_transaction_data() {
192                info!(
193                    tx_id = %tx_id,
194                    relayer_id = %relayer_id,
195                    "syncing sequence from chain after bad sequence error"
196                );
197                match self
198                    .sync_sequence_from_chain(&stellar_data.source_account)
199                    .await
200                {
201                    Ok(()) => {
202                        info!(
203                            tx_id = %tx_id,
204                            relayer_id = %relayer_id,
205                            "successfully synced sequence from chain"
206                        );
207                    }
208                    Err(sync_error) => {
209                        warn!(
210                            tx_id = %tx_id,
211                            relayer_id = %relayer_id,
212                            error = %sync_error,
213                            "failed to sync sequence from chain"
214                        );
215                    }
216                }
217            }
218
219            // Reset the transaction to pending state
220            // Status check will handle resubmission when it detects a pending transaction without hash
221            info!(
222                tx_id = %tx_id,
223                relayer_id = %relayer_id,
224                "bad sequence error detected, resetting transaction to pending state"
225            );
226            match self.reset_transaction_for_retry(tx.clone()).await {
227                Ok(reset_tx) => {
228                    info!(
229                        tx_id = %tx_id,
230                        relayer_id = %relayer_id,
231                        "transaction reset to pending, status check will handle resubmission"
232                    );
233                    // Return success since we've reset the transaction
234                    // Status check job (scheduled with delay) will detect pending without hash
235                    // and schedule a recovery job to go through the pipeline again
236                    return Ok(reset_tx);
237                }
238                Err(reset_error) => {
239                    warn!(
240                        tx_id = %tx_id,
241                        relayer_id = %relayer_id,
242                        error = %reset_error,
243                        "failed to reset transaction for retry"
244                    );
245                    // Fall through to normal failure handling
246                }
247            }
248        }
249
250        // For non-bad-sequence errors or if reset failed, mark as failed
251        // Step 1: Mark transaction as Failed with detailed reason
252        let update_request = TransactionUpdateRequest {
253            status: Some(TransactionStatus::Failed),
254            status_reason: Some(error_reason.clone()),
255            ..Default::default()
256        };
257        let _failed_tx = match self
258            .finalize_transaction_state(tx_id.clone(), update_request)
259            .await
260        {
261            Ok(updated_tx) => updated_tx,
262            Err(finalize_error) => {
263                warn!(
264                    tx_id = %tx_id,
265                    relayer_id = %relayer_id,
266                    error = %finalize_error,
267                    "failed to mark transaction as failed, continuing with lane cleanup"
268                );
269                tx
270            }
271        };
272
273        // Attempt to enqueue next pending transaction or release lane
274        if let Err(enqueue_error) = self.enqueue_next_pending_transaction(&tx_id).await {
275            warn!(
276                tx_id = %tx_id,
277                relayer_id = %relayer_id,
278                error = %enqueue_error,
279                "failed to enqueue next pending transaction after submission failure"
280            );
281        }
282
283        info!(
284            tx_id = %tx_id,
285            relayer_id = %relayer_id,
286            error = %error_reason,
287            "transaction submission failure handled"
288        );
289
290        Err(error)
291    }
292
293    /// Resubmit transaction - delegates to submit_transaction_impl
294    pub async fn resubmit_transaction_impl(
295        &self,
296        tx: TransactionRepoModel,
297    ) -> Result<TransactionRepoModel, TransactionError> {
298        self.submit_transaction_impl(tx).await
299    }
300}
301
302#[cfg(test)]
303mod tests {
304    use super::*;
305    use soroban_rs::stellar_rpc_client::SendTransactionResponse;
306    use soroban_rs::xdr::WriteXdr;
307
308    use crate::domain::transaction::stellar::test_helpers::*;
309
310    /// Helper to create a SendTransactionResponse with given status
311    fn create_send_tx_response(status: &str, hash: &str) -> SendTransactionResponse {
312        SendTransactionResponse {
313            status: status.to_string(),
314            hash: hash.to_string(),
315            error_result_xdr: None,
316            latest_ledger: 100,
317            latest_ledger_close_time: 1700000000,
318        }
319    }
320
321    mod submit_transaction_tests {
322        use crate::{
323            models::RepositoryError, repositories::PaginatedResult,
324            services::provider::ProviderError,
325        };
326
327        use super::*;
328
329        #[tokio::test]
330        async fn submit_transaction_happy_path() {
331            let relayer = create_test_relayer();
332            let mut mocks = default_test_mocks();
333
334            // provider returns PENDING status
335            let response = create_send_tx_response(
336                "PENDING",
337                "0101010101010101010101010101010101010101010101010101010101010101",
338            );
339            mocks
340                .provider
341                .expect_send_transaction_with_status()
342                .returning(move |_| {
343                    let r = response.clone();
344                    Box::pin(async move { Ok(r) })
345                });
346
347            // expect partial update to Submitted
348            mocks
349                .tx_repo
350                .expect_partial_update()
351                .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
352                .returning(|id, upd| {
353                    let mut tx = create_test_transaction("relayer-1");
354                    tx.id = id;
355                    tx.status = upd.status.unwrap();
356                    Ok::<_, RepositoryError>(tx)
357                });
358
359            // Expect notification
360            mocks
361                .job_producer
362                .expect_produce_send_notification_job()
363                .times(1)
364                .returning(|_, _| Box::pin(async { Ok(()) }));
365
366            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
367
368            let mut tx = create_test_transaction(&relayer.id);
369            tx.status = TransactionStatus::Sent; // Must be Sent for idempotent submit
370            if let NetworkTransactionData::Stellar(ref mut d) = tx.network_data {
371                d.signatures.push(dummy_signature());
372                d.signed_envelope_xdr = Some(create_signed_xdr(TEST_PK, TEST_PK_2));
373                // Valid XDR
374            }
375
376            let res = handler.submit_transaction_impl(tx).await.unwrap();
377            assert_eq!(res.status, TransactionStatus::Submitted);
378        }
379
380        #[tokio::test]
381        async fn submit_transaction_provider_error_marks_failed() {
382            let relayer = create_test_relayer();
383            let mut mocks = default_test_mocks();
384
385            // Provider fails with non-bad-sequence error
386            mocks
387                .provider
388                .expect_send_transaction_with_status()
389                .returning(|_| {
390                    Box::pin(async { Err(ProviderError::Other("Network error".to_string())) })
391                });
392
393            // Mock finalize_transaction_state for failure handling
394            mocks
395                .tx_repo
396                .expect_partial_update()
397                .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
398                .returning(|id, upd| {
399                    let mut tx = create_test_transaction("relayer-1");
400                    tx.id = id;
401                    tx.status = upd.status.unwrap();
402                    Ok::<_, RepositoryError>(tx)
403                });
404
405            // Mock notification for failed transaction
406            mocks
407                .job_producer
408                .expect_produce_send_notification_job()
409                .times(1)
410                .returning(|_, _| Box::pin(async { Ok(()) }));
411
412            // Mock find_by_status_paginated for enqueue_next_pending_transaction
413            mocks
414                .tx_repo
415                .expect_find_by_status_paginated()
416                .returning(move |_, _, _, _| {
417                    Ok(PaginatedResult {
418                        items: vec![],
419                        total: 0,
420                        page: 1,
421                        per_page: 1,
422                    })
423                }); // No pending transactions
424
425            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
426            let mut tx = create_test_transaction(&relayer.id);
427            tx.status = TransactionStatus::Sent; // Must be Sent for idempotent submit
428            if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
429                data.signatures.push(dummy_signature());
430                data.sequence_number = Some(42); // Set sequence number
431                data.signed_envelope_xdr = Some("test-xdr".to_string()); // Required for submission
432            }
433
434            let res = handler.submit_transaction_impl(tx).await;
435
436            // Should return error but transaction should be marked as failed
437            assert!(res.is_err());
438            matches!(res.unwrap_err(), TransactionError::UnexpectedError(_));
439        }
440
441        #[tokio::test]
442        async fn submit_transaction_repository_error_marks_failed() {
443            let relayer = create_test_relayer();
444            let mut mocks = default_test_mocks();
445
446            // Provider returns PENDING status
447            let response = create_send_tx_response(
448                "PENDING",
449                "0101010101010101010101010101010101010101010101010101010101010101",
450            );
451            mocks
452                .provider
453                .expect_send_transaction_with_status()
454                .returning(move |_| {
455                    let r = response.clone();
456                    Box::pin(async move { Ok(r) })
457                });
458
459            // Repository fails on first update (submission)
460            mocks
461                .tx_repo
462                .expect_partial_update()
463                .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
464                .returning(|_, _| Err(RepositoryError::Unknown("Database error".to_string())));
465
466            // Mock finalize_transaction_state for failure handling
467            mocks
468                .tx_repo
469                .expect_partial_update()
470                .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
471                .returning(|id, upd| {
472                    let mut tx = create_test_transaction("relayer-1");
473                    tx.id = id;
474                    tx.status = upd.status.unwrap();
475                    Ok::<_, RepositoryError>(tx)
476                });
477
478            // Mock notification for failed transaction
479            mocks
480                .job_producer
481                .expect_produce_send_notification_job()
482                .times(1)
483                .returning(|_, _| Box::pin(async { Ok(()) }));
484
485            // Mock find_by_status_paginated for enqueue_next_pending_transaction
486            mocks
487                .tx_repo
488                .expect_find_by_status_paginated()
489                .returning(move |_, _, _, _| {
490                    Ok(PaginatedResult {
491                        items: vec![],
492                        total: 0,
493                        page: 1,
494                        per_page: 1,
495                    })
496                }); // No pending transactions
497
498            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
499            let mut tx = create_test_transaction(&relayer.id);
500            tx.status = TransactionStatus::Sent; // Must be Sent for idempotent submit
501            if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
502                data.signatures.push(dummy_signature());
503                data.sequence_number = Some(42); // Set sequence number
504                data.signed_envelope_xdr = Some("test-xdr".to_string()); // Required for submission
505            }
506
507            let res = handler.submit_transaction_impl(tx).await;
508
509            // Should return error but transaction should be marked as failed
510            assert!(res.is_err());
511        }
512
513        #[tokio::test]
514        async fn submit_transaction_uses_signed_envelope_xdr() {
515            let relayer = create_test_relayer();
516            let mut mocks = default_test_mocks();
517
518            // Create a transaction with signed_envelope_xdr set
519            let mut tx = create_test_transaction(&relayer.id);
520            tx.status = TransactionStatus::Sent; // Must be Sent for idempotent submit
521            if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
522                data.signatures.push(dummy_signature());
523                // Build and store the signed envelope XDR
524                let envelope = data.get_envelope_for_submission().unwrap();
525                let xdr = envelope
526                    .to_xdr_base64(soroban_rs::xdr::Limits::none())
527                    .unwrap();
528                data.signed_envelope_xdr = Some(xdr);
529            }
530
531            // Provider should receive the envelope decoded from signed_envelope_xdr
532            let response = create_send_tx_response(
533                "PENDING",
534                "0202020202020202020202020202020202020202020202020202020202020202",
535            );
536            mocks
537                .provider
538                .expect_send_transaction_with_status()
539                .returning(move |_| {
540                    let r = response.clone();
541                    Box::pin(async move { Ok(r) })
542                });
543
544            // Update to Submitted
545            mocks
546                .tx_repo
547                .expect_partial_update()
548                .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
549                .returning(|id, upd| {
550                    let mut tx = create_test_transaction("relayer-1");
551                    tx.id = id;
552                    tx.status = upd.status.unwrap();
553                    Ok::<_, RepositoryError>(tx)
554                });
555
556            // Expect notification
557            mocks
558                .job_producer
559                .expect_produce_send_notification_job()
560                .times(1)
561                .returning(|_, _| Box::pin(async { Ok(()) }));
562
563            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
564            let res = handler.submit_transaction_impl(tx).await.unwrap();
565
566            assert_eq!(res.status, TransactionStatus::Submitted);
567        }
568
569        #[tokio::test]
570        async fn resubmit_transaction_delegates_to_submit() {
571            let relayer = create_test_relayer();
572            let mut mocks = default_test_mocks();
573
574            // provider returns PENDING status
575            let response = create_send_tx_response(
576                "PENDING",
577                "0101010101010101010101010101010101010101010101010101010101010101",
578            );
579            mocks
580                .provider
581                .expect_send_transaction_with_status()
582                .returning(move |_| {
583                    let r = response.clone();
584                    Box::pin(async move { Ok(r) })
585                });
586
587            // expect partial update to Submitted
588            mocks
589                .tx_repo
590                .expect_partial_update()
591                .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
592                .returning(|id, upd| {
593                    let mut tx = create_test_transaction("relayer-1");
594                    tx.id = id;
595                    tx.status = upd.status.unwrap();
596                    Ok::<_, RepositoryError>(tx)
597                });
598
599            // Expect notification
600            mocks
601                .job_producer
602                .expect_produce_send_notification_job()
603                .times(1)
604                .returning(|_, _| Box::pin(async { Ok(()) }));
605
606            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
607
608            let mut tx = create_test_transaction(&relayer.id);
609            tx.status = TransactionStatus::Sent; // Must be Sent for idempotent submit
610            if let NetworkTransactionData::Stellar(ref mut d) = tx.network_data {
611                d.signatures.push(dummy_signature());
612                d.signed_envelope_xdr = Some(create_signed_xdr(TEST_PK, TEST_PK_2));
613                // Valid XDR
614            }
615
616            let res = handler.resubmit_transaction_impl(tx).await.unwrap();
617            assert_eq!(res.status, TransactionStatus::Submitted);
618        }
619
620        #[tokio::test]
621        async fn submit_transaction_failure_enqueues_next_transaction() {
622            let relayer = create_test_relayer();
623            let mut mocks = default_test_mocks();
624
625            // Provider fails with non-bad-sequence error
626            mocks
627                .provider
628                .expect_send_transaction_with_status()
629                .returning(|_| {
630                    Box::pin(async { Err(ProviderError::Other("Network error".to_string())) })
631                });
632
633            // No sync expected for non-bad-sequence errors
634
635            // Mock finalize_transaction_state for failure handling
636            mocks
637                .tx_repo
638                .expect_partial_update()
639                .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
640                .returning(|id, upd| {
641                    let mut tx = create_test_transaction("relayer-1");
642                    tx.id = id;
643                    tx.status = upd.status.unwrap();
644                    Ok::<_, RepositoryError>(tx)
645                });
646
647            // Mock notification for failed transaction
648            mocks
649                .job_producer
650                .expect_produce_send_notification_job()
651                .times(1)
652                .returning(|_, _| Box::pin(async { Ok(()) }));
653
654            // Mock find_by_status to return a pending transaction
655            let mut pending_tx = create_test_transaction(&relayer.id);
656            pending_tx.id = "next-pending-tx".to_string();
657            pending_tx.status = TransactionStatus::Pending;
658            let captured_pending_tx = pending_tx.clone();
659            let relayer_id_clone = relayer.id.clone();
660            mocks
661                .tx_repo
662                .expect_find_by_status_paginated()
663                .withf(move |relayer_id, statuses, query, oldest_first| {
664                    *relayer_id == relayer_id_clone
665                        && statuses == [TransactionStatus::Pending]
666                        && query.page == 1
667                        && query.per_page == 1
668                        && *oldest_first == true
669                })
670                .times(1)
671                .returning(move |_, _, _, _| {
672                    Ok(PaginatedResult {
673                        items: vec![captured_pending_tx.clone()],
674                        total: 1,
675                        page: 1,
676                        per_page: 1,
677                    })
678                });
679
680            // Mock produce_transaction_request_job for the next pending transaction
681            mocks
682                .job_producer
683                .expect_produce_transaction_request_job()
684                .withf(move |job, _delay| job.transaction_id == "next-pending-tx")
685                .times(1)
686                .returning(|_, _| Box::pin(async { Ok(()) }));
687
688            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
689            let mut tx = create_test_transaction(&relayer.id);
690            tx.status = TransactionStatus::Sent; // Must be Sent for idempotent submit
691            if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
692                data.signatures.push(dummy_signature());
693                data.sequence_number = Some(42); // Set sequence number
694                data.signed_envelope_xdr = Some("test-xdr".to_string()); // Required for submission
695            }
696
697            let res = handler.submit_transaction_impl(tx).await;
698
699            // Should return error but next transaction should be enqueued
700            assert!(res.is_err());
701            matches!(res.unwrap_err(), TransactionError::UnexpectedError(_));
702        }
703
704        #[tokio::test]
705        async fn test_submit_bad_sequence_resets_and_retries() {
706            let relayer = create_test_relayer();
707            let mut mocks = default_test_mocks();
708
709            // Mock provider to return bad sequence error
710            mocks
711                .provider
712                .expect_send_transaction_with_status()
713                .returning(|_| {
714                    Box::pin(async {
715                        Err(ProviderError::Other(
716                            "transaction submission failed: TxBadSeq".to_string(),
717                        ))
718                    })
719                });
720
721            // Mock get_account for sync_sequence_from_chain
722            mocks.provider.expect_get_account().times(1).returning(|_| {
723                Box::pin(async {
724                    use soroban_rs::xdr::{
725                        AccountEntry, AccountEntryExt, AccountId, PublicKey, SequenceNumber,
726                        String32, Thresholds, Uint256,
727                    };
728                    use stellar_strkey::ed25519;
729
730                    let pk = ed25519::PublicKey::from_string(TEST_PK).unwrap();
731                    let account_id = AccountId(PublicKey::PublicKeyTypeEd25519(Uint256(pk.0)));
732
733                    Ok(AccountEntry {
734                        account_id,
735                        balance: 1000000,
736                        seq_num: SequenceNumber(100),
737                        num_sub_entries: 0,
738                        inflation_dest: None,
739                        flags: 0,
740                        home_domain: String32::default(),
741                        thresholds: Thresholds([1, 1, 1, 1]),
742                        signers: Default::default(),
743                        ext: AccountEntryExt::V0,
744                    })
745                })
746            });
747
748            // Mock counter set for sync_sequence_from_chain
749            mocks
750                .counter
751                .expect_set()
752                .times(1)
753                .returning(|_, _, _| Box::pin(async { Ok(()) }));
754
755            // Mock partial_update for reset_transaction_for_retry - should reset to Pending
756            mocks
757                .tx_repo
758                .expect_partial_update()
759                .withf(|_, upd| upd.status == Some(TransactionStatus::Pending))
760                .times(1)
761                .returning(|id, upd| {
762                    let mut tx = create_test_transaction("relayer-1");
763                    tx.id = id;
764                    tx.status = upd.status.unwrap();
765                    if let Some(network_data) = upd.network_data {
766                        tx.network_data = network_data;
767                    }
768                    Ok::<_, RepositoryError>(tx)
769                });
770
771            // Note: Status check will handle resubmission when it detects a pending transaction without hash
772            // We don't schedule the job here - it will be scheduled by status check when the transaction is old enough
773
774            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
775            let mut tx = create_test_transaction(&relayer.id);
776            tx.status = TransactionStatus::Sent; // Must be Sent for idempotent submit
777            if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
778                data.signatures.push(dummy_signature());
779                data.sequence_number = Some(42);
780                data.signed_envelope_xdr = Some(create_signed_xdr(TEST_PK, TEST_PK_2));
781                // Valid XDR
782            }
783
784            let result = handler.submit_transaction_impl(tx).await;
785
786            // Should return Ok since we're handling the retry
787            assert!(result.is_ok());
788            let reset_tx = result.unwrap();
789            assert_eq!(reset_tx.status, TransactionStatus::Pending);
790
791            // Verify stellar data was reset
792            if let NetworkTransactionData::Stellar(data) = &reset_tx.network_data {
793                assert!(data.sequence_number.is_none());
794                assert!(data.signatures.is_empty());
795                assert!(data.hash.is_none());
796                assert!(data.signed_envelope_xdr.is_none());
797            } else {
798                panic!("Expected Stellar transaction data");
799            }
800        }
801
802        #[tokio::test]
803        async fn submit_transaction_duplicate_status_succeeds() {
804            let relayer = create_test_relayer();
805            let mut mocks = default_test_mocks();
806
807            // Provider returns DUPLICATE status
808            let response = create_send_tx_response(
809                "DUPLICATE",
810                "0101010101010101010101010101010101010101010101010101010101010101",
811            );
812            mocks
813                .provider
814                .expect_send_transaction_with_status()
815                .returning(move |_| {
816                    let r = response.clone();
817                    Box::pin(async move { Ok(r) })
818                });
819
820            // expect partial update to Submitted
821            mocks
822                .tx_repo
823                .expect_partial_update()
824                .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
825                .returning(|id, upd| {
826                    let mut tx = create_test_transaction("relayer-1");
827                    tx.id = id;
828                    tx.status = upd.status.unwrap();
829                    Ok::<_, RepositoryError>(tx)
830                });
831
832            // Expect notification
833            mocks
834                .job_producer
835                .expect_produce_send_notification_job()
836                .times(1)
837                .returning(|_, _| Box::pin(async { Ok(()) }));
838
839            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
840
841            let mut tx = create_test_transaction(&relayer.id);
842            tx.status = TransactionStatus::Sent;
843            if let NetworkTransactionData::Stellar(ref mut d) = tx.network_data {
844                d.signatures.push(dummy_signature());
845                d.signed_envelope_xdr = Some(create_signed_xdr(TEST_PK, TEST_PK_2));
846            }
847
848            let res = handler.submit_transaction_impl(tx).await.unwrap();
849            assert_eq!(res.status, TransactionStatus::Submitted);
850        }
851
852        #[tokio::test]
853        async fn submit_transaction_try_again_later_fails() {
854            let relayer = create_test_relayer();
855            let mut mocks = default_test_mocks();
856
857            // Provider returns TRY_AGAIN_LATER status
858            let response = create_send_tx_response(
859                "TRY_AGAIN_LATER",
860                "0101010101010101010101010101010101010101010101010101010101010101",
861            );
862            mocks
863                .provider
864                .expect_send_transaction_with_status()
865                .returning(move |_| {
866                    let r = response.clone();
867                    Box::pin(async move { Ok(r) })
868                });
869
870            // Mock finalize_transaction_state for failure handling
871            mocks
872                .tx_repo
873                .expect_partial_update()
874                .withf(|_, upd| {
875                    upd.status == Some(TransactionStatus::Failed)
876                        && upd
877                            .status_reason
878                            .as_ref()
879                            .is_some_and(|r| r.contains("TRY_AGAIN_LATER"))
880                })
881                .returning(|id, upd| {
882                    let mut tx = create_test_transaction("relayer-1");
883                    tx.id = id;
884                    tx.status = upd.status.unwrap();
885                    Ok::<_, RepositoryError>(tx)
886                });
887
888            // Mock notification for failed transaction
889            mocks
890                .job_producer
891                .expect_produce_send_notification_job()
892                .times(1)
893                .returning(|_, _| Box::pin(async { Ok(()) }));
894
895            // Mock find_by_status_paginated for enqueue_next_pending_transaction
896            mocks
897                .tx_repo
898                .expect_find_by_status_paginated()
899                .returning(move |_, _, _, _| {
900                    Ok(PaginatedResult {
901                        items: vec![],
902                        total: 0,
903                        page: 1,
904                        per_page: 1,
905                    })
906                });
907
908            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
909            let mut tx = create_test_transaction(&relayer.id);
910            tx.status = TransactionStatus::Sent;
911            if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
912                data.signatures.push(dummy_signature());
913                data.signed_envelope_xdr = Some(create_signed_xdr(TEST_PK, TEST_PK_2));
914            }
915
916            let res = handler.submit_transaction_impl(tx).await;
917            assert!(res.is_err());
918            let err = res.unwrap_err();
919            assert!(
920                matches!(err, TransactionError::UnexpectedError(ref msg) if msg.contains("TRY_AGAIN_LATER"))
921            );
922        }
923
924        #[tokio::test]
925        async fn submit_transaction_error_status_fails() {
926            let relayer = create_test_relayer();
927            let mut mocks = default_test_mocks();
928
929            // Provider returns ERROR status with error XDR
930            let mut response = create_send_tx_response(
931                "ERROR",
932                "0101010101010101010101010101010101010101010101010101010101010101",
933            );
934            response.error_result_xdr = Some("AAAAAAAAAGT////7AAAAAA==".to_string());
935            mocks
936                .provider
937                .expect_send_transaction_with_status()
938                .returning(move |_| {
939                    let r = response.clone();
940                    Box::pin(async move { Ok(r) })
941                });
942
943            // Mock finalize_transaction_state for failure handling
944            mocks
945                .tx_repo
946                .expect_partial_update()
947                .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
948                .returning(|id, upd| {
949                    let mut tx = create_test_transaction("relayer-1");
950                    tx.id = id;
951                    tx.status = upd.status.unwrap();
952                    Ok::<_, RepositoryError>(tx)
953                });
954
955            // Mock notification for failed transaction
956            mocks
957                .job_producer
958                .expect_produce_send_notification_job()
959                .times(1)
960                .returning(|_, _| Box::pin(async { Ok(()) }));
961
962            // Mock find_by_status_paginated for enqueue_next_pending_transaction
963            mocks
964                .tx_repo
965                .expect_find_by_status_paginated()
966                .returning(move |_, _, _, _| {
967                    Ok(PaginatedResult {
968                        items: vec![],
969                        total: 0,
970                        page: 1,
971                        per_page: 1,
972                    })
973                });
974
975            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
976            let mut tx = create_test_transaction(&relayer.id);
977            tx.status = TransactionStatus::Sent;
978            if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
979                data.signatures.push(dummy_signature());
980                data.signed_envelope_xdr = Some(create_signed_xdr(TEST_PK, TEST_PK_2));
981            }
982
983            let res = handler.submit_transaction_impl(tx).await;
984            assert!(res.is_err());
985            let err = res.unwrap_err();
986            // The error should contain the error XDR
987            assert!(matches!(
988                err,
989                TransactionError::UnexpectedError(ref msg) if msg.contains("AAAAAAAAAGT")
990            ));
991        }
992    }
993}