1use crate::domain::transaction::common::is_final_state;
4use crate::metrics::{
5 TRANSACTIONS_BY_STATUS, TRANSACTIONS_CREATED, TRANSACTIONS_FAILED, TRANSACTIONS_SUBMITTED,
6 TRANSACTIONS_SUCCESS, TRANSACTION_PROCESSING_TIME,
7};
8use crate::models::{
9 NetworkTransactionData, PaginationQuery, RepositoryError, TransactionRepoModel,
10 TransactionStatus, TransactionUpdateRequest,
11};
12use crate::repositories::redis_base::RedisRepository;
13use crate::repositories::{
14 BatchDeleteResult, BatchRetrievalResult, PaginatedResult, Repository, TransactionDeleteRequest,
15 TransactionRepository,
16};
17use crate::utils::RedisConnections;
18use async_trait::async_trait;
19use chrono::Utc;
20use redis::{AsyncCommands, Script};
21use std::fmt;
22use std::sync::Arc;
23use tracing::{debug, error, warn};
24
/// Key segment that introduces the per-relayer part of a key.
const RELAYER_PREFIX: &str = "relayer";
/// Key segment that precedes the transaction ID in a transaction data key.
const TX_PREFIX: &str = "tx";
/// Key segment of the legacy per-status index (a plain Redis set).
const STATUS_PREFIX: &str = "status";
/// Key segment of the per-status index stored as a sorted set.
const STATUS_SORTED_PREFIX: &str = "status_sorted";
/// Key segment of the nonce -> transaction ID index.
const NONCE_PREFIX: &str = "nonce";
/// Key segment of the transaction ID -> relayer ID reverse lookup.
const TX_TO_RELAYER_PREFIX: &str = "tx_to_relayer";
/// Key suffix of the set that holds every relayer ID seen by the index code.
const RELAYER_LIST_KEY: &str = "relayer_list";
/// Key segment of the per-relayer sorted set scored by `created_at`.
const TX_BY_CREATED_AT_PREFIX: &str = "tx_by_created_at";
33
34#[derive(Clone)]
35pub struct RedisTransactionRepository {
36 pub connections: Arc<RedisConnections>,
37 pub key_prefix: String,
38}
39
40impl RedisRepository for RedisTransactionRepository {}
41
42impl RedisTransactionRepository {
43 pub fn new(
44 connections: Arc<RedisConnections>,
45 key_prefix: String,
46 ) -> Result<Self, RepositoryError> {
47 if key_prefix.is_empty() {
48 return Err(RepositoryError::InvalidData(
49 "Redis key prefix cannot be empty".to_string(),
50 ));
51 }
52
53 Ok(Self {
54 connections,
55 key_prefix,
56 })
57 }
58
59 fn tx_key(&self, relayer_id: &str, tx_id: &str) -> String {
61 format!(
62 "{}:{}:{}:{}:{}",
63 self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX, tx_id
64 )
65 }
66
67 fn tx_to_relayer_key(&self, tx_id: &str) -> String {
69 format!(
70 "{}:{}:{}:{}",
71 self.key_prefix, RELAYER_PREFIX, TX_TO_RELAYER_PREFIX, tx_id
72 )
73 }
74
75 fn relayer_status_key(&self, relayer_id: &str, status: &TransactionStatus) -> String {
77 format!(
78 "{}:{}:{}:{}:{}",
79 self.key_prefix, RELAYER_PREFIX, relayer_id, STATUS_PREFIX, status
80 )
81 }
82
83 fn relayer_status_sorted_key(&self, relayer_id: &str, status: &TransactionStatus) -> String {
86 format!(
87 "{}:{}:{}:{}:{}",
88 self.key_prefix, RELAYER_PREFIX, relayer_id, STATUS_SORTED_PREFIX, status
89 )
90 }
91
92 fn relayer_nonce_key(&self, relayer_id: &str, nonce: u64) -> String {
94 format!(
95 "{}:{}:{}:{}:{}",
96 self.key_prefix, RELAYER_PREFIX, relayer_id, NONCE_PREFIX, nonce
97 )
98 }
99
100 fn relayer_list_key(&self) -> String {
102 format!("{}:{}", self.key_prefix, RELAYER_LIST_KEY)
103 }
104
105 fn relayer_tx_by_created_at_key(&self, relayer_id: &str) -> String {
107 format!(
108 "{}:{}:{}:{}",
109 self.key_prefix, RELAYER_PREFIX, relayer_id, TX_BY_CREATED_AT_PREFIX
110 )
111 }
112
113 fn timestamp_to_score(&self, timestamp: &str) -> f64 {
115 chrono::DateTime::parse_from_rfc3339(timestamp)
116 .map(|dt| dt.timestamp_millis() as f64)
117 .unwrap_or_else(|_| {
118 warn!(timestamp = %timestamp, "failed to parse timestamp, using 0");
119 0.0
120 })
121 }
122
123 fn status_sorted_score(&self, tx: &TransactionRepoModel) -> f64 {
127 if tx.status == TransactionStatus::Confirmed {
128 if let Some(ref confirmed_at) = tx.confirmed_at {
130 return self.timestamp_to_score(confirmed_at);
131 }
132 warn!(tx_id = %tx.id, "Confirmed transaction missing confirmed_at, using created_at");
134 }
135 self.timestamp_to_score(&tx.created_at)
136 }
137
138 async fn get_transactions_by_ids(
140 &self,
141 ids: &[String],
142 ) -> Result<BatchRetrievalResult<TransactionRepoModel>, RepositoryError> {
143 if ids.is_empty() {
144 debug!("no transaction IDs provided for batch fetch");
145 return Ok(BatchRetrievalResult {
146 results: vec![],
147 failed_ids: vec![],
148 });
149 }
150
151 let mut conn = self
152 .get_connection(self.connections.reader(), "batch_fetch_transactions")
153 .await?;
154
155 let reverse_keys: Vec<String> = ids.iter().map(|id| self.tx_to_relayer_key(id)).collect();
156
157 debug!(count = %ids.len(), "fetching relayer IDs for transactions");
158
159 let relayer_ids: Vec<Option<String>> = conn
160 .mget(&reverse_keys)
161 .await
162 .map_err(|e| self.map_redis_error(e, "batch_fetch_relayer_ids"))?;
163
164 let mut tx_keys = Vec::new();
165 let mut valid_ids = Vec::new();
166 let mut failed_ids = Vec::new();
167 for (i, relayer_id) in relayer_ids.into_iter().enumerate() {
168 match relayer_id {
169 Some(relayer_id) => {
170 tx_keys.push(self.tx_key(&relayer_id, &ids[i]));
171 valid_ids.push(ids[i].clone());
172 }
173 None => {
174 warn!(tx_id = %ids[i], "no relayer found for transaction");
175 failed_ids.push(ids[i].clone());
176 }
177 }
178 }
179
180 if tx_keys.is_empty() {
181 debug!("no valid transactions found for batch fetch");
182 return Ok(BatchRetrievalResult {
183 results: vec![],
184 failed_ids,
185 });
186 }
187
188 debug!(count = %tx_keys.len(), "batch fetching transaction data");
189
190 let values: Vec<Option<String>> = conn
191 .mget(&tx_keys)
192 .await
193 .map_err(|e| self.map_redis_error(e, "batch_fetch_transactions"))?;
194
195 let mut transactions = Vec::new();
196 let mut failed_count = 0;
197 let mut failed_ids = Vec::new();
198 for (i, value) in values.into_iter().enumerate() {
199 match value {
200 Some(json) => {
201 match self.deserialize_entity::<TransactionRepoModel>(
202 &json,
203 &valid_ids[i],
204 "transaction",
205 ) {
206 Ok(tx) => transactions.push(tx),
207 Err(e) => {
208 failed_count += 1;
209 error!(tx_id = %valid_ids[i], error = %e, "failed to deserialize transaction");
210 }
212 }
213 }
214 None => {
215 warn!(tx_id = %valid_ids[i], "transaction not found in batch fetch");
216 failed_ids.push(valid_ids[i].clone());
217 }
218 }
219 }
220
221 if failed_count > 0 {
222 warn!(failed_count = %failed_count, total_count = %valid_ids.len(), "failed to deserialize transactions in batch");
223 }
224
225 debug!(count = %transactions.len(), "successfully fetched transactions");
226 Ok(BatchRetrievalResult {
227 results: transactions,
228 failed_ids,
229 })
230 }
231
232 fn extract_nonce(&self, network_data: &NetworkTransactionData) -> Option<u64> {
234 match network_data.get_evm_transaction_data() {
235 Ok(tx_data) => tx_data.nonce,
236 Err(_) => {
237 debug!("no EVM transaction data available for nonce extraction");
238 None
239 }
240 }
241 }
242
243 async fn ensure_status_sorted_set(
260 &self,
261 relayer_id: &str,
262 status: &TransactionStatus,
263 ) -> Result<u64, RepositoryError> {
264 let sorted_key = self.relayer_status_sorted_key(relayer_id, status);
265 let legacy_key = self.relayer_status_key(relayer_id, status);
266
267 let legacy_ids = {
269 let mut conn = self
270 .get_connection(self.connections.primary(), "ensure_status_sorted_set_check")
271 .await?;
272
273 let legacy_count: u64 = conn
275 .scard(&legacy_key)
276 .await
277 .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_scard"))?;
278
279 if legacy_count == 0 {
280 let sorted_count: u64 = conn
282 .zcard(&sorted_key)
283 .await
284 .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_zcard"))?;
285 return Ok(sorted_count);
286 }
287
288 debug!(
290 relayer_id = %relayer_id,
291 status = %status,
292 legacy_count = %legacy_count,
293 "migrating status set to sorted set"
294 );
295
296 let ids: Vec<String> = conn
297 .smembers(&legacy_key)
298 .await
299 .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_smembers"))?;
300
301 ids
302 };
304
305 if legacy_ids.is_empty() {
306 return Ok(0);
307 }
308
309 let transactions = self.get_transactions_by_ids(&legacy_ids).await?;
311
312 let mut conn = self
314 .get_connection(
315 self.connections.primary(),
316 "ensure_status_sorted_set_migrate",
317 )
318 .await?;
319
320 if transactions.results.is_empty() {
321 let _: () = conn
323 .del(&legacy_key)
324 .await
325 .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_del_stale"))?;
326 return Ok(0);
327 }
328
329 let mut pipe = redis::pipe();
332 pipe.atomic();
333
334 for tx in &transactions.results {
335 let score = self.status_sorted_score(tx);
336 pipe.zadd(&sorted_key, &tx.id, score);
337 }
338
339 pipe.del(&legacy_key);
341
342 pipe.query_async::<()>(&mut conn)
343 .await
344 .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_migrate"))?;
345
346 let migrated_count = transactions.results.len() as u64;
347 debug!(
348 relayer_id = %relayer_id,
349 status = %status,
350 migrated_count = %migrated_count,
351 "completed migration of status set to sorted set"
352 );
353
354 Ok(migrated_count)
355 }
356
357 async fn update_indexes(
359 &self,
360 tx: &TransactionRepoModel,
361 old_tx: Option<&TransactionRepoModel>,
362 ) -> Result<(), RepositoryError> {
363 let mut conn = self
364 .get_connection(self.connections.primary(), "update_indexes")
365 .await?;
366 let mut pipe = redis::pipe();
367 pipe.atomic();
368
369 debug!(tx_id = %tx.id, "updating indexes for transaction");
370
371 let relayer_list_key = self.relayer_list_key();
373 pipe.sadd(&relayer_list_key, &tx.relayer_id);
374
375 let status_score = self.status_sorted_score(tx);
378 let created_at_score = self.timestamp_to_score(&tx.created_at);
380
381 let new_status_sorted_key = self.relayer_status_sorted_key(&tx.relayer_id, &tx.status);
383 pipe.zadd(&new_status_sorted_key, &tx.id, status_score);
384 debug!(tx_id = %tx.id, status = %tx.status, score = %status_score, "added transaction to status sorted set");
385
386 if let Some(nonce) = self.extract_nonce(&tx.network_data) {
387 let nonce_key = self.relayer_nonce_key(&tx.relayer_id, nonce);
388 pipe.set(&nonce_key, &tx.id);
389 debug!(tx_id = %tx.id, nonce = %nonce, "added nonce index for transaction");
390 }
391
392 let relayer_sorted_key = self.relayer_tx_by_created_at_key(&tx.relayer_id);
394 pipe.zadd(&relayer_sorted_key, &tx.id, created_at_score);
395 debug!(tx_id = %tx.id, score = %created_at_score, "added transaction to sorted set by created_at");
396
397 if let Some(old) = old_tx {
399 if old.status != tx.status {
400 let old_status_sorted_key =
402 self.relayer_status_sorted_key(&old.relayer_id, &old.status);
403 pipe.zrem(&old_status_sorted_key, &tx.id);
404
405 let old_status_legacy_key = self.relayer_status_key(&old.relayer_id, &old.status);
407 pipe.srem(&old_status_legacy_key, &tx.id);
408
409 debug!(tx_id = %tx.id, old_status = %old.status, new_status = %tx.status, "removing old status indexes for transaction");
410 }
411
412 if let Some(old_nonce) = self.extract_nonce(&old.network_data) {
414 let new_nonce = self.extract_nonce(&tx.network_data);
415 if Some(old_nonce) != new_nonce {
416 let old_nonce_key = self.relayer_nonce_key(&old.relayer_id, old_nonce);
417 pipe.del(&old_nonce_key);
418 debug!(tx_id = %tx.id, old_nonce = %old_nonce, new_nonce = ?new_nonce, "removing old nonce index for transaction");
419 }
420 }
421 }
422
423 pipe.exec_async(&mut conn).await.map_err(|e| {
425 error!(tx_id = %tx.id, error = %e, "index update pipeline failed for transaction");
426 self.map_redis_error(e, &format!("update_indexes_for_tx_{}", tx.id))
427 })?;
428
429 debug!(tx_id = %tx.id, "successfully updated indexes for transaction");
430 Ok(())
431 }
432
433 async fn remove_all_indexes(&self, tx: &TransactionRepoModel) -> Result<(), RepositoryError> {
435 let mut conn = self
436 .get_connection(self.connections.primary(), "remove_all_indexes")
437 .await?;
438 let mut pipe = redis::pipe();
439 pipe.atomic();
440
441 debug!(tx_id = %tx.id, "removing all indexes for transaction");
442
443 for status in &[
447 TransactionStatus::Canceled,
448 TransactionStatus::Pending,
449 TransactionStatus::Sent,
450 TransactionStatus::Submitted,
451 TransactionStatus::Mined,
452 TransactionStatus::Confirmed,
453 TransactionStatus::Failed,
454 TransactionStatus::Expired,
455 ] {
456 let status_sorted_key = self.relayer_status_sorted_key(&tx.relayer_id, status);
458 pipe.zrem(&status_sorted_key, &tx.id);
459
460 let status_legacy_key = self.relayer_status_key(&tx.relayer_id, status);
462 pipe.srem(&status_legacy_key, &tx.id);
463 }
464
465 if let Some(nonce) = self.extract_nonce(&tx.network_data) {
467 let nonce_key = self.relayer_nonce_key(&tx.relayer_id, nonce);
468 pipe.del(&nonce_key);
469 debug!(tx_id = %tx.id, nonce = %nonce, "removing nonce index for transaction");
470 }
471
472 let relayer_sorted_key = self.relayer_tx_by_created_at_key(&tx.relayer_id);
474 pipe.zrem(&relayer_sorted_key, &tx.id);
475 debug!(tx_id = %tx.id, "removing transaction from sorted set by created_at");
476
477 let reverse_key = self.tx_to_relayer_key(&tx.id);
479 pipe.del(&reverse_key);
480
481 pipe.exec_async(&mut conn).await.map_err(|e| {
482 error!(tx_id = %tx.id, error = %e, "index removal failed for transaction");
483 self.map_redis_error(e, &format!("remove_indexes_for_tx_{}", tx.id))
484 })?;
485
486 debug!(tx_id = %tx.id, "successfully removed all indexes for transaction");
487 Ok(())
488 }
489}
490
491impl fmt::Debug for RedisTransactionRepository {
492 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
493 f.debug_struct("RedisTransactionRepository")
494 .field("connections", &"<RedisConnections>")
495 .field("key_prefix", &self.key_prefix)
496 .finish()
497 }
498}
499
500#[async_trait]
501impl Repository<TransactionRepoModel, String> for RedisTransactionRepository {
502 async fn create(
503 &self,
504 entity: TransactionRepoModel,
505 ) -> Result<TransactionRepoModel, RepositoryError> {
506 if entity.id.is_empty() {
507 return Err(RepositoryError::InvalidData(
508 "Transaction ID cannot be empty".to_string(),
509 ));
510 }
511
512 let key = self.tx_key(&entity.relayer_id, &entity.id);
513 let reverse_key = self.tx_to_relayer_key(&entity.id);
514 let mut conn = self
515 .get_connection(self.connections.primary(), "create")
516 .await?;
517
518 debug!(tx_id = %entity.id, "creating transaction");
519
520 let value = self.serialize_entity(&entity, |t| &t.id, "transaction")?;
521
522 let existing: Option<String> = conn
524 .get(&reverse_key)
525 .await
526 .map_err(|e| self.map_redis_error(e, "create_transaction_check"))?;
527
528 if existing.is_some() {
529 return Err(RepositoryError::ConstraintViolation(format!(
530 "Transaction with ID {} already exists",
531 entity.id
532 )));
533 }
534
535 let mut pipe = redis::pipe();
537 pipe.atomic();
538 pipe.set(&key, &value);
539 pipe.set(&reverse_key, &entity.relayer_id);
540
541 pipe.exec_async(&mut conn)
542 .await
543 .map_err(|e| self.map_redis_error(e, "create_transaction"))?;
544
545 if let Err(e) = self.update_indexes(&entity, None).await {
547 error!(tx_id = %entity.id, error = %e, "failed to update indexes for new transaction");
548 return Err(e);
549 }
550
551 let network_type = format!("{:?}", entity.network_type).to_lowercase();
553 let relayer_id = entity.relayer_id.as_str();
554 TRANSACTIONS_CREATED
555 .with_label_values(&[relayer_id, &network_type])
556 .inc();
557
558 let status = &entity.status;
560 let status_str = format!("{status:?}").to_lowercase();
561 TRANSACTIONS_BY_STATUS
562 .with_label_values(&[relayer_id, &network_type, &status_str])
563 .inc();
564
565 debug!(tx_id = %entity.id, "successfully created transaction");
566 Ok(entity)
567 }
568
569 async fn get_by_id(&self, id: String) -> Result<TransactionRepoModel, RepositoryError> {
570 if id.is_empty() {
571 return Err(RepositoryError::InvalidData(
572 "Transaction ID cannot be empty".to_string(),
573 ));
574 }
575
576 let mut conn = self
577 .get_connection(self.connections.reader(), "get_by_id")
578 .await?;
579
580 debug!(tx_id = %id, "fetching transaction");
581
582 let reverse_key = self.tx_to_relayer_key(&id);
583 let relayer_id: Option<String> = conn
584 .get(&reverse_key)
585 .await
586 .map_err(|e| self.map_redis_error(e, "get_transaction_reverse_lookup"))?;
587
588 let relayer_id = match relayer_id {
589 Some(relayer_id) => relayer_id,
590 None => {
591 debug!(tx_id = %id, "transaction not found (no reverse lookup)");
592 return Err(RepositoryError::NotFound(format!(
593 "Transaction with ID {id} not found"
594 )));
595 }
596 };
597
598 let key = self.tx_key(&relayer_id, &id);
599 let value: Option<String> = conn
600 .get(&key)
601 .await
602 .map_err(|e| self.map_redis_error(e, "get_transaction_by_id"))?;
603
604 match value {
605 Some(json) => {
606 let tx =
607 self.deserialize_entity::<TransactionRepoModel>(&json, &id, "transaction")?;
608 debug!(tx_id = %id, "successfully fetched transaction");
609 Ok(tx)
610 }
611 None => {
612 debug!(tx_id = %id, "transaction not found");
613 Err(RepositoryError::NotFound(format!(
614 "Transaction with ID {id} not found"
615 )))
616 }
617 }
618 }
619
620 async fn list_all(&self) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
622 let mut conn = self
623 .get_connection(self.connections.reader(), "list_all")
624 .await?;
625
626 debug!("fetching all transactions sorted by created_at (newest first)");
627
628 let relayer_list_key = self.relayer_list_key();
630 let relayer_ids: Vec<String> = conn
631 .smembers(&relayer_list_key)
632 .await
633 .map_err(|e| self.map_redis_error(e, "list_all_relayer_ids"))?;
634
635 debug!(count = %relayer_ids.len(), "found relayers");
636
637 let mut all_tx_ids = Vec::new();
639 for relayer_id in relayer_ids {
640 let relayer_sorted_key = self.relayer_tx_by_created_at_key(&relayer_id);
641 let tx_ids: Vec<String> = redis::cmd("ZRANGE")
642 .arg(&relayer_sorted_key)
643 .arg(0)
644 .arg(-1)
645 .arg("REV")
646 .query_async(&mut conn)
647 .await
648 .map_err(|e| self.map_redis_error(e, "list_all_relayer_sorted"))?;
649
650 all_tx_ids.extend(tx_ids);
651 }
652
653 drop(conn);
655
656 let batch_result = self.get_transactions_by_ids(&all_tx_ids).await?;
658 let mut all_transactions = batch_result.results;
659
660 all_transactions.sort_by(|a, b| b.created_at.cmp(&a.created_at));
662
663 debug!(count = %all_transactions.len(), "found transactions");
664 Ok(all_transactions)
665 }
666
667 async fn list_paginated(
669 &self,
670 query: PaginationQuery,
671 ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
672 if query.per_page == 0 {
673 return Err(RepositoryError::InvalidData(
674 "per_page must be greater than 0".to_string(),
675 ));
676 }
677
678 let mut conn = self
679 .get_connection(self.connections.reader(), "list_paginated")
680 .await?;
681
682 debug!(page = %query.page, per_page = %query.per_page, "fetching paginated transactions sorted by created_at (newest first)");
683
684 let relayer_list_key = self.relayer_list_key();
686 let relayer_ids: Vec<String> = conn
687 .smembers(&relayer_list_key)
688 .await
689 .map_err(|e| self.map_redis_error(e, "list_paginated_relayer_ids"))?;
690
691 let mut all_tx_ids = Vec::new();
693 for relayer_id in relayer_ids {
694 let relayer_sorted_key = self.relayer_tx_by_created_at_key(&relayer_id);
695 let tx_ids: Vec<String> = redis::cmd("ZRANGE")
696 .arg(&relayer_sorted_key)
697 .arg(0)
698 .arg(-1)
699 .arg("REV")
700 .query_async(&mut conn)
701 .await
702 .map_err(|e| self.map_redis_error(e, "list_paginated_relayer_sorted"))?;
703
704 all_tx_ids.extend(tx_ids);
705 }
706
707 drop(conn);
709
710 let batch_result = self.get_transactions_by_ids(&all_tx_ids).await?;
712 let mut all_transactions = batch_result.results;
713
714 all_transactions.sort_by(|a, b| b.created_at.cmp(&a.created_at));
716
717 let total = all_transactions.len() as u64;
718 let start = ((query.page - 1) * query.per_page) as usize;
719 let end = (start + query.per_page as usize).min(all_transactions.len());
720
721 if start >= all_transactions.len() {
722 debug!(page = %query.page, total = %total, "page is beyond available data");
723 return Ok(PaginatedResult {
724 items: vec![],
725 total,
726 page: query.page,
727 per_page: query.per_page,
728 });
729 }
730
731 let items = all_transactions[start..end].to_vec();
732
733 debug!(count = %items.len(), page = %query.page, "successfully fetched transactions for page");
734
735 Ok(PaginatedResult {
736 items,
737 total,
738 page: query.page,
739 per_page: query.per_page,
740 })
741 }
742
743 async fn update(
744 &self,
745 id: String,
746 entity: TransactionRepoModel,
747 ) -> Result<TransactionRepoModel, RepositoryError> {
748 if id.is_empty() {
749 return Err(RepositoryError::InvalidData(
750 "Transaction ID cannot be empty".to_string(),
751 ));
752 }
753
754 debug!(tx_id = %id, "updating transaction");
755
756 let old_tx = self.get_by_id(id.clone()).await?;
758
759 let key = self.tx_key(&entity.relayer_id, &id);
760 let mut conn = self
761 .get_connection(self.connections.primary(), "update")
762 .await?;
763
764 let value = self.serialize_entity(&entity, |t| &t.id, "transaction")?;
765
766 let _: () = conn
768 .set(&key, value)
769 .await
770 .map_err(|e| self.map_redis_error(e, "update_transaction"))?;
771
772 self.update_indexes(&entity, Some(&old_tx)).await?;
774
775 debug!(tx_id = %id, "successfully updated transaction");
776 Ok(entity)
777 }
778
779 async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError> {
780 if id.is_empty() {
781 return Err(RepositoryError::InvalidData(
782 "Transaction ID cannot be empty".to_string(),
783 ));
784 }
785
786 debug!(tx_id = %id, "deleting transaction");
787
788 let tx = self.get_by_id(id.clone()).await?;
790
791 let key = self.tx_key(&tx.relayer_id, &id);
792 let reverse_key = self.tx_to_relayer_key(&id);
793 let mut conn = self
794 .get_connection(self.connections.primary(), "delete_by_id")
795 .await?;
796
797 let mut pipe = redis::pipe();
798 pipe.atomic();
799 pipe.del(&key);
800 pipe.del(&reverse_key);
801
802 pipe.exec_async(&mut conn)
803 .await
804 .map_err(|e| self.map_redis_error(e, "delete_transaction"))?;
805
806 if let Err(e) = self.remove_all_indexes(&tx).await {
808 error!(tx_id = %id, error = %e, "failed to remove indexes for deleted transaction");
809 }
810
811 debug!(tx_id = %id, "successfully deleted transaction");
812 Ok(())
813 }
814
815 async fn count(&self) -> Result<usize, RepositoryError> {
817 let mut conn = self
818 .get_connection(self.connections.reader(), "count")
819 .await?;
820
821 debug!("counting transactions");
822
823 let relayer_list_key = self.relayer_list_key();
825 let relayer_ids: Vec<String> = conn
826 .smembers(&relayer_list_key)
827 .await
828 .map_err(|e| self.map_redis_error(e, "count_relayer_ids"))?;
829
830 let mut total_count = 0usize;
831 for relayer_id in relayer_ids {
832 let relayer_sorted_key = self.relayer_tx_by_created_at_key(&relayer_id);
833 let count: usize = conn
834 .zcard(&relayer_sorted_key)
835 .await
836 .map_err(|e| self.map_redis_error(e, "count_relayer_transactions"))?;
837 total_count += count;
838 }
839
840 debug!(count = %total_count, "transaction count");
841 Ok(total_count)
842 }
843
844 async fn has_entries(&self) -> Result<bool, RepositoryError> {
845 let mut conn = self
846 .get_connection(self.connections.reader(), "has_entries")
847 .await?;
848 let relayer_list_key = self.relayer_list_key();
849
850 debug!("checking if transaction entries exist");
851
852 let exists: bool = conn
853 .exists(&relayer_list_key)
854 .await
855 .map_err(|e| self.map_redis_error(e, "has_entries_check"))?;
856
857 debug!(exists = %exists, "transaction entries exist");
858 Ok(exists)
859 }
860
861 async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
862 let mut conn = self
863 .get_connection(self.connections.primary(), "drop_all_entries")
864 .await?;
865 let relayer_list_key = self.relayer_list_key();
866
867 debug!("dropping all transaction entries");
868
869 let relayer_ids: Vec<String> = conn
871 .smembers(&relayer_list_key)
872 .await
873 .map_err(|e| self.map_redis_error(e, "drop_all_entries_get_relayer_ids"))?;
874
875 if relayer_ids.is_empty() {
876 debug!("no transaction entries to drop");
877 return Ok(());
878 }
879
880 let mut pipe = redis::pipe();
882 pipe.atomic();
883
884 for relayer_id in &relayer_ids {
886 let pattern = format!(
888 "{}:{}:{}:{}:*",
889 self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
890 );
891 let mut cursor = 0;
892 let mut tx_ids = Vec::new();
893
894 loop {
895 let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
896 .cursor_arg(cursor)
897 .arg("MATCH")
898 .arg(&pattern)
899 .query_async(&mut conn)
900 .await
901 .map_err(|e| self.map_redis_error(e, "drop_all_entries_scan"))?;
902
903 for key in keys {
905 pipe.del(&key);
906 if let Some(tx_id) = key.split(':').next_back() {
907 tx_ids.push(tx_id.to_string());
908 }
909 }
910
911 cursor = next_cursor;
912 if cursor == 0 {
913 break;
914 }
915 }
916
917 for tx_id in tx_ids {
919 let reverse_key = self.tx_to_relayer_key(&tx_id);
920 pipe.del(&reverse_key);
921
922 for status in &[
925 TransactionStatus::Canceled,
926 TransactionStatus::Pending,
927 TransactionStatus::Sent,
928 TransactionStatus::Submitted,
929 TransactionStatus::Mined,
930 TransactionStatus::Confirmed,
931 TransactionStatus::Failed,
932 TransactionStatus::Expired,
933 ] {
934 let status_sorted_key = self.relayer_status_sorted_key(relayer_id, status);
936 pipe.zrem(&status_sorted_key, &tx_id);
937
938 let status_key = self.relayer_status_key(relayer_id, status);
940 pipe.srem(&status_key, &tx_id);
941 }
942 }
943
944 let relayer_sorted_key = self.relayer_tx_by_created_at_key(relayer_id);
946 pipe.del(&relayer_sorted_key);
947 }
948
949 pipe.del(&relayer_list_key);
951
952 pipe.exec_async(&mut conn)
953 .await
954 .map_err(|e| self.map_redis_error(e, "drop_all_entries_pipeline"))?;
955
956 debug!(count = %relayer_ids.len(), "dropped all transaction entries for relayers");
957 Ok(())
958 }
959}
960
961#[async_trait]
962impl TransactionRepository for RedisTransactionRepository {
963 async fn find_by_relayer_id(
964 &self,
965 relayer_id: &str,
966 query: PaginationQuery,
967 ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
968 let mut conn = self
969 .get_connection(self.connections.reader(), "find_by_relayer_id")
970 .await?;
971
972 debug!(relayer_id = %relayer_id, page = %query.page, per_page = %query.per_page, "fetching transactions for relayer sorted by created_at (newest first)");
973
974 let relayer_sorted_key = self.relayer_tx_by_created_at_key(relayer_id);
975
976 let sorted_set_count: u64 = conn
978 .zcard(&relayer_sorted_key)
979 .await
980 .map_err(|e| self.map_redis_error(e, "find_by_relayer_id_count"))?;
981
982 if sorted_set_count == 0 {
985 debug!(relayer_id = %relayer_id, "no transactions found for relayer (sorted set is empty)");
986 return Ok(PaginatedResult {
987 items: vec![],
988 total: 0,
989 page: query.page,
990 per_page: query.per_page,
991 });
992 }
993
994 let total = sorted_set_count;
995
996 let start = ((query.page - 1) * query.per_page) as isize;
998 let end = start + query.per_page as isize - 1;
999
1000 if start as u64 >= total {
1001 debug!(relayer_id = %relayer_id, page = %query.page, total = %total, "page is beyond available data");
1002 return Ok(PaginatedResult {
1003 items: vec![],
1004 total,
1005 page: query.page,
1006 per_page: query.per_page,
1007 });
1008 }
1009
1010 let page_ids: Vec<String> = redis::cmd("ZRANGE")
1012 .arg(&relayer_sorted_key)
1013 .arg(start)
1014 .arg(end)
1015 .arg("REV")
1016 .query_async(&mut conn)
1017 .await
1018 .map_err(|e| self.map_redis_error(e, "find_by_relayer_id_sorted"))?;
1019
1020 drop(conn);
1022
1023 let items = self.get_transactions_by_ids(&page_ids).await?;
1024
1025 debug!(relayer_id = %relayer_id, count = %items.results.len(), page = %query.page, "successfully fetched transactions for relayer");
1026
1027 Ok(PaginatedResult {
1028 items: items.results,
1029 total,
1030 page: query.page,
1031 per_page: query.per_page,
1032 })
1033 }
1034
1035 async fn find_by_status(
1037 &self,
1038 relayer_id: &str,
1039 statuses: &[TransactionStatus],
1040 ) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
1041 for status in statuses {
1043 self.ensure_status_sorted_set(relayer_id, status).await?;
1044 }
1045
1046 let mut conn = self
1048 .get_connection(self.connections.reader(), "find_by_status")
1049 .await?;
1050
1051 let mut all_ids: Vec<String> = Vec::new();
1052 for status in statuses {
1053 let sorted_key = self.relayer_status_sorted_key(relayer_id, status);
1055 let ids: Vec<String> = redis::cmd("ZRANGE")
1056 .arg(&sorted_key)
1057 .arg(0)
1058 .arg(-1)
1059 .arg("REV") .query_async(&mut conn)
1061 .await
1062 .map_err(|e| self.map_redis_error(e, "find_by_status"))?;
1063
1064 all_ids.extend(ids);
1065 }
1066
1067 drop(conn);
1069
1070 if all_ids.is_empty() {
1071 return Ok(vec![]);
1072 }
1073
1074 all_ids.sort();
1076 all_ids.dedup();
1077
1078 let mut transactions = self.get_transactions_by_ids(&all_ids).await?;
1080
1081 transactions
1083 .results
1084 .sort_by(|a, b| b.created_at.cmp(&a.created_at));
1085
1086 Ok(transactions.results)
1087 }
1088
1089 async fn find_by_status_paginated(
1090 &self,
1091 relayer_id: &str,
1092 statuses: &[TransactionStatus],
1093 query: PaginationQuery,
1094 oldest_first: bool,
1095 ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
1096 for status in statuses {
1098 self.ensure_status_sorted_set(relayer_id, status).await?;
1099 }
1100
1101 let mut conn = self
1102 .get_connection(self.connections.reader(), "find_by_status_paginated")
1103 .await?;
1104
1105 if statuses.len() == 1 {
1107 let sorted_key = self.relayer_status_sorted_key(relayer_id, &statuses[0]);
1108
1109 let total: u64 = conn
1111 .zcard(&sorted_key)
1112 .await
1113 .map_err(|e| self.map_redis_error(e, "find_by_status_paginated_count"))?;
1114
1115 if total == 0 {
1116 return Ok(PaginatedResult {
1117 items: vec![],
1118 total: 0,
1119 page: query.page,
1120 per_page: query.per_page,
1121 });
1122 }
1123
1124 let start = ((query.page.saturating_sub(1)) * query.per_page) as isize;
1126 let end = start + query.per_page as isize - 1;
1127
1128 let mut cmd = redis::cmd("ZRANGE");
1131 cmd.arg(&sorted_key).arg(start).arg(end);
1132 if !oldest_first {
1133 cmd.arg("REV");
1134 }
1135 let page_ids: Vec<String> = cmd
1136 .query_async(&mut conn)
1137 .await
1138 .map_err(|e| self.map_redis_error(e, "find_by_status_paginated"))?;
1139
1140 drop(conn);
1142
1143 let transactions = self.get_transactions_by_ids(&page_ids).await?;
1144
1145 debug!(
1146 relayer_id = %relayer_id,
1147 status = %statuses[0],
1148 total = %total,
1149 page = %query.page,
1150 page_size = %transactions.results.len(),
1151 "fetched paginated transactions by single status"
1152 );
1153
1154 return Ok(PaginatedResult {
1155 items: transactions.results,
1156 total,
1157 page: query.page,
1158 per_page: query.per_page,
1159 });
1160 }
1161
1162 let mut all_ids: Vec<(String, f64)> = Vec::new();
1164 for status in statuses {
1165 let sorted_key = self.relayer_status_sorted_key(relayer_id, status);
1166
1167 let ids_with_scores: Vec<(String, f64)> = redis::cmd("ZRANGE")
1169 .arg(&sorted_key)
1170 .arg(0)
1171 .arg(-1)
1172 .arg("WITHSCORES")
1173 .query_async(&mut conn)
1174 .await
1175 .map_err(|e| self.map_redis_error(e, "find_by_status_paginated_multi"))?;
1176
1177 all_ids.extend(ids_with_scores);
1178 }
1179
1180 drop(conn);
1182
1183 let mut id_map: std::collections::HashMap<String, f64> = std::collections::HashMap::new();
1185 for (id, score) in all_ids {
1186 id_map
1187 .entry(id)
1188 .and_modify(|s| {
1189 if oldest_first {
1191 if score < *s {
1192 *s = score
1193 }
1194 } else if score > *s {
1195 *s = score
1196 }
1197 })
1198 .or_insert(score);
1199 }
1200
1201 let mut sorted_ids: Vec<(String, f64)> = id_map.into_iter().collect();
1203 if oldest_first {
1204 sorted_ids.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
1205 } else {
1206 sorted_ids.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
1207 }
1208
1209 let total = sorted_ids.len() as u64;
1210
1211 if total == 0 {
1212 return Ok(PaginatedResult {
1213 items: vec![],
1214 total: 0,
1215 page: query.page,
1216 per_page: query.per_page,
1217 });
1218 }
1219
1220 let start = ((query.page.saturating_sub(1)) * query.per_page) as usize;
1222 let page_ids: Vec<String> = sorted_ids
1223 .into_iter()
1224 .skip(start)
1225 .take(query.per_page as usize)
1226 .map(|(id, _)| id)
1227 .collect();
1228
1229 let transactions = self.get_transactions_by_ids(&page_ids).await?;
1231
1232 debug!(
1233 relayer_id = %relayer_id,
1234 total = %total,
1235 page = %query.page,
1236 page_size = %transactions.results.len(),
1237 "fetched paginated transactions by status"
1238 );
1239
1240 Ok(PaginatedResult {
1241 items: transactions.results,
1242 total,
1243 page: query.page,
1244 per_page: query.per_page,
1245 })
1246 }
1247
1248 async fn find_by_nonce(
1249 &self,
1250 relayer_id: &str,
1251 nonce: u64,
1252 ) -> Result<Option<TransactionRepoModel>, RepositoryError> {
1253 let mut conn = self
1254 .get_connection(self.connections.reader(), "find_by_nonce")
1255 .await?;
1256 let nonce_key = self.relayer_nonce_key(relayer_id, nonce);
1257
1258 let tx_id: Option<String> = conn
1260 .get(nonce_key)
1261 .await
1262 .map_err(|e| self.map_redis_error(e, "find_by_nonce"))?;
1263
1264 match tx_id {
1265 Some(tx_id) => {
1266 match self.get_by_id(tx_id.clone()).await {
1267 Ok(tx) => Ok(Some(tx)),
1268 Err(RepositoryError::NotFound(_)) => {
1269 warn!(relayer_id = %relayer_id, nonce = %nonce, "stale nonce index found for relayer");
1271 Ok(None)
1272 }
1273 Err(e) => Err(e),
1274 }
1275 }
1276 None => Ok(None),
1277 }
1278 }
1279
1280 async fn update_status(
1281 &self,
1282 tx_id: String,
1283 status: TransactionStatus,
1284 ) -> Result<TransactionRepoModel, RepositoryError> {
1285 let update = TransactionUpdateRequest {
1286 status: Some(status),
1287 ..Default::default()
1288 };
1289 self.partial_update(tx_id, update).await
1290 }
1291
1292 async fn partial_update(
1293 &self,
1294 tx_id: String,
1295 update: TransactionUpdateRequest,
1296 ) -> Result<TransactionRepoModel, RepositoryError> {
1297 const MAX_RETRIES: u32 = 3;
1298 const BACKOFF_MS: u64 = 100;
1299
1300 let mut original_tx = self.get_by_id(tx_id.clone()).await?;
1303 let mut updated_tx = original_tx.clone();
1304 updated_tx.apply_partial_update(update.clone());
1305
1306 let key = self.tx_key(&updated_tx.relayer_id, &tx_id);
1307 let mut original_value = self.serialize_entity(&original_tx, |t| &t.id, "transaction")?;
1308 let mut updated_value = self.serialize_entity(&updated_tx, |t| &t.id, "transaction")?;
1309 let mut data_updated = false;
1310
1311 let mut last_error = None;
1312
1313 for attempt in 0..MAX_RETRIES {
1314 let mut conn = match self
1315 .get_connection(self.connections.primary(), "partial_update")
1316 .await
1317 {
1318 Ok(conn) => conn,
1319 Err(e) => {
1320 last_error = Some(e);
1321 if attempt < MAX_RETRIES - 1 {
1322 tokio::time::sleep(tokio::time::Duration::from_millis(BACKOFF_MS)).await;
1323 continue;
1324 }
1325 return Err(last_error.unwrap());
1326 }
1327 };
1328
1329 if !data_updated {
1330 let cas_script = Script::new(
1331 r#"
1332 local current = redis.call('GET', KEYS[1])
1333 if not current then
1334 return -1
1335 end
1336 if current == ARGV[1] then
1337 redis.call('SET', KEYS[1], ARGV[2])
1338 return 1
1339 end
1340 return 0
1341 "#,
1342 );
1343
1344 let cas_result: i32 = match cas_script
1345 .key(&key)
1346 .arg(&original_value)
1347 .arg(&updated_value)
1348 .invoke_async(&mut conn)
1349 .await
1350 {
1351 Ok(result) => result,
1352 Err(e) => {
1353 if attempt < MAX_RETRIES - 1 {
1354 warn!(tx_id = %tx_id, attempt = %attempt, error = %e, "failed CAS transaction update, retrying");
1355 last_error = Some(self.map_redis_error(e, "partial_update_cas"));
1356 tokio::time::sleep(tokio::time::Duration::from_millis(BACKOFF_MS))
1357 .await;
1358 continue;
1359 }
1360 return Err(self.map_redis_error(e, "partial_update_cas"));
1361 }
1362 };
1363
1364 if cas_result == -1 {
1365 return Err(RepositoryError::NotFound(format!(
1366 "Transaction with ID {tx_id} not found"
1367 )));
1368 }
1369
1370 if cas_result == 0 {
1371 if attempt < MAX_RETRIES - 1 {
1372 warn!(tx_id = %tx_id, attempt = %attempt, "concurrent transaction update detected, rebasing retry");
1373 original_tx = self.get_by_id(tx_id.clone()).await?;
1374 updated_tx = original_tx.clone();
1375 updated_tx.apply_partial_update(update.clone());
1376 original_value =
1377 self.serialize_entity(&original_tx, |t| &t.id, "transaction")?;
1378 updated_value =
1379 self.serialize_entity(&updated_tx, |t| &t.id, "transaction")?;
1380 tokio::time::sleep(tokio::time::Duration::from_millis(BACKOFF_MS)).await;
1381 continue;
1382 }
1383 return Err(RepositoryError::TransactionFailure(format!(
1384 "Concurrent update conflict for transaction {tx_id}"
1385 )));
1386 }
1387
1388 data_updated = true;
1389 }
1390
1391 match self.update_indexes(&updated_tx, Some(&original_tx)).await {
1394 Ok(_) => {
1395 debug!(tx_id = %tx_id, attempt = %attempt, "successfully updated transaction");
1396
1397 if let Some(new_status) = &update.status {
1399 let network_type = format!("{:?}", updated_tx.network_type).to_lowercase();
1400 let relayer_id = updated_tx.relayer_id.as_str();
1401
1402 if original_tx.status != TransactionStatus::Submitted
1404 && *new_status == TransactionStatus::Submitted
1405 {
1406 TRANSACTIONS_SUBMITTED
1407 .with_label_values(&[relayer_id, &network_type])
1408 .inc();
1409
1410 if let Ok(created_time) =
1412 chrono::DateTime::parse_from_rfc3339(&updated_tx.created_at)
1413 {
1414 let processing_seconds =
1415 (Utc::now() - created_time.with_timezone(&Utc)).num_seconds()
1416 as f64;
1417 TRANSACTION_PROCESSING_TIME
1418 .with_label_values(&[
1419 relayer_id,
1420 &network_type,
1421 "creation_to_submission",
1422 ])
1423 .observe(processing_seconds);
1424 }
1425 }
1426
1427 if original_tx.status != *new_status {
1429 let old_status = &original_tx.status;
1431 let old_status_str = format!("{old_status:?}").to_lowercase();
1432 let old_status_gauge = TRANSACTIONS_BY_STATUS.with_label_values(&[
1433 relayer_id,
1434 &network_type,
1435 &old_status_str,
1436 ]);
1437 let clamped_value = (old_status_gauge.get() - 1.0).max(0.0);
1438 old_status_gauge.set(clamped_value);
1439
1440 let new_status_str = format!("{new_status:?}").to_lowercase();
1442 TRANSACTIONS_BY_STATUS
1443 .with_label_values(&[relayer_id, &network_type, &new_status_str])
1444 .inc();
1445 }
1446
1447 let was_final = is_final_state(&original_tx.status);
1450 let is_final = is_final_state(new_status);
1451
1452 if !was_final && is_final {
1453 match new_status {
1454 TransactionStatus::Confirmed => {
1455 TRANSACTIONS_SUCCESS
1456 .with_label_values(&[relayer_id, &network_type])
1457 .inc();
1458
1459 if let (Some(sent_at_str), Some(confirmed_at_str)) =
1461 (&updated_tx.sent_at, &updated_tx.confirmed_at)
1462 {
1463 if let (Ok(sent_time), Ok(confirmed_time)) = (
1464 chrono::DateTime::parse_from_rfc3339(sent_at_str),
1465 chrono::DateTime::parse_from_rfc3339(confirmed_at_str),
1466 ) {
1467 let processing_seconds = (confirmed_time
1468 .with_timezone(&Utc)
1469 - sent_time.with_timezone(&Utc))
1470 .num_seconds()
1471 as f64;
1472 TRANSACTION_PROCESSING_TIME
1473 .with_label_values(&[
1474 relayer_id,
1475 &network_type,
1476 "submission_to_confirmation",
1477 ])
1478 .observe(processing_seconds);
1479 }
1480 }
1481
1482 if let Ok(created_time) =
1484 chrono::DateTime::parse_from_rfc3339(&updated_tx.created_at)
1485 {
1486 if let Some(confirmed_at_str) = &updated_tx.confirmed_at {
1487 if let Ok(confirmed_time) =
1488 chrono::DateTime::parse_from_rfc3339(
1489 confirmed_at_str,
1490 )
1491 {
1492 let processing_seconds = (confirmed_time
1493 .with_timezone(&Utc)
1494 - created_time.with_timezone(&Utc))
1495 .num_seconds()
1496 as f64;
1497 TRANSACTION_PROCESSING_TIME
1498 .with_label_values(&[
1499 relayer_id,
1500 &network_type,
1501 "creation_to_confirmation",
1502 ])
1503 .observe(processing_seconds);
1504 }
1505 }
1506 }
1507 }
1508 TransactionStatus::Failed => {
1509 let failure_reason = updated_tx
1511 .status_reason
1512 .as_deref()
1513 .map(|reason| {
1514 if reason.starts_with("Submission failed:") {
1515 "submission_failed"
1516 } else if reason.starts_with("Preparation failed:") {
1517 "preparation_failed"
1518 } else {
1519 "failed"
1520 }
1521 })
1522 .unwrap_or("failed");
1523 TRANSACTIONS_FAILED
1524 .with_label_values(&[
1525 relayer_id,
1526 &network_type,
1527 failure_reason,
1528 ])
1529 .inc();
1530 }
1531 TransactionStatus::Expired => {
1532 TRANSACTIONS_FAILED
1533 .with_label_values(&[relayer_id, &network_type, "expired"])
1534 .inc();
1535 }
1536 TransactionStatus::Canceled => {
1537 TRANSACTIONS_FAILED
1538 .with_label_values(&[relayer_id, &network_type, "canceled"])
1539 .inc();
1540 }
1541 _ => {
1542 }
1544 }
1545 }
1546 }
1547 return Ok(updated_tx);
1548 }
1549 Err(e) if attempt < MAX_RETRIES - 1 => {
1550 warn!(tx_id = %tx_id, attempt = %attempt, error = %e, "failed to update indexes, retrying");
1551 last_error = Some(e);
1552 tokio::time::sleep(tokio::time::Duration::from_millis(BACKOFF_MS)).await;
1553 continue;
1554 }
1555 Err(e) => return Err(e),
1556 }
1557 }
1558
1559 Err(last_error.unwrap_or_else(|| {
1560 RepositoryError::UnexpectedError("partial_update exhausted retries".to_string())
1561 }))
1562 }
1563
1564 async fn update_network_data(
1565 &self,
1566 tx_id: String,
1567 network_data: NetworkTransactionData,
1568 ) -> Result<TransactionRepoModel, RepositoryError> {
1569 let update = TransactionUpdateRequest {
1570 network_data: Some(network_data),
1571 ..Default::default()
1572 };
1573 self.partial_update(tx_id, update).await
1574 }
1575
1576 async fn set_sent_at(
1577 &self,
1578 tx_id: String,
1579 sent_at: String,
1580 ) -> Result<TransactionRepoModel, RepositoryError> {
1581 let update = TransactionUpdateRequest {
1582 sent_at: Some(sent_at),
1583 ..Default::default()
1584 };
1585 self.partial_update(tx_id, update).await
1586 }
1587
1588 async fn set_confirmed_at(
1589 &self,
1590 tx_id: String,
1591 confirmed_at: String,
1592 ) -> Result<TransactionRepoModel, RepositoryError> {
1593 let update = TransactionUpdateRequest {
1594 confirmed_at: Some(confirmed_at),
1595 ..Default::default()
1596 };
1597 self.partial_update(tx_id, update).await
1598 }
1599
1600 async fn count_by_status(
1604 &self,
1605 relayer_id: &str,
1606 statuses: &[TransactionStatus],
1607 ) -> Result<u64, RepositoryError> {
1608 let mut conn = self
1609 .get_connection(self.connections.reader(), "count_by_status")
1610 .await?;
1611 let mut total_count: u64 = 0;
1612
1613 for status in statuses {
1614 self.ensure_status_sorted_set(relayer_id, status).await?;
1616
1617 let sorted_key = self.relayer_status_sorted_key(relayer_id, status);
1618 let count: u64 = conn
1619 .zcard(&sorted_key)
1620 .await
1621 .map_err(|e| self.map_redis_error(e, "count_by_status"))?;
1622 total_count += count;
1623 }
1624
1625 debug!(relayer_id = %relayer_id, count = %total_count, "counted transactions by status");
1626 Ok(total_count)
1627 }
1628
1629 async fn delete_by_ids(&self, ids: Vec<String>) -> Result<BatchDeleteResult, RepositoryError> {
1630 if ids.is_empty() {
1631 debug!("no transaction IDs provided for batch delete");
1632 return Ok(BatchDeleteResult::default());
1633 }
1634
1635 debug!(count = %ids.len(), "batch deleting transactions by IDs (with fetch)");
1636
1637 let batch_result = self.get_transactions_by_ids(&ids).await?;
1639
1640 let requests: Vec<TransactionDeleteRequest> = batch_result
1642 .results
1643 .iter()
1644 .map(|tx| TransactionDeleteRequest {
1645 id: tx.id.clone(),
1646 relayer_id: tx.relayer_id.clone(),
1647 nonce: self.extract_nonce(&tx.network_data),
1648 })
1649 .collect();
1650
1651 let mut result = self.delete_by_requests(requests).await?;
1653
1654 for id in batch_result.failed_ids {
1656 result
1657 .failed
1658 .push((id.clone(), format!("Transaction with ID {id} not found")));
1659 }
1660
1661 Ok(result)
1662 }
1663
1664 async fn delete_by_requests(
1665 &self,
1666 requests: Vec<TransactionDeleteRequest>,
1667 ) -> Result<BatchDeleteResult, RepositoryError> {
1668 if requests.is_empty() {
1669 debug!("no delete requests provided for batch delete");
1670 return Ok(BatchDeleteResult::default());
1671 }
1672
1673 debug!(count = %requests.len(), "batch deleting transactions by requests (no fetch)");
1674 let mut conn = self
1675 .get_connection(self.connections.primary(), "batch_delete_no_fetch")
1676 .await?;
1677 let mut pipe = redis::pipe();
1678 pipe.atomic();
1679
1680 let all_statuses = [
1682 TransactionStatus::Canceled,
1683 TransactionStatus::Pending,
1684 TransactionStatus::Sent,
1685 TransactionStatus::Submitted,
1686 TransactionStatus::Mined,
1687 TransactionStatus::Confirmed,
1688 TransactionStatus::Failed,
1689 TransactionStatus::Expired,
1690 ];
1691
1692 for req in &requests {
1694 let tx_key = self.tx_key(&req.relayer_id, &req.id);
1696 pipe.del(&tx_key);
1697
1698 let reverse_key = self.tx_to_relayer_key(&req.id);
1700 pipe.del(&reverse_key);
1701
1702 for status in &all_statuses {
1704 let status_sorted_key = self.relayer_status_sorted_key(&req.relayer_id, status);
1705 pipe.zrem(&status_sorted_key, &req.id);
1706
1707 let status_legacy_key = self.relayer_status_key(&req.relayer_id, status);
1708 pipe.srem(&status_legacy_key, &req.id);
1709 }
1710
1711 if let Some(nonce) = req.nonce {
1713 let nonce_key = self.relayer_nonce_key(&req.relayer_id, nonce);
1714 pipe.del(&nonce_key);
1715 }
1716
1717 let relayer_sorted_key = self.relayer_tx_by_created_at_key(&req.relayer_id);
1719 pipe.zrem(&relayer_sorted_key, &req.id);
1720 }
1721
1722 match pipe.exec_async(&mut conn).await {
1724 Ok(_) => {
1725 let deleted_count = requests.len();
1726 debug!(
1727 deleted_count = %deleted_count,
1728 "batch delete completed"
1729 );
1730 Ok(BatchDeleteResult {
1731 deleted_count,
1732 failed: vec![],
1733 })
1734 }
1735 Err(e) => {
1736 error!(error = %e, "batch delete pipeline failed");
1737 let failed: Vec<(String, String)> = requests
1739 .iter()
1740 .map(|req| (req.id.clone(), format!("Redis pipeline error: {e}")))
1741 .collect();
1742 Ok(BatchDeleteResult {
1743 deleted_count: 0,
1744 failed,
1745 })
1746 }
1747 }
1748 }
1749}
1750
1751#[cfg(test)]
1752mod tests {
1753 use super::*;
1754 use crate::models::{evm::Speed, EvmTransactionData, NetworkType};
1755 use alloy::primitives::U256;
1756 use deadpool_redis::{Config, Runtime};
1757 use lazy_static::lazy_static;
1758 use std::str::FromStr;
1759 use tokio;
1760 use uuid::Uuid;
1761
1762 use tokio::sync::Mutex;
1763
lazy_static! {
    // Lazily initialised async mutex that guards no data of its own.
    // NOTE(review): no test visible in this part of the module locks it; the name
    // suggests it serialises tests that touch environment variables — confirm.
    static ref ENV_MUTEX: Mutex<()> = Mutex::new(());
}
1768
1769 fn create_test_transaction(id: &str) -> TransactionRepoModel {
1771 TransactionRepoModel {
1772 id: id.to_string(),
1773 relayer_id: "relayer-1".to_string(),
1774 status: TransactionStatus::Pending,
1775 status_reason: None,
1776 created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
1777 sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
1778 confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
1779 valid_until: None,
1780 delete_at: None,
1781 network_type: NetworkType::Evm,
1782 priced_at: None,
1783 hashes: vec![],
1784 network_data: NetworkTransactionData::Evm(EvmTransactionData {
1785 gas_price: Some(1000000000),
1786 gas_limit: Some(21000),
1787 nonce: Some(1),
1788 value: U256::from_str("1000000000000000000").unwrap(),
1789 data: Some("0x".to_string()),
1790 from: "0xSender".to_string(),
1791 to: Some("0xRecipient".to_string()),
1792 chain_id: 1,
1793 signature: None,
1794 hash: Some(format!("0x{}", id)),
1795 speed: Some(Speed::Fast),
1796 max_fee_per_gas: None,
1797 max_priority_fee_per_gas: None,
1798 raw: None,
1799 }),
1800 noop_count: None,
1801 is_canceled: Some(false),
1802 }
1803 }
1804
1805 fn create_test_transaction_with_relayer(id: &str, relayer_id: &str) -> TransactionRepoModel {
1806 let mut tx = create_test_transaction(id);
1807 tx.relayer_id = relayer_id.to_string();
1808 tx
1809 }
1810
1811 fn create_test_transaction_with_status(
1812 id: &str,
1813 relayer_id: &str,
1814 status: TransactionStatus,
1815 ) -> TransactionRepoModel {
1816 let mut tx = create_test_transaction_with_relayer(id, relayer_id);
1817 tx.status = status;
1818 tx
1819 }
1820
1821 fn create_test_transaction_with_nonce(
1822 id: &str,
1823 nonce: u64,
1824 relayer_id: &str,
1825 ) -> TransactionRepoModel {
1826 let mut tx = create_test_transaction_with_relayer(id, relayer_id);
1827 if let NetworkTransactionData::Evm(ref mut evm_data) = tx.network_data {
1828 evm_data.nonce = Some(nonce);
1829 }
1830 tx
1831 }
1832
1833 async fn setup_test_repo() -> RedisTransactionRepository {
1834 let redis_url = std::env::var("REDIS_TEST_URL")
1836 .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
1837
1838 let cfg = Config::from_url(&redis_url);
1839 let pool = Arc::new(
1840 cfg.builder()
1841 .expect("Failed to create pool builder")
1842 .max_size(16)
1843 .runtime(Runtime::Tokio1)
1844 .build()
1845 .expect("Failed to build Redis pool"),
1846 );
1847
1848 let connections = Arc::new(RedisConnections::new_single_pool(pool));
1850
1851 let random_id = Uuid::new_v4().to_string();
1852 let key_prefix = format!("test_prefix:{}", random_id);
1853
1854 RedisTransactionRepository::new(connections, key_prefix)
1855 .expect("Failed to create RedisTransactionRepository")
1856 }
1857
1858 #[tokio::test]
1859 #[ignore = "Requires active Redis instance"]
1860 async fn test_new_repository_creation() {
1861 let repo = setup_test_repo().await;
1862 assert!(repo.key_prefix.contains("test_prefix"));
1863 }
1864
1865 #[tokio::test]
1866 #[ignore = "Requires active Redis instance"]
1867 async fn test_new_repository_empty_prefix_fails() {
1868 let redis_url = std::env::var("REDIS_TEST_URL")
1869 .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
1870 let cfg = Config::from_url(&redis_url);
1871 let pool = Arc::new(
1872 cfg.builder()
1873 .expect("Failed to create pool builder")
1874 .max_size(16)
1875 .runtime(Runtime::Tokio1)
1876 .build()
1877 .expect("Failed to build Redis pool"),
1878 );
1879 let connections = Arc::new(RedisConnections::new_single_pool(pool));
1880
1881 let result = RedisTransactionRepository::new(connections, "".to_string());
1882 assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
1883 }
1884
1885 #[tokio::test]
1886 #[ignore = "Requires active Redis instance"]
1887 async fn test_key_generation() {
1888 let repo = setup_test_repo().await;
1889
1890 assert!(repo
1891 .tx_key("relayer-1", "test-id")
1892 .contains(":relayer:relayer-1:tx:test-id"));
1893 assert!(repo
1894 .tx_to_relayer_key("test-id")
1895 .contains(":relayer:tx_to_relayer:test-id"));
1896 assert!(repo.relayer_list_key().contains(":relayer_list"));
1897 assert!(repo
1898 .relayer_status_key("relayer-1", &TransactionStatus::Pending)
1899 .contains(":relayer:relayer-1:status:Pending"));
1900 assert!(repo
1901 .relayer_nonce_key("relayer-1", 42)
1902 .contains(":relayer:relayer-1:nonce:42"));
1903 }
1904
1905 #[tokio::test]
1906 #[ignore = "Requires active Redis instance"]
1907 async fn test_serialize_deserialize_transaction() {
1908 let repo = setup_test_repo().await;
1909 let tx = create_test_transaction("test-1");
1910
1911 let serialized = repo
1912 .serialize_entity(&tx, |t| &t.id, "transaction")
1913 .expect("Serialization should succeed");
1914 let deserialized: TransactionRepoModel = repo
1915 .deserialize_entity(&serialized, "test-1", "transaction")
1916 .expect("Deserialization should succeed");
1917
1918 assert_eq!(tx.id, deserialized.id);
1919 assert_eq!(tx.relayer_id, deserialized.relayer_id);
1920 assert_eq!(tx.status, deserialized.status);
1921 }
1922
1923 #[tokio::test]
1924 #[ignore = "Requires active Redis instance"]
1925 async fn test_extract_nonce() {
1926 let repo = setup_test_repo().await;
1927 let random_id = Uuid::new_v4().to_string();
1928 let relayer_id = Uuid::new_v4().to_string();
1929 let tx_with_nonce = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
1930
1931 let nonce = repo.extract_nonce(&tx_with_nonce.network_data);
1932 assert_eq!(nonce, Some(42));
1933 }
1934
1935 #[tokio::test]
1936 #[ignore = "Requires active Redis instance"]
1937 async fn test_create_transaction() {
1938 let repo = setup_test_repo().await;
1939 let random_id = Uuid::new_v4().to_string();
1940 let tx = create_test_transaction(&random_id);
1941
1942 let result = repo.create(tx.clone()).await.unwrap();
1943 assert_eq!(result.id, tx.id);
1944 }
1945
1946 #[tokio::test]
1947 #[ignore = "Requires active Redis instance"]
1948 async fn test_get_transaction() {
1949 let repo = setup_test_repo().await;
1950 let random_id = Uuid::new_v4().to_string();
1951 let tx = create_test_transaction(&random_id);
1952
1953 repo.create(tx.clone()).await.unwrap();
1954 let stored = repo.get_by_id(random_id.to_string()).await.unwrap();
1955 assert_eq!(stored.id, tx.id);
1956 assert_eq!(stored.relayer_id, tx.relayer_id);
1957 }
1958
1959 #[tokio::test]
1960 #[ignore = "Requires active Redis instance"]
1961 async fn test_update_transaction() {
1962 let repo = setup_test_repo().await;
1963 let random_id = Uuid::new_v4().to_string();
1964 let mut tx = create_test_transaction(&random_id);
1965
1966 repo.create(tx.clone()).await.unwrap();
1967 tx.status = TransactionStatus::Confirmed;
1968
1969 let updated = repo.update(random_id.to_string(), tx).await.unwrap();
1970 assert!(matches!(updated.status, TransactionStatus::Confirmed));
1971 }
1972
1973 #[tokio::test]
1974 #[ignore = "Requires active Redis instance"]
1975 async fn test_delete_transaction() {
1976 let repo = setup_test_repo().await;
1977 let random_id = Uuid::new_v4().to_string();
1978 let tx = create_test_transaction(&random_id);
1979
1980 repo.create(tx).await.unwrap();
1981 repo.delete_by_id(random_id.to_string()).await.unwrap();
1982
1983 let result = repo.get_by_id(random_id.to_string()).await;
1984 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
1985 }
1986
1987 #[tokio::test]
1988 #[ignore = "Requires active Redis instance"]
1989 async fn test_list_all_transactions() {
1990 let repo = setup_test_repo().await;
1991 let random_id = Uuid::new_v4().to_string();
1992 let random_id2 = Uuid::new_v4().to_string();
1993
1994 let tx1 = create_test_transaction(&random_id);
1995 let tx2 = create_test_transaction(&random_id2);
1996
1997 repo.create(tx1).await.unwrap();
1998 repo.create(tx2).await.unwrap();
1999
2000 let transactions = repo.list_all().await.unwrap();
2001 assert!(transactions.len() >= 2);
2002 }
2003
2004 #[tokio::test]
2005 #[ignore = "Requires active Redis instance"]
2006 async fn test_count_transactions() {
2007 let repo = setup_test_repo().await;
2008 let random_id = Uuid::new_v4().to_string();
2009 let tx = create_test_transaction(&random_id);
2010
2011 let count = repo.count().await.unwrap();
2012 repo.create(tx).await.unwrap();
2013 assert!(repo.count().await.unwrap() > count);
2014 }
2015
2016 #[tokio::test]
2017 #[ignore = "Requires active Redis instance"]
2018 async fn test_get_nonexistent_transaction() {
2019 let repo = setup_test_repo().await;
2020 let result = repo.get_by_id("nonexistent".to_string()).await;
2021 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
2022 }
2023
2024 #[tokio::test]
2025 #[ignore = "Requires active Redis instance"]
2026 async fn test_duplicate_transaction_creation() {
2027 let repo = setup_test_repo().await;
2028 let random_id = Uuid::new_v4().to_string();
2029
2030 let tx = create_test_transaction(&random_id);
2031
2032 repo.create(tx.clone()).await.unwrap();
2033 let result = repo.create(tx).await;
2034
2035 assert!(matches!(
2036 result,
2037 Err(RepositoryError::ConstraintViolation(_))
2038 ));
2039 }
2040
2041 #[tokio::test]
2042 #[ignore = "Requires active Redis instance"]
2043 async fn test_update_nonexistent_transaction() {
2044 let repo = setup_test_repo().await;
2045 let tx = create_test_transaction("test-1");
2046
2047 let result = repo.update("nonexistent".to_string(), tx).await;
2048 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
2049 }
2050
2051 #[tokio::test]
2052 #[ignore = "Requires active Redis instance"]
2053 async fn test_list_paginated() {
2054 let repo = setup_test_repo().await;
2055
2056 for _ in 1..=10 {
2058 let random_id = Uuid::new_v4().to_string();
2059 let tx = create_test_transaction(&random_id);
2060 repo.create(tx).await.unwrap();
2061 }
2062
2063 let query = PaginationQuery {
2065 page: 1,
2066 per_page: 3,
2067 };
2068 let result = repo.list_paginated(query).await.unwrap();
2069 assert_eq!(result.items.len(), 3);
2070 assert!(result.total >= 10);
2071 assert_eq!(result.page, 1);
2072 assert_eq!(result.per_page, 3);
2073
2074 let query = PaginationQuery {
2076 page: 1000,
2077 per_page: 3,
2078 };
2079 let result = repo.list_paginated(query).await.unwrap();
2080 assert_eq!(result.items.len(), 0);
2081 }
2082
2083 #[tokio::test]
2084 #[ignore = "Requires active Redis instance"]
2085 async fn test_find_by_relayer_id() {
2086 let repo = setup_test_repo().await;
2087 let random_id = Uuid::new_v4().to_string();
2088 let random_id2 = Uuid::new_v4().to_string();
2089 let random_id3 = Uuid::new_v4().to_string();
2090
2091 let tx1 = create_test_transaction_with_relayer(&random_id, "relayer-1");
2092 let tx2 = create_test_transaction_with_relayer(&random_id2, "relayer-1");
2093 let tx3 = create_test_transaction_with_relayer(&random_id3, "relayer-2");
2094
2095 repo.create(tx1).await.unwrap();
2096 repo.create(tx2).await.unwrap();
2097 repo.create(tx3).await.unwrap();
2098
2099 let query = PaginationQuery {
2101 page: 1,
2102 per_page: 10,
2103 };
2104 let result = repo
2105 .find_by_relayer_id("relayer-1", query.clone())
2106 .await
2107 .unwrap();
2108 assert!(result.total >= 2);
2109 assert!(result.items.len() >= 2);
2110 assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-1"));
2111
2112 let result = repo
2114 .find_by_relayer_id("relayer-2", query.clone())
2115 .await
2116 .unwrap();
2117 assert!(result.total >= 1);
2118 assert!(!result.items.is_empty());
2119 assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-2"));
2120
2121 let result = repo
2123 .find_by_relayer_id("non-existent", query.clone())
2124 .await
2125 .unwrap();
2126 assert_eq!(result.total, 0);
2127 assert_eq!(result.items.len(), 0);
2128 }
2129
2130 #[tokio::test]
2131 #[ignore = "Requires active Redis instance"]
2132 async fn test_find_by_relayer_id_sorted_by_created_at_newest_first() {
2133 let repo = setup_test_repo().await;
2134 let relayer_id = Uuid::new_v4().to_string();
2135
2136 let mut tx1 = create_test_transaction_with_relayer("test-1", &relayer_id);
2138 tx1.created_at = "2025-01-27T10:00:00.000000+00:00".to_string(); let mut tx2 = create_test_transaction_with_relayer("test-2", &relayer_id);
2141 tx2.created_at = "2025-01-27T12:00:00.000000+00:00".to_string(); let mut tx3 = create_test_transaction_with_relayer("test-3", &relayer_id);
2144 tx3.created_at = "2025-01-27T14:00:00.000000+00:00".to_string(); repo.create(tx2.clone()).await.unwrap(); repo.create(tx1.clone()).await.unwrap(); repo.create(tx3.clone()).await.unwrap(); let query = PaginationQuery {
2152 page: 1,
2153 per_page: 10,
2154 };
2155 let result = repo.find_by_relayer_id(&relayer_id, query).await.unwrap();
2156
2157 assert_eq!(result.total, 3);
2158 assert_eq!(result.items.len(), 3);
2159
2160 assert_eq!(
2162 result.items[0].id, "test-3",
2163 "First item should be newest (test-3)"
2164 );
2165 assert_eq!(
2166 result.items[0].created_at,
2167 "2025-01-27T14:00:00.000000+00:00"
2168 );
2169
2170 assert_eq!(
2171 result.items[1].id, "test-2",
2172 "Second item should be middle (test-2)"
2173 );
2174 assert_eq!(
2175 result.items[1].created_at,
2176 "2025-01-27T12:00:00.000000+00:00"
2177 );
2178
2179 assert_eq!(
2180 result.items[2].id, "test-1",
2181 "Third item should be oldest (test-1)"
2182 );
2183 assert_eq!(
2184 result.items[2].created_at,
2185 "2025-01-27T10:00:00.000000+00:00"
2186 );
2187 }
2188
2189 #[tokio::test]
2190 #[ignore = "Requires active Redis instance"]
2191 async fn test_find_by_relayer_id_migration_from_old_index() {
2192 let repo = setup_test_repo().await;
2193 let relayer_id = Uuid::new_v4().to_string();
2194
2195 let mut tx1 = create_test_transaction_with_relayer("migrate-test-1", &relayer_id);
2197 tx1.created_at = "2025-01-27T10:00:00.000000+00:00".to_string(); let mut tx2 = create_test_transaction_with_relayer("migrate-test-2", &relayer_id);
2200 tx2.created_at = "2025-01-27T12:00:00.000000+00:00".to_string(); let mut tx3 = create_test_transaction_with_relayer("migrate-test-3", &relayer_id);
2203 tx3.created_at = "2025-01-27T14:00:00.000000+00:00".to_string(); let mut conn = repo.connections.primary().get().await.unwrap();
2208 let relayer_list_key = repo.relayer_list_key();
2209 let _: () = conn.sadd(&relayer_list_key, &relayer_id).await.unwrap();
2210
2211 for tx in &[&tx1, &tx2, &tx3] {
2212 let key = repo.tx_key(&tx.relayer_id, &tx.id);
2213 let reverse_key = repo.tx_to_relayer_key(&tx.id);
2214 let value = repo.serialize_entity(tx, |t| &t.id, "transaction").unwrap();
2215
2216 let mut pipe = redis::pipe();
2217 pipe.atomic();
2218 pipe.set(&key, &value);
2219 pipe.set(&reverse_key, &tx.relayer_id);
2220
2221 let status_key = repo.relayer_status_key(&tx.relayer_id, &tx.status);
2223 pipe.sadd(&status_key, &tx.id);
2224
2225 pipe.exec_async(&mut conn).await.unwrap();
2226 }
2227
2228 let relayer_sorted_key = repo.relayer_tx_by_created_at_key(&relayer_id);
2230 let count: u64 = conn.zcard(&relayer_sorted_key).await.unwrap();
2231 assert_eq!(count, 0, "Sorted set should be empty for old transactions");
2232
2233 let query = PaginationQuery {
2235 page: 1,
2236 per_page: 10,
2237 };
2238 let result = repo
2239 .find_by_relayer_id(&relayer_id, query.clone())
2240 .await
2241 .unwrap();
2242
2243 let count_after: u64 = conn.zcard(&relayer_sorted_key).await.unwrap();
2245 assert_eq!(
2246 count_after, 3,
2247 "Sorted set should be populated after migration"
2248 );
2249
2250 assert_eq!(result.total, 3);
2252 assert_eq!(result.items.len(), 3);
2253
2254 assert_eq!(
2255 result.items[0].id, "migrate-test-3",
2256 "First item should be newest after migration"
2257 );
2258 assert_eq!(
2259 result.items[0].created_at,
2260 "2025-01-27T14:00:00.000000+00:00"
2261 );
2262
2263 assert_eq!(
2264 result.items[1].id, "migrate-test-2",
2265 "Second item should be middle after migration"
2266 );
2267 assert_eq!(
2268 result.items[1].created_at,
2269 "2025-01-27T12:00:00.000000+00:00"
2270 );
2271
2272 assert_eq!(
2273 result.items[2].id, "migrate-test-1",
2274 "Third item should be oldest after migration"
2275 );
2276 assert_eq!(
2277 result.items[2].created_at,
2278 "2025-01-27T10:00:00.000000+00:00"
2279 );
2280
2281 let result2 = repo.find_by_relayer_id(&relayer_id, query).await.unwrap();
2283 assert_eq!(result2.total, 3);
2284 assert_eq!(result2.items.len(), 3);
2285 assert_eq!(result.items[0].id, result2.items[0].id);
2287 }
2288
2289 #[tokio::test]
2290 #[ignore = "Requires active Redis instance"]
2291 async fn test_find_by_status() {
2292 let repo = setup_test_repo().await;
2293 let random_id = Uuid::new_v4().to_string();
2294 let random_id2 = Uuid::new_v4().to_string();
2295 let random_id3 = Uuid::new_v4().to_string();
2296 let relayer_id = Uuid::new_v4().to_string();
2297 let tx1 = create_test_transaction_with_status(
2298 &random_id,
2299 &relayer_id,
2300 TransactionStatus::Pending,
2301 );
2302 let tx2 =
2303 create_test_transaction_with_status(&random_id2, &relayer_id, TransactionStatus::Sent);
2304 let tx3 = create_test_transaction_with_status(
2305 &random_id3,
2306 &relayer_id,
2307 TransactionStatus::Confirmed,
2308 );
2309
2310 repo.create(tx1).await.unwrap();
2311 repo.create(tx2).await.unwrap();
2312 repo.create(tx3).await.unwrap();
2313
2314 let result = repo
2316 .find_by_status(&relayer_id, &[TransactionStatus::Pending])
2317 .await
2318 .unwrap();
2319 assert_eq!(result.len(), 1);
2320 assert_eq!(result[0].status, TransactionStatus::Pending);
2321
2322 let result = repo
2324 .find_by_status(
2325 &relayer_id,
2326 &[TransactionStatus::Pending, TransactionStatus::Sent],
2327 )
2328 .await
2329 .unwrap();
2330 assert_eq!(result.len(), 2);
2331
2332 let result = repo
2334 .find_by_status(&relayer_id, &[TransactionStatus::Failed])
2335 .await
2336 .unwrap();
2337 assert_eq!(result.len(), 0);
2338 }
2339
2340 #[tokio::test]
2341 #[ignore = "Requires active Redis instance"]
2342 async fn test_find_by_status_paginated() {
2343 let repo = setup_test_repo().await;
2344 let relayer_id = Uuid::new_v4().to_string();
2345
2346 for i in 1..=5 {
2348 let tx_id = Uuid::new_v4().to_string();
2349 let mut tx = create_test_transaction_with_status(
2350 &tx_id,
2351 &relayer_id,
2352 TransactionStatus::Pending,
2353 );
2354 tx.created_at = format!("2025-01-27T{:02}:00:00.000000+00:00", 10 + i);
2355 repo.create(tx).await.unwrap();
2356 }
2357
2358 for i in 6..=7 {
2360 let tx_id = Uuid::new_v4().to_string();
2361 let mut tx = create_test_transaction_with_status(
2362 &tx_id,
2363 &relayer_id,
2364 TransactionStatus::Confirmed,
2365 );
2366 tx.created_at = format!("2025-01-27T{:02}:00:00.000000+00:00", 10 + i);
2367 repo.create(tx).await.unwrap();
2368 }
2369
2370 let query = PaginationQuery {
2372 page: 1,
2373 per_page: 2,
2374 };
2375 let result = repo
2376 .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
2377 .await
2378 .unwrap();
2379
2380 assert_eq!(result.total, 5);
2381 assert_eq!(result.items.len(), 2);
2382 assert_eq!(result.page, 1);
2383 assert_eq!(result.per_page, 2);
2384
2385 let query = PaginationQuery {
2387 page: 2,
2388 per_page: 2,
2389 };
2390 let result = repo
2391 .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
2392 .await
2393 .unwrap();
2394
2395 assert_eq!(result.total, 5);
2396 assert_eq!(result.items.len(), 2);
2397 assert_eq!(result.page, 2);
2398
2399 let query = PaginationQuery {
2401 page: 3,
2402 per_page: 2,
2403 };
2404 let result = repo
2405 .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
2406 .await
2407 .unwrap();
2408
2409 assert_eq!(result.total, 5);
2410 assert_eq!(result.items.len(), 1);
2411
2412 let query = PaginationQuery {
2414 page: 1,
2415 per_page: 10,
2416 };
2417 let result = repo
2418 .find_by_status_paginated(
2419 &relayer_id,
2420 &[TransactionStatus::Pending, TransactionStatus::Confirmed],
2421 query,
2422 false,
2423 )
2424 .await
2425 .unwrap();
2426
2427 assert_eq!(result.total, 7);
2428 assert_eq!(result.items.len(), 7);
2429
2430 let query = PaginationQuery {
2432 page: 1,
2433 per_page: 10,
2434 };
2435 let result = repo
2436 .find_by_status_paginated(&relayer_id, &[TransactionStatus::Failed], query, false)
2437 .await
2438 .unwrap();
2439
2440 assert_eq!(result.total, 0);
2441 assert_eq!(result.items.len(), 0);
2442 }
2443
2444 #[tokio::test]
2445 #[ignore = "Requires active Redis instance"]
2446 async fn test_find_by_status_paginated_oldest_first() {
2447 let repo = setup_test_repo().await;
2448 let relayer_id = Uuid::new_v4().to_string();
2449
2450 for i in 1..=5 {
2452 let tx_id = format!("tx{}-{}", i, Uuid::new_v4());
2453 let mut tx = create_test_transaction(&tx_id);
2454 tx.relayer_id = relayer_id.clone();
2455 tx.status = TransactionStatus::Pending;
2456 tx.created_at = format!("2025-01-27T{:02}:00:00.000000+00:00", 10 + i);
2457 repo.create(tx).await.unwrap();
2458 }
2459
2460 let query = PaginationQuery {
2462 page: 1,
2463 per_page: 3,
2464 };
2465 let result = repo
2466 .find_by_status_paginated(
2467 &relayer_id,
2468 &[TransactionStatus::Pending],
2469 query.clone(),
2470 true,
2471 )
2472 .await
2473 .unwrap();
2474
2475 assert_eq!(result.total, 5);
2476 assert_eq!(result.items.len(), 3);
2477 assert!(
2479 result.items[0].created_at < result.items[1].created_at,
2480 "First item should be older than second"
2481 );
2482 assert!(
2483 result.items[1].created_at < result.items[2].created_at,
2484 "Second item should be older than third"
2485 );
2486
2487 let result_newest = repo
2489 .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
2490 .await
2491 .unwrap();
2492
2493 assert_eq!(result_newest.items.len(), 3);
2494 assert!(
2496 result_newest.items[0].created_at > result_newest.items[1].created_at,
2497 "First item should be newer than second"
2498 );
2499 assert!(
2500 result_newest.items[1].created_at > result_newest.items[2].created_at,
2501 "Second item should be newer than third"
2502 );
2503 }
2504
2505 #[tokio::test]
2506 #[ignore = "Requires active Redis instance"]
2507 async fn test_find_by_status_paginated_oldest_first_single_item() {
2508 let repo = setup_test_repo().await;
2509 let relayer_id = Uuid::new_v4().to_string();
2510
2511 let timestamps = [
2513 "2025-01-27T08:00:00.000000+00:00", "2025-01-27T10:00:00.000000+00:00", "2025-01-27T12:00:00.000000+00:00", ];
2517
2518 let mut oldest_id = String::new();
2519 let mut newest_id = String::new();
2520
2521 for (i, timestamp) in timestamps.iter().enumerate() {
2522 let tx_id = format!("tx-{}-{}", i, Uuid::new_v4());
2523 if i == 0 {
2524 oldest_id = tx_id.clone();
2525 }
2526 if i == 2 {
2527 newest_id = tx_id.clone();
2528 }
2529 let mut tx = create_test_transaction(&tx_id);
2530 tx.relayer_id = relayer_id.clone();
2531 tx.status = TransactionStatus::Pending;
2532 tx.created_at = timestamp.to_string();
2533 repo.create(tx).await.unwrap();
2534 }
2535
2536 let query = PaginationQuery {
2538 page: 1,
2539 per_page: 1,
2540 };
2541 let result = repo
2542 .find_by_status_paginated(
2543 &relayer_id,
2544 &[TransactionStatus::Pending],
2545 query.clone(),
2546 true,
2547 )
2548 .await
2549 .unwrap();
2550
2551 assert_eq!(result.total, 3);
2552 assert_eq!(result.items.len(), 1);
2553 assert_eq!(
2554 result.items[0].id, oldest_id,
2555 "With oldest_first=true and per_page=1, should return the oldest transaction"
2556 );
2557
2558 let result = repo
2560 .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
2561 .await
2562 .unwrap();
2563
2564 assert_eq!(result.items.len(), 1);
2565 assert_eq!(
2566 result.items[0].id, newest_id,
2567 "With oldest_first=false and per_page=1, should return the newest transaction"
2568 );
2569 }
2570
2571 #[tokio::test]
2572 #[ignore = "Requires active Redis instance"]
2573 async fn test_find_by_nonce() {
2574 let repo = setup_test_repo().await;
2575 let random_id = Uuid::new_v4().to_string();
2576 let random_id2 = Uuid::new_v4().to_string();
2577 let relayer_id = Uuid::new_v4().to_string();
2578
2579 let tx1 = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
2580 let tx2 = create_test_transaction_with_nonce(&random_id2, 43, &relayer_id);
2581
2582 repo.create(tx1.clone()).await.unwrap();
2583 repo.create(tx2).await.unwrap();
2584
2585 let result = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
2587 assert!(result.is_some());
2588 assert_eq!(result.unwrap().id, random_id);
2589
2590 let result = repo.find_by_nonce(&relayer_id, 99).await.unwrap();
2592 assert!(result.is_none());
2593
2594 let result = repo.find_by_nonce("non-existent", 42).await.unwrap();
2596 assert!(result.is_none());
2597 }
2598
2599 #[tokio::test]
2600 #[ignore = "Requires active Redis instance"]
2601 async fn test_update_status() {
2602 let repo = setup_test_repo().await;
2603 let random_id = Uuid::new_v4().to_string();
2604 let tx = create_test_transaction(&random_id);
2605
2606 repo.create(tx).await.unwrap();
2607 let updated = repo
2608 .update_status(random_id.to_string(), TransactionStatus::Confirmed)
2609 .await
2610 .unwrap();
2611 assert_eq!(updated.status, TransactionStatus::Confirmed);
2612 }
2613
2614 #[tokio::test]
2615 #[ignore = "Requires active Redis instance"]
2616 async fn test_partial_update() {
2617 let repo = setup_test_repo().await;
2618 let random_id = Uuid::new_v4().to_string();
2619 let tx = create_test_transaction(&random_id);
2620
2621 repo.create(tx).await.unwrap();
2622
2623 let update = TransactionUpdateRequest {
2624 status: Some(TransactionStatus::Sent),
2625 status_reason: Some("Transaction sent".to_string()),
2626 sent_at: Some("2025-01-27T16:00:00.000000+00:00".to_string()),
2627 confirmed_at: None,
2628 network_data: None,
2629 hashes: None,
2630 is_canceled: None,
2631 priced_at: None,
2632 noop_count: None,
2633 delete_at: None,
2634 };
2635
2636 let updated = repo
2637 .partial_update(random_id.to_string(), update)
2638 .await
2639 .unwrap();
2640 assert_eq!(updated.status, TransactionStatus::Sent);
2641 assert_eq!(updated.status_reason, Some("Transaction sent".to_string()));
2642 assert_eq!(
2643 updated.sent_at,
2644 Some("2025-01-27T16:00:00.000000+00:00".to_string())
2645 );
2646 }
2647
2648 #[tokio::test]
2649 #[ignore = "Requires active Redis instance"]
2650 async fn test_set_sent_at() {
2651 let repo = setup_test_repo().await;
2652 let random_id = Uuid::new_v4().to_string();
2653 let tx = create_test_transaction(&random_id);
2654
2655 repo.create(tx).await.unwrap();
2656 let updated = repo
2657 .set_sent_at(
2658 random_id.to_string(),
2659 "2025-01-27T16:00:00.000000+00:00".to_string(),
2660 )
2661 .await
2662 .unwrap();
2663 assert_eq!(
2664 updated.sent_at,
2665 Some("2025-01-27T16:00:00.000000+00:00".to_string())
2666 );
2667 }
2668
2669 #[tokio::test]
2670 #[ignore = "Requires active Redis instance"]
2671 async fn test_set_confirmed_at() {
2672 let repo = setup_test_repo().await;
2673 let random_id = Uuid::new_v4().to_string();
2674 let tx = create_test_transaction(&random_id);
2675
2676 repo.create(tx).await.unwrap();
2677 let updated = repo
2678 .set_confirmed_at(
2679 random_id.to_string(),
2680 "2025-01-27T16:00:00.000000+00:00".to_string(),
2681 )
2682 .await
2683 .unwrap();
2684 assert_eq!(
2685 updated.confirmed_at,
2686 Some("2025-01-27T16:00:00.000000+00:00".to_string())
2687 );
2688 }
2689
2690 #[tokio::test]
2691 #[ignore = "Requires active Redis instance"]
2692 async fn test_update_network_data() {
2693 let repo = setup_test_repo().await;
2694 let random_id = Uuid::new_v4().to_string();
2695 let tx = create_test_transaction(&random_id);
2696
2697 repo.create(tx).await.unwrap();
2698
2699 let new_network_data = NetworkTransactionData::Evm(EvmTransactionData {
2700 gas_price: Some(2000000000),
2701 gas_limit: Some(42000),
2702 nonce: Some(2),
2703 value: U256::from_str("2000000000000000000").unwrap(),
2704 data: Some("0x1234".to_string()),
2705 from: "0xNewSender".to_string(),
2706 to: Some("0xNewRecipient".to_string()),
2707 chain_id: 1,
2708 signature: None,
2709 hash: Some("0xnewhash".to_string()),
2710 speed: Some(Speed::SafeLow),
2711 max_fee_per_gas: None,
2712 max_priority_fee_per_gas: None,
2713 raw: None,
2714 });
2715
2716 let updated = repo
2717 .update_network_data(random_id.to_string(), new_network_data.clone())
2718 .await
2719 .unwrap();
2720 assert_eq!(
2721 updated
2722 .network_data
2723 .get_evm_transaction_data()
2724 .unwrap()
2725 .hash,
2726 new_network_data.get_evm_transaction_data().unwrap().hash
2727 );
2728 }
2729
2730 #[tokio::test]
2731 #[ignore = "Requires active Redis instance"]
2732 async fn test_debug_implementation() {
2733 let repo = setup_test_repo().await;
2734 let debug_str = format!("{:?}", repo);
2735 assert!(debug_str.contains("RedisTransactionRepository"));
2736 assert!(debug_str.contains("test_prefix"));
2737 }
2738
2739 #[tokio::test]
2740 #[ignore = "Requires active Redis instance"]
2741 async fn test_error_handling_empty_id() {
2742 let repo = setup_test_repo().await;
2743
2744 let result = repo.get_by_id("".to_string()).await;
2745 assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
2746
2747 let result = repo
2748 .update("".to_string(), create_test_transaction("test"))
2749 .await;
2750 assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
2751
2752 let result = repo.delete_by_id("".to_string()).await;
2753 assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
2754 }
2755
2756 #[tokio::test]
2757 #[ignore = "Requires active Redis instance"]
2758 async fn test_pagination_validation() {
2759 let repo = setup_test_repo().await;
2760
2761 let query = PaginationQuery {
2762 page: 1,
2763 per_page: 0,
2764 };
2765 let result = repo.list_paginated(query).await;
2766 assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
2767 }
2768
2769 #[tokio::test]
2770 #[ignore = "Requires active Redis instance"]
2771 async fn test_index_consistency() {
2772 let repo = setup_test_repo().await;
2773 let random_id = Uuid::new_v4().to_string();
2774 let relayer_id = Uuid::new_v4().to_string();
2775 let tx = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
2776
2777 repo.create(tx.clone()).await.unwrap();
2779
2780 let found = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
2782 assert!(found.is_some());
2783
2784 let mut updated_tx = tx.clone();
2786 if let NetworkTransactionData::Evm(ref mut evm_data) = updated_tx.network_data {
2787 evm_data.nonce = Some(43);
2788 }
2789
2790 repo.update(random_id.to_string(), updated_tx)
2791 .await
2792 .unwrap();
2793
2794 let old_nonce_result = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
2796 assert!(old_nonce_result.is_none());
2797
2798 let new_nonce_result = repo.find_by_nonce(&relayer_id, 43).await.unwrap();
2800 assert!(new_nonce_result.is_some());
2801 }
2802
2803 #[tokio::test]
2804 #[ignore = "Requires active Redis instance"]
2805 async fn test_has_entries() {
2806 let repo = setup_test_repo().await;
2807 assert!(!repo.has_entries().await.unwrap());
2808
2809 let tx_id = uuid::Uuid::new_v4().to_string();
2810 let tx = create_test_transaction(&tx_id);
2811 repo.create(tx.clone()).await.unwrap();
2812
2813 assert!(repo.has_entries().await.unwrap());
2814 }
2815
2816 #[tokio::test]
2817 #[ignore = "Requires active Redis instance"]
2818 async fn test_drop_all_entries() {
2819 let repo = setup_test_repo().await;
2820 let tx_id = uuid::Uuid::new_v4().to_string();
2821 let tx = create_test_transaction(&tx_id);
2822 repo.create(tx.clone()).await.unwrap();
2823 assert!(repo.has_entries().await.unwrap());
2824
2825 repo.drop_all_entries().await.unwrap();
2826 assert!(!repo.has_entries().await.unwrap());
2827 }
2828
2829 #[tokio::test]
2831 #[ignore = "Requires active Redis instance"]
2832 async fn test_update_status_sets_delete_at_for_final_statuses() {
2833 let _lock = ENV_MUTEX.lock().await;
2834
2835 use chrono::{DateTime, Duration, Utc};
2836 use std::env;
2837
2838 env::set_var("TRANSACTION_EXPIRATION_HOURS", "6");
2840
2841 let repo = setup_test_repo().await;
2842
2843 let final_statuses = [
2844 TransactionStatus::Canceled,
2845 TransactionStatus::Confirmed,
2846 TransactionStatus::Failed,
2847 TransactionStatus::Expired,
2848 ];
2849
2850 for (i, status) in final_statuses.iter().enumerate() {
2851 let tx_id = format!("test-final-{}-{}", i, Uuid::new_v4());
2852 let mut tx = create_test_transaction(&tx_id);
2853
2854 tx.delete_at = None;
2856 tx.status = TransactionStatus::Pending;
2857
2858 repo.create(tx).await.unwrap();
2859
2860 let before_update = Utc::now();
2861
2862 let updated = repo
2864 .update_status(tx_id.clone(), status.clone())
2865 .await
2866 .unwrap();
2867
2868 assert!(
2870 updated.delete_at.is_some(),
2871 "delete_at should be set for status: {:?}",
2872 status
2873 );
2874
2875 let delete_at_str = updated.delete_at.unwrap();
2877 let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
2878 .expect("delete_at should be valid RFC3339")
2879 .with_timezone(&Utc);
2880
2881 let duration_from_before = delete_at.signed_duration_since(before_update);
2882 let expected_duration = Duration::hours(6);
2883 let tolerance = Duration::minutes(5);
2884
2885 assert!(
2886 duration_from_before >= expected_duration - tolerance
2887 && duration_from_before <= expected_duration + tolerance,
2888 "delete_at should be approximately 6 hours from now for status: {:?}. Duration: {:?}",
2889 status,
2890 duration_from_before
2891 );
2892 }
2893
2894 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
2896 }
2897
2898 #[tokio::test]
2899 #[ignore = "Requires active Redis instance"]
2900 async fn test_update_status_does_not_set_delete_at_for_non_final_statuses() {
2901 let _lock = ENV_MUTEX.lock().await;
2902
2903 use std::env;
2904
2905 env::set_var("TRANSACTION_EXPIRATION_HOURS", "4");
2906
2907 let repo = setup_test_repo().await;
2908
2909 let non_final_statuses = [
2910 TransactionStatus::Pending,
2911 TransactionStatus::Sent,
2912 TransactionStatus::Submitted,
2913 TransactionStatus::Mined,
2914 ];
2915
2916 for (i, status) in non_final_statuses.iter().enumerate() {
2917 let tx_id = format!("test-non-final-{}-{}", i, Uuid::new_v4());
2918 let mut tx = create_test_transaction(&tx_id);
2919 tx.delete_at = None;
2920 tx.status = TransactionStatus::Pending;
2921
2922 repo.create(tx).await.unwrap();
2923
2924 let updated = repo
2926 .update_status(tx_id.clone(), status.clone())
2927 .await
2928 .unwrap();
2929
2930 assert!(
2932 updated.delete_at.is_none(),
2933 "delete_at should NOT be set for status: {:?}",
2934 status
2935 );
2936 }
2937
2938 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
2940 }
2941
2942 #[tokio::test]
2943 #[ignore = "Requires active Redis instance"]
2944 async fn test_partial_update_sets_delete_at_for_final_statuses() {
2945 let _lock = ENV_MUTEX.lock().await;
2946
2947 use chrono::{DateTime, Duration, Utc};
2948 use std::env;
2949
2950 env::set_var("TRANSACTION_EXPIRATION_HOURS", "8");
2951
2952 let repo = setup_test_repo().await;
2953 let tx_id = format!("test-partial-final-{}", Uuid::new_v4());
2954 let mut tx = create_test_transaction(&tx_id);
2955 tx.delete_at = None;
2956 tx.status = TransactionStatus::Pending;
2957
2958 repo.create(tx).await.unwrap();
2959
2960 let before_update = Utc::now();
2961
2962 let update = TransactionUpdateRequest {
2964 status: Some(TransactionStatus::Confirmed),
2965 status_reason: Some("Transaction completed".to_string()),
2966 confirmed_at: Some("2023-01-01T12:05:00Z".to_string()),
2967 ..Default::default()
2968 };
2969
2970 let updated = repo.partial_update(tx_id.clone(), update).await.unwrap();
2971
2972 assert!(
2974 updated.delete_at.is_some(),
2975 "delete_at should be set when updating to Confirmed status"
2976 );
2977
2978 let delete_at_str = updated.delete_at.unwrap();
2980 let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
2981 .expect("delete_at should be valid RFC3339")
2982 .with_timezone(&Utc);
2983
2984 let duration_from_before = delete_at.signed_duration_since(before_update);
2985 let expected_duration = Duration::hours(8);
2986 let tolerance = Duration::minutes(5);
2987
2988 assert!(
2989 duration_from_before >= expected_duration - tolerance
2990 && duration_from_before <= expected_duration + tolerance,
2991 "delete_at should be approximately 8 hours from now. Duration: {:?}",
2992 duration_from_before
2993 );
2994
2995 assert_eq!(updated.status, TransactionStatus::Confirmed);
2997 assert_eq!(
2998 updated.status_reason,
2999 Some("Transaction completed".to_string())
3000 );
3001 assert_eq!(
3002 updated.confirmed_at,
3003 Some("2023-01-01T12:05:00Z".to_string())
3004 );
3005
3006 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
3008 }
3009
3010 #[tokio::test]
3011 #[ignore = "Requires active Redis instance"]
3012 async fn test_update_status_preserves_existing_delete_at() {
3013 let _lock = ENV_MUTEX.lock().await;
3014
3015 use std::env;
3016
3017 env::set_var("TRANSACTION_EXPIRATION_HOURS", "2");
3018
3019 let repo = setup_test_repo().await;
3020 let tx_id = format!("test-preserve-delete-at-{}", Uuid::new_v4());
3021 let mut tx = create_test_transaction(&tx_id);
3022
3023 let existing_delete_at = "2025-01-01T12:00:00Z".to_string();
3025 tx.delete_at = Some(existing_delete_at.clone());
3026 tx.status = TransactionStatus::Pending;
3027
3028 repo.create(tx).await.unwrap();
3029
3030 let updated = repo
3032 .update_status(tx_id.clone(), TransactionStatus::Confirmed)
3033 .await
3034 .unwrap();
3035
3036 assert_eq!(
3038 updated.delete_at,
3039 Some(existing_delete_at),
3040 "Existing delete_at should be preserved when updating to final status"
3041 );
3042
3043 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
3045 }
3046 #[tokio::test]
3047 #[ignore = "Requires active Redis instance"]
3048 async fn test_partial_update_without_status_change_preserves_delete_at() {
3049 let _lock = ENV_MUTEX.lock().await;
3050
3051 use std::env;
3052
3053 env::set_var("TRANSACTION_EXPIRATION_HOURS", "3");
3054
3055 let repo = setup_test_repo().await;
3056 let tx_id = format!("test-preserve-no-status-{}", Uuid::new_v4());
3057 let mut tx = create_test_transaction(&tx_id);
3058 tx.delete_at = None;
3059 tx.status = TransactionStatus::Pending;
3060
3061 repo.create(tx).await.unwrap();
3062
3063 let updated1 = repo
3065 .update_status(tx_id.clone(), TransactionStatus::Confirmed)
3066 .await
3067 .unwrap();
3068
3069 assert!(updated1.delete_at.is_some());
3070 let original_delete_at = updated1.delete_at.clone();
3071
3072 let update = TransactionUpdateRequest {
3074 status: None, status_reason: Some("Updated reason".to_string()),
3076 confirmed_at: Some("2023-01-01T12:10:00Z".to_string()),
3077 ..Default::default()
3078 };
3079
3080 let updated2 = repo.partial_update(tx_id.clone(), update).await.unwrap();
3081
3082 assert_eq!(
3084 updated2.delete_at, original_delete_at,
3085 "delete_at should be preserved when status is not updated"
3086 );
3087
3088 assert_eq!(updated2.status, TransactionStatus::Confirmed); assert_eq!(updated2.status_reason, Some("Updated reason".to_string()));
3091 assert_eq!(
3092 updated2.confirmed_at,
3093 Some("2023-01-01T12:10:00Z".to_string())
3094 );
3095
3096 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
3098 }
3099
3100 #[tokio::test]
3103 #[ignore = "Requires active Redis instance"]
3104 async fn test_delete_by_ids_empty_list() {
3105 let repo = setup_test_repo().await;
3106 let tx_id = format!("test-empty-{}", Uuid::new_v4());
3107
3108 let tx = create_test_transaction(&tx_id);
3110 repo.create(tx).await.unwrap();
3111
3112 let result = repo.delete_by_ids(vec![]).await.unwrap();
3114
3115 assert_eq!(result.deleted_count, 0);
3116 assert!(result.failed.is_empty());
3117
3118 assert!(repo.get_by_id(tx_id).await.is_ok());
3120 }
3121
3122 #[tokio::test]
3123 #[ignore = "Requires active Redis instance"]
3124 async fn test_delete_by_ids_single_transaction() {
3125 let repo = setup_test_repo().await;
3126 let tx_id = format!("test-single-{}", Uuid::new_v4());
3127
3128 let tx = create_test_transaction(&tx_id);
3129 repo.create(tx).await.unwrap();
3130
3131 let result = repo.delete_by_ids(vec![tx_id.clone()]).await.unwrap();
3132
3133 assert_eq!(result.deleted_count, 1);
3134 assert!(result.failed.is_empty());
3135
3136 assert!(repo.get_by_id(tx_id).await.is_err());
3138 }
3139
3140 #[tokio::test]
3141 #[ignore = "Requires active Redis instance"]
3142 async fn test_delete_by_ids_multiple_transactions() {
3143 let repo = setup_test_repo().await;
3144 let base_id = Uuid::new_v4();
3145
3146 let mut created_ids = Vec::new();
3148 for i in 1..=5 {
3149 let tx_id = format!("test-multi-{}-{}", base_id, i);
3150 let tx = create_test_transaction(&tx_id);
3151 repo.create(tx).await.unwrap();
3152 created_ids.push(tx_id);
3153 }
3154
3155 let ids_to_delete = vec![
3157 created_ids[0].clone(),
3158 created_ids[2].clone(),
3159 created_ids[4].clone(),
3160 ];
3161 let result = repo.delete_by_ids(ids_to_delete).await.unwrap();
3162
3163 assert_eq!(result.deleted_count, 3);
3164 assert!(result.failed.is_empty());
3165
3166 assert!(repo.get_by_id(created_ids[0].clone()).await.is_err());
3168 assert!(repo.get_by_id(created_ids[1].clone()).await.is_ok()); assert!(repo.get_by_id(created_ids[2].clone()).await.is_err());
3170 assert!(repo.get_by_id(created_ids[3].clone()).await.is_ok()); assert!(repo.get_by_id(created_ids[4].clone()).await.is_err());
3172 }
3173
3174 #[tokio::test]
3175 #[ignore = "Requires active Redis instance"]
3176 async fn test_delete_by_ids_nonexistent_transactions() {
3177 let repo = setup_test_repo().await;
3178 let base_id = Uuid::new_v4();
3179
3180 let ids_to_delete = vec![
3182 format!("nonexistent-{}-1", base_id),
3183 format!("nonexistent-{}-2", base_id),
3184 ];
3185 let result = repo.delete_by_ids(ids_to_delete.clone()).await.unwrap();
3186
3187 assert_eq!(result.deleted_count, 0);
3188 assert_eq!(result.failed.len(), 2);
3189
3190 let failed_ids: Vec<&String> = result.failed.iter().map(|(id, _)| id).collect();
3192 assert!(failed_ids.contains(&&ids_to_delete[0]));
3193 assert!(failed_ids.contains(&&ids_to_delete[1]));
3194 }
3195
3196 #[tokio::test]
3197 #[ignore = "Requires active Redis instance"]
3198 async fn test_delete_by_ids_mixed_existing_and_nonexistent() {
3199 let repo = setup_test_repo().await;
3200 let base_id = Uuid::new_v4();
3201
3202 let existing_ids: Vec<String> = (1..=3)
3204 .map(|i| format!("test-mixed-existing-{}-{}", base_id, i))
3205 .collect();
3206
3207 for id in &existing_ids {
3208 let tx = create_test_transaction(id);
3209 repo.create(tx).await.unwrap();
3210 }
3211
3212 let nonexistent_ids: Vec<String> = (1..=2)
3213 .map(|i| format!("test-mixed-nonexistent-{}-{}", base_id, i))
3214 .collect();
3215
3216 let ids_to_delete = vec![
3218 existing_ids[0].clone(),
3219 nonexistent_ids[0].clone(),
3220 existing_ids[1].clone(),
3221 nonexistent_ids[1].clone(),
3222 ];
3223 let result = repo.delete_by_ids(ids_to_delete).await.unwrap();
3224
3225 assert_eq!(result.deleted_count, 2);
3226 assert_eq!(result.failed.len(), 2);
3227
3228 assert!(repo.get_by_id(existing_ids[0].clone()).await.is_err());
3230 assert!(repo.get_by_id(existing_ids[1].clone()).await.is_err());
3231
3232 assert!(repo.get_by_id(existing_ids[2].clone()).await.is_ok());
3234 }
3235
3236 #[tokio::test]
3237 #[ignore = "Requires active Redis instance"]
3238 async fn test_delete_by_ids_removes_all_indexes() {
3239 let repo = setup_test_repo().await;
3240 let relayer_id = format!("relayer-{}", Uuid::new_v4());
3241 let tx_id = format!("test-indexes-{}", Uuid::new_v4());
3242
3243 let mut tx = create_test_transaction(&tx_id);
3245 tx.relayer_id = relayer_id.clone();
3246 tx.status = TransactionStatus::Confirmed;
3247 repo.create(tx).await.unwrap();
3248
3249 let found = repo
3251 .find_by_status(&relayer_id, &[TransactionStatus::Confirmed])
3252 .await
3253 .unwrap();
3254 assert!(found.iter().any(|t| t.id == tx_id));
3255
3256 let result = repo.delete_by_ids(vec![tx_id.clone()]).await.unwrap();
3258 assert_eq!(result.deleted_count, 1);
3259
3260 let found_after = repo
3262 .find_by_status(&relayer_id, &[TransactionStatus::Confirmed])
3263 .await
3264 .unwrap();
3265 assert!(!found_after.iter().any(|t| t.id == tx_id));
3266
3267 assert!(repo.get_by_id(tx_id).await.is_err());
3269 }
3270
3271 #[tokio::test]
3272 #[ignore = "Requires active Redis instance"]
3273 async fn test_delete_by_ids_removes_nonce_index() {
3274 let repo = setup_test_repo().await;
3275 let relayer_id = format!("relayer-{}", Uuid::new_v4());
3276 let tx_id = format!("test-nonce-{}", Uuid::new_v4());
3277 let nonce = 12345u64;
3278
3279 let tx = create_test_transaction_with_nonce(&tx_id, nonce, &relayer_id);
3281 repo.create(tx).await.unwrap();
3282
3283 let found = repo.find_by_nonce(&relayer_id, nonce).await.unwrap();
3285 assert!(found.is_some());
3286 assert_eq!(found.unwrap().id, tx_id);
3287
3288 let result = repo.delete_by_ids(vec![tx_id.clone()]).await.unwrap();
3290 assert_eq!(result.deleted_count, 1);
3291
3292 let found_after = repo.find_by_nonce(&relayer_id, nonce).await.unwrap();
3294 assert!(found_after.is_none());
3295 }
3296
3297 #[tokio::test]
3298 #[ignore = "Requires active Redis instance"]
3299 async fn test_delete_by_ids_large_batch() {
3300 let repo = setup_test_repo().await;
3301 let base_id = Uuid::new_v4();
3302
3303 let count = 50;
3305 let mut created_ids = Vec::new();
3306
3307 for i in 0..count {
3308 let tx_id = format!("test-large-{}-{}", base_id, i);
3309 let tx = create_test_transaction(&tx_id);
3310 repo.create(tx).await.unwrap();
3311 created_ids.push(tx_id);
3312 }
3313
3314 let result = repo.delete_by_ids(created_ids.clone()).await.unwrap();
3316
3317 assert_eq!(result.deleted_count, count);
3318 assert!(result.failed.is_empty());
3319
3320 for id in created_ids {
3322 assert!(repo.get_by_id(id).await.is_err());
3323 }
3324 }
3325
3326 #[tokio::test]
3327 #[ignore = "Requires active Redis instance"]
3328 async fn test_delete_by_ids_preserves_other_relayer_transactions() {
3329 let repo = setup_test_repo().await;
3330 let relayer_1 = format!("relayer-1-{}", Uuid::new_v4());
3331 let relayer_2 = format!("relayer-2-{}", Uuid::new_v4());
3332 let tx_id_1 = format!("tx-relayer-1-{}", Uuid::new_v4());
3333 let tx_id_2 = format!("tx-relayer-2-{}", Uuid::new_v4());
3334
3335 let tx1 = create_test_transaction_with_relayer(&tx_id_1, &relayer_1);
3337 let tx2 = create_test_transaction_with_relayer(&tx_id_2, &relayer_2);
3338
3339 repo.create(tx1).await.unwrap();
3340 repo.create(tx2).await.unwrap();
3341
3342 let result = repo.delete_by_ids(vec![tx_id_1.clone()]).await.unwrap();
3344
3345 assert_eq!(result.deleted_count, 1);
3346
3347 assert!(repo.get_by_id(tx_id_1).await.is_err());
3349
3350 let remaining = repo.get_by_id(tx_id_2).await.unwrap();
3352 assert_eq!(remaining.relayer_id, relayer_2);
3353 }
3354}