openzeppelin_relayer/repositories/transaction/transaction_redis.rs

//! Redis-backed implementation of the TransactionRepository.

use crate::domain::transaction::common::is_final_state;
use crate::metrics::{
    TRANSACTIONS_BY_STATUS, TRANSACTIONS_CREATED, TRANSACTIONS_FAILED, TRANSACTIONS_SUBMITTED,
    TRANSACTIONS_SUCCESS, TRANSACTION_PROCESSING_TIME,
};
use crate::models::{
    NetworkTransactionData, PaginationQuery, RepositoryError, TransactionRepoModel,
    TransactionStatus, TransactionUpdateRequest,
};
use crate::repositories::redis_base::RedisRepository;
use crate::repositories::{
    BatchDeleteResult, BatchRetrievalResult, PaginatedResult, Repository, TransactionDeleteRequest,
    TransactionRepository,
};
use crate::utils::RedisConnections;
use async_trait::async_trait;
use chrono::Utc;
use redis::{AsyncCommands, Script};
use std::fmt;
use std::sync::Arc;
use tracing::{debug, error, warn};

const RELAYER_PREFIX: &str = "relayer";
const TX_PREFIX: &str = "tx";
const STATUS_PREFIX: &str = "status";
const STATUS_SORTED_PREFIX: &str = "status_sorted";
const NONCE_PREFIX: &str = "nonce";
const TX_TO_RELAYER_PREFIX: &str = "tx_to_relayer";
const RELAYER_LIST_KEY: &str = "relayer_list";
const TX_BY_CREATED_AT_PREFIX: &str = "tx_by_created_at";

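/// Redis key layout used by this repository, shown with an illustrative `key_prefix`
/// of "oz", relayer "r1", transaction "t1", and nonce 42 (hypothetical values; the
/// status segment is rendered by `TransactionStatus`'s `Display` impl):
///
/// ```text
/// oz:relayer:r1:tx:t1                   -> serialized TransactionRepoModel
/// oz:relayer:tx_to_relayer:t1           -> "r1" (reverse lookup)
/// oz:relayer:r1:status:<status>         -> legacy SET of tx IDs (migrated away)
/// oz:relayer:r1:status_sorted:<status>  -> ZSET of tx IDs scored by timestamp (ms)
/// oz:relayer:r1:nonce:42                -> tx ID currently holding that nonce
/// oz:relayer:r1:tx_by_created_at        -> ZSET of tx IDs scored by created_at (ms)
/// oz:relayer_list                       -> SET of all relayer IDs
/// ```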
#[derive(Clone)]
pub struct RedisTransactionRepository {
    pub connections: Arc<RedisConnections>,
    pub key_prefix: String,
}

impl RedisRepository for RedisTransactionRepository {}

impl RedisTransactionRepository {
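    /// Builds a repository over the given connections and key prefix.
    ///
    /// A minimal usage sketch; the `RedisConnections` constructor shown here is
    /// hypothetical and may not match the real API:
    ///
    /// ```ignore
    /// let connections = Arc::new(RedisConnections::new(primary_pool, reader_pool));
    /// let repo = RedisTransactionRepository::new(connections, "oz".to_string())?;
    /// ```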
    pub fn new(
        connections: Arc<RedisConnections>,
        key_prefix: String,
    ) -> Result<Self, RepositoryError> {
        if key_prefix.is_empty() {
            return Err(RepositoryError::InvalidData(
                "Redis key prefix cannot be empty".to_string(),
            ));
        }

        Ok(Self {
            connections,
            key_prefix,
        })
    }

    /// Generate key for transaction data: relayer:{relayer_id}:tx:{tx_id}
    fn tx_key(&self, relayer_id: &str, tx_id: &str) -> String {
        format!(
            "{}:{}:{}:{}:{}",
            self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX, tx_id
        )
    }

    /// Generate key for reverse lookup: tx_to_relayer:{tx_id}
    fn tx_to_relayer_key(&self, tx_id: &str) -> String {
        format!(
            "{}:{}:{}:{}",
            self.key_prefix, RELAYER_PREFIX, TX_TO_RELAYER_PREFIX, tx_id
        )
    }

    /// Generate key for relayer status index (legacy SET): relayer:{relayer_id}:status:{status}
    fn relayer_status_key(&self, relayer_id: &str, status: &TransactionStatus) -> String {
        format!(
            "{}:{}:{}:{}:{}",
            self.key_prefix, RELAYER_PREFIX, relayer_id, STATUS_PREFIX, status
        )
    }

    /// Generate key for relayer status sorted index (SORTED SET): relayer:{relayer_id}:status_sorted:{status}
    /// Score is the transaction timestamp in milliseconds (created_at, or confirmed_at for Confirmed) for efficient ordering.
    fn relayer_status_sorted_key(&self, relayer_id: &str, status: &TransactionStatus) -> String {
        format!(
            "{}:{}:{}:{}:{}",
            self.key_prefix, RELAYER_PREFIX, relayer_id, STATUS_SORTED_PREFIX, status
        )
    }

    /// Generate key for relayer nonce index: relayer:{relayer_id}:nonce:{nonce}
    fn relayer_nonce_key(&self, relayer_id: &str, nonce: u64) -> String {
        format!(
            "{}:{}:{}:{}:{}",
            self.key_prefix, RELAYER_PREFIX, relayer_id, NONCE_PREFIX, nonce
        )
    }

    /// Generate key for relayer list: relayer_list (set of all relayer IDs)
    fn relayer_list_key(&self) -> String {
        format!("{}:{}", self.key_prefix, RELAYER_LIST_KEY)
    }

    /// Generate key for relayer's sorted set by created_at: relayer:{relayer_id}:tx_by_created_at
    fn relayer_tx_by_created_at_key(&self, relayer_id: &str) -> String {
        format!(
            "{}:{}:{}:{}",
            self.key_prefix, RELAYER_PREFIX, relayer_id, TX_BY_CREATED_AT_PREFIX
        )
    }

    /// Parse timestamp string to score for sorted set (milliseconds since epoch)
    fn timestamp_to_score(&self, timestamp: &str) -> f64 {
        chrono::DateTime::parse_from_rfc3339(timestamp)
            .map(|dt| dt.timestamp_millis() as f64)
            .unwrap_or_else(|_| {
                warn!(timestamp = %timestamp, "failed to parse timestamp, using 0");
                0.0
            })
    }
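    // Example: "2024-01-15T10:30:00Z" parses to 1_705_314_600_000.0 ms since the epoch,
    // so sorted-set members order chronologically; an unparseable timestamp scores 0.0,
    // which sorts it as the oldest entry.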

    /// Compute the appropriate score for a transaction's status sorted set.
    /// - For Confirmed status: use confirmed_at (on-chain confirmation order)
    /// - For all other statuses: use created_at (queue/processing order)
    fn status_sorted_score(&self, tx: &TransactionRepoModel) -> f64 {
        if tx.status == TransactionStatus::Confirmed {
            // For Confirmed, prefer confirmed_at for accurate on-chain ordering
            if let Some(ref confirmed_at) = tx.confirmed_at {
                return self.timestamp_to_score(confirmed_at);
            }
            // Fallback to created_at if confirmed_at not set (shouldn't happen)
            warn!(tx_id = %tx.id, "Confirmed transaction missing confirmed_at, using created_at");
        }
        self.timestamp_to_score(&tx.created_at)
    }
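    // Example: a transaction created at 10:00 but confirmed at 10:05 is scored by the
    // 10:05 confirmed_at in the Confirmed sorted set, while the tx_by_created_at index
    // and every non-Confirmed status set keep scoring it by the 10:00 created_at.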

    /// Batch fetch transactions by IDs using reverse lookup
    async fn get_transactions_by_ids(
        &self,
        ids: &[String],
    ) -> Result<BatchRetrievalResult<TransactionRepoModel>, RepositoryError> {
        if ids.is_empty() {
            debug!("no transaction IDs provided for batch fetch");
            return Ok(BatchRetrievalResult {
                results: vec![],
                failed_ids: vec![],
            });
        }

        let mut conn = self
            .get_connection(self.connections.reader(), "batch_fetch_transactions")
            .await?;

        let reverse_keys: Vec<String> = ids.iter().map(|id| self.tx_to_relayer_key(id)).collect();

        debug!(count = %ids.len(), "fetching relayer IDs for transactions");

        let relayer_ids: Vec<Option<String>> = conn
            .mget(&reverse_keys)
            .await
            .map_err(|e| self.map_redis_error(e, "batch_fetch_relayer_ids"))?;

        let mut tx_keys = Vec::new();
        let mut valid_ids = Vec::new();
        let mut failed_ids = Vec::new();
        for (i, relayer_id) in relayer_ids.into_iter().enumerate() {
            match relayer_id {
                Some(relayer_id) => {
                    tx_keys.push(self.tx_key(&relayer_id, &ids[i]));
                    valid_ids.push(ids[i].clone());
                }
                None => {
                    warn!(tx_id = %ids[i], "no relayer found for transaction");
                    failed_ids.push(ids[i].clone());
                }
            }
        }

        if tx_keys.is_empty() {
            debug!("no valid transactions found for batch fetch");
            return Ok(BatchRetrievalResult {
                results: vec![],
                failed_ids,
            });
        }

        debug!(count = %tx_keys.len(), "batch fetching transaction data");

        let values: Vec<Option<String>> = conn
            .mget(&tx_keys)
            .await
            .map_err(|e| self.map_redis_error(e, "batch_fetch_transactions"))?;

        let mut transactions = Vec::new();
        // Reuse `failed_ids` from the reverse-lookup phase so IDs without a relayer
        // mapping are still reported alongside IDs missing from the batch fetch.
        let mut failed_count = 0;
        for (i, value) in values.into_iter().enumerate() {
            match value {
                Some(json) => {
                    match self.deserialize_entity::<TransactionRepoModel>(
                        &json,
                        &valid_ids[i],
                        "transaction",
                    ) {
                        Ok(tx) => transactions.push(tx),
                        Err(e) => {
                            failed_count += 1;
                            error!(tx_id = %valid_ids[i], error = %e, "failed to deserialize transaction");
                            // Continue processing other transactions
                        }
                    }
                }
                None => {
                    warn!(tx_id = %valid_ids[i], "transaction not found in batch fetch");
                    failed_ids.push(valid_ids[i].clone());
                }
            }
        }

        if failed_count > 0 {
            warn!(failed_count = %failed_count, total_count = %valid_ids.len(), "failed to deserialize transactions in batch");
        }

        debug!(count = %transactions.len(), "successfully fetched transactions");
        Ok(BatchRetrievalResult {
            results: transactions,
            failed_ids,
        })
    }

    /// Extract nonce from EVM transaction data
    fn extract_nonce(&self, network_data: &NetworkTransactionData) -> Option<u64> {
        match network_data.get_evm_transaction_data() {
            Ok(tx_data) => tx_data.nonce,
            Err(_) => {
                debug!("no EVM transaction data available for nonce extraction");
                None
            }
        }
    }

    /// Ensures the status sorted set exists, migrating from legacy SET if needed.
    ///
    /// This handles the transition from unordered SETs to sorted SETs for status indexing.
    /// If the sorted set is empty but the legacy set has data, it migrates the data
    /// by looking up each transaction's created_at timestamp to compute the score.
    ///
    /// # Concurrency
    /// This function is safe for concurrent calls. If multiple calls race to migrate
    /// the same status set:
    /// - ZADD is idempotent (same member + score = no-op)
    /// - DEL on non-existent key is safe (returns 0)
    /// - After first successful migration, subsequent calls hit the fast path (ZCARD > 0)
    ///
    /// The only downside of concurrent migrations is wasted work, not data corruption.
    ///
    /// Returns the count of items in the sorted set after migration.
    async fn ensure_status_sorted_set(
        &self,
        relayer_id: &str,
        status: &TransactionStatus,
    ) -> Result<u64, RepositoryError> {
        let sorted_key = self.relayer_status_sorted_key(relayer_id, status);
        let legacy_key = self.relayer_status_key(relayer_id, status);

        // Phase 1: Check if migration is needed
        let legacy_ids = {
            let mut conn = self
                .get_connection(self.connections.primary(), "ensure_status_sorted_set_check")
                .await?;

            // Always check if legacy set has data that needs migration
            let legacy_count: u64 = conn
                .scard(&legacy_key)
                .await
                .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_scard"))?;

            if legacy_count == 0 {
                // No legacy data to migrate, return current ZSET count
                let sorted_count: u64 = conn
                    .zcard(&sorted_key)
                    .await
                    .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_zcard"))?;
                return Ok(sorted_count);
            }

            // Migration needed: get all IDs from legacy set
            debug!(
                relayer_id = %relayer_id,
                status = %status,
                legacy_count = %legacy_count,
                "migrating status set to sorted set"
            );

            let ids: Vec<String> = conn
                .smembers(&legacy_key)
                .await
                .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_smembers"))?;

            ids
            // Connection dropped here before nested call to avoid connection doubling
        };

        if legacy_ids.is_empty() {
            return Ok(0);
        }

        // Phase 2: Fetch transactions (uses its own connection internally)
        let transactions = self.get_transactions_by_ids(&legacy_ids).await?;

        // Phase 3: Perform migration with a new connection
        let mut conn = self
            .get_connection(
                self.connections.primary(),
                "ensure_status_sorted_set_migrate",
            )
            .await?;

        if transactions.results.is_empty() {
            // All transactions were stale/deleted, clean up legacy set
            let _: () = conn
                .del(&legacy_key)
                .await
                .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_del_stale"))?;
            return Ok(0);
        }

        // Build sorted set entries and migrate atomically
        // Use status-aware scoring: confirmed_at for Confirmed, created_at for others
        let mut pipe = redis::pipe();
        pipe.atomic();

        for tx in &transactions.results {
            let score = self.status_sorted_score(tx);
            pipe.zadd(&sorted_key, &tx.id, score);
        }

        // Delete legacy set after migration
        pipe.del(&legacy_key);

        pipe.query_async::<()>(&mut conn)
            .await
            .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_migrate"))?;

        let migrated_count = transactions.results.len() as u64;
        debug!(
            relayer_id = %relayer_id,
            status = %status,
            migrated_count = %migrated_count,
            "completed migration of status set to sorted set"
        );

        Ok(migrated_count)
    }
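    // Migration sketch (hypothetical IDs, key_prefix omitted): if the legacy SET
    // relayer:r1:status:<status> holds {t1, t2}, the atomic pipeline built above is
    // equivalent to issuing:
    //   MULTI
    //   ZADD relayer:r1:status_sorted:<status> <score(t1)> t1
    //   ZADD relayer:r1:status_sorted:<status> <score(t2)> t2
    //   DEL  relayer:r1:status:<status>
    //   EXEC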

    /// Update indexes atomically with comprehensive error handling
    async fn update_indexes(
        &self,
        tx: &TransactionRepoModel,
        old_tx: Option<&TransactionRepoModel>,
    ) -> Result<(), RepositoryError> {
        let mut conn = self
            .get_connection(self.connections.primary(), "update_indexes")
            .await?;
        let mut pipe = redis::pipe();
        pipe.atomic();

        debug!(tx_id = %tx.id, "updating indexes for transaction");

        // Add relayer to the global relayer list
        let relayer_list_key = self.relayer_list_key();
        pipe.sadd(&relayer_list_key, &tx.relayer_id);

        // Compute scores for sorted sets
        // Status sorted set: uses confirmed_at for Confirmed status, created_at for others
        let status_score = self.status_sorted_score(tx);
        // Global tx_by_created_at: always uses created_at for consistent ordering
        let created_at_score = self.timestamp_to_score(&tx.created_at);

        // Handle status index updates - write to SORTED SET (new format)
        let new_status_sorted_key = self.relayer_status_sorted_key(&tx.relayer_id, &tx.status);
        pipe.zadd(&new_status_sorted_key, &tx.id, status_score);
        debug!(tx_id = %tx.id, status = %tx.status, score = %status_score, "added transaction to status sorted set");

        if let Some(nonce) = self.extract_nonce(&tx.network_data) {
            let nonce_key = self.relayer_nonce_key(&tx.relayer_id, nonce);
            pipe.set(&nonce_key, &tx.id);
            debug!(tx_id = %tx.id, nonce = %nonce, "added nonce index for transaction");
        }

        // Add to per-relayer sorted set by created_at (for efficient sorted pagination)
        let relayer_sorted_key = self.relayer_tx_by_created_at_key(&tx.relayer_id);
        pipe.zadd(&relayer_sorted_key, &tx.id, created_at_score);
        debug!(tx_id = %tx.id, score = %created_at_score, "added transaction to sorted set by created_at");

        // Remove old indexes if updating
        if let Some(old) = old_tx {
            if old.status != tx.status {
                // Remove from old status sorted set (new format)
                let old_status_sorted_key =
                    self.relayer_status_sorted_key(&old.relayer_id, &old.status);
                pipe.zrem(&old_status_sorted_key, &tx.id);

                // Also clean up legacy SET if it exists (for migration cleanup)
                let old_status_legacy_key = self.relayer_status_key(&old.relayer_id, &old.status);
                pipe.srem(&old_status_legacy_key, &tx.id);

                debug!(tx_id = %tx.id, old_status = %old.status, new_status = %tx.status, "removing old status indexes for transaction");
            }

            // Handle nonce index cleanup
            if let Some(old_nonce) = self.extract_nonce(&old.network_data) {
                let new_nonce = self.extract_nonce(&tx.network_data);
                if Some(old_nonce) != new_nonce {
                    let old_nonce_key = self.relayer_nonce_key(&old.relayer_id, old_nonce);
                    pipe.del(&old_nonce_key);
                    debug!(tx_id = %tx.id, old_nonce = %old_nonce, new_nonce = ?new_nonce, "removing old nonce index for transaction");
                }
            }
        }

        // Execute all operations in a single pipeline
        pipe.exec_async(&mut conn).await.map_err(|e| {
            error!(tx_id = %tx.id, error = %e, "index update pipeline failed for transaction");
            self.map_redis_error(e, &format!("update_indexes_for_tx_{}", tx.id))
        })?;

        debug!(tx_id = %tx.id, "successfully updated indexes for transaction");
        Ok(())
    }
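    // Example (hypothetical t1 moving from Pending to Submitted): the single MULTI/EXEC
    // pipeline above issues SADD on relayer_list, ZADD on status_sorted:Submitted,
    // ZADD on tx_by_created_at (an idempotent re-add), ZREM on status_sorted:Pending,
    // SREM on the legacy status:Pending SET, plus a SET of the nonce index whenever the
    // EVM transaction data carries a nonce.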

    /// Remove all indexes with error recovery
    async fn remove_all_indexes(&self, tx: &TransactionRepoModel) -> Result<(), RepositoryError> {
        let mut conn = self
            .get_connection(self.connections.primary(), "remove_all_indexes")
            .await?;
        let mut pipe = redis::pipe();
        pipe.atomic();

        debug!(tx_id = %tx.id, "removing all indexes for transaction");

        // Remove from ALL possible status indexes to ensure complete cleanup
        // This handles cases where a transaction might be in multiple status sets
        // due to race conditions, partial failures, or bugs
        for status in &[
            TransactionStatus::Canceled,
            TransactionStatus::Pending,
            TransactionStatus::Sent,
            TransactionStatus::Submitted,
            TransactionStatus::Mined,
            TransactionStatus::Confirmed,
            TransactionStatus::Failed,
            TransactionStatus::Expired,
        ] {
            // Remove from sorted status set (new format)
            let status_sorted_key = self.relayer_status_sorted_key(&tx.relayer_id, status);
            pipe.zrem(&status_sorted_key, &tx.id);

            // Remove from legacy status set (for migration cleanup)
            let status_legacy_key = self.relayer_status_key(&tx.relayer_id, status);
            pipe.srem(&status_legacy_key, &tx.id);
        }

        // Remove nonce index if exists
        if let Some(nonce) = self.extract_nonce(&tx.network_data) {
            let nonce_key = self.relayer_nonce_key(&tx.relayer_id, nonce);
            pipe.del(&nonce_key);
            debug!(tx_id = %tx.id, nonce = %nonce, "removing nonce index for transaction");
        }

        // Remove from per-relayer sorted set by created_at
        let relayer_sorted_key = self.relayer_tx_by_created_at_key(&tx.relayer_id);
        pipe.zrem(&relayer_sorted_key, &tx.id);
        debug!(tx_id = %tx.id, "removing transaction from sorted set by created_at");

        // Remove reverse lookup
        let reverse_key = self.tx_to_relayer_key(&tx.id);
        pipe.del(&reverse_key);

        pipe.exec_async(&mut conn).await.map_err(|e| {
            error!(tx_id = %tx.id, error = %e, "index removal failed for transaction");
            self.map_redis_error(e, &format!("remove_indexes_for_tx_{}", tx.id))
        })?;

        debug!(tx_id = %tx.id, "successfully removed all indexes for transaction");
        Ok(())
    }
}

impl fmt::Debug for RedisTransactionRepository {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RedisTransactionRepository")
            .field("connections", &"<RedisConnections>")
            .field("key_prefix", &self.key_prefix)
            .finish()
    }
}

#[async_trait]
impl Repository<TransactionRepoModel, String> for RedisTransactionRepository {
    async fn create(
        &self,
        entity: TransactionRepoModel,
    ) -> Result<TransactionRepoModel, RepositoryError> {
        if entity.id.is_empty() {
            return Err(RepositoryError::InvalidData(
                "Transaction ID cannot be empty".to_string(),
            ));
        }

        let key = self.tx_key(&entity.relayer_id, &entity.id);
        let reverse_key = self.tx_to_relayer_key(&entity.id);
        let mut conn = self
            .get_connection(self.connections.primary(), "create")
            .await?;

        debug!(tx_id = %entity.id, "creating transaction");

        let value = self.serialize_entity(&entity, |t| &t.id, "transaction")?;

        // Check if transaction already exists by checking reverse lookup
        let existing: Option<String> = conn
            .get(&reverse_key)
            .await
            .map_err(|e| self.map_redis_error(e, "create_transaction_check"))?;

        if existing.is_some() {
            return Err(RepositoryError::ConstraintViolation(format!(
                "Transaction with ID {} already exists",
                entity.id
            )));
        }

        // Use atomic pipeline for consistency
        let mut pipe = redis::pipe();
        pipe.atomic();
        pipe.set(&key, &value);
        pipe.set(&reverse_key, &entity.relayer_id);

        pipe.exec_async(&mut conn)
            .await
            .map_err(|e| self.map_redis_error(e, "create_transaction"))?;

        // Update indexes separately to handle partial failures gracefully
        if let Err(e) = self.update_indexes(&entity, None).await {
            error!(tx_id = %entity.id, error = %e, "failed to update indexes for new transaction");
            return Err(e);
        }

        // Track transaction creation metric
        let network_type = format!("{:?}", entity.network_type).to_lowercase();
        let relayer_id = entity.relayer_id.as_str();
        TRANSACTIONS_CREATED
            .with_label_values(&[relayer_id, &network_type])
            .inc();

        // Track initial status distribution (Pending)
        let status = &entity.status;
        let status_str = format!("{status:?}").to_lowercase();
        TRANSACTIONS_BY_STATUS
            .with_label_values(&[relayer_id, &network_type, &status_str])
            .inc();

        debug!(tx_id = %entity.id, "successfully created transaction");
        Ok(entity)
    }

    async fn get_by_id(&self, id: String) -> Result<TransactionRepoModel, RepositoryError> {
        if id.is_empty() {
            return Err(RepositoryError::InvalidData(
                "Transaction ID cannot be empty".to_string(),
            ));
        }

        let mut conn = self
            .get_connection(self.connections.reader(), "get_by_id")
            .await?;

        debug!(tx_id = %id, "fetching transaction");

        let reverse_key = self.tx_to_relayer_key(&id);
        let relayer_id: Option<String> = conn
            .get(&reverse_key)
            .await
            .map_err(|e| self.map_redis_error(e, "get_transaction_reverse_lookup"))?;

        let relayer_id = match relayer_id {
            Some(relayer_id) => relayer_id,
            None => {
                debug!(tx_id = %id, "transaction not found (no reverse lookup)");
                return Err(RepositoryError::NotFound(format!(
                    "Transaction with ID {id} not found"
                )));
            }
        };

        let key = self.tx_key(&relayer_id, &id);
        let value: Option<String> = conn
            .get(&key)
            .await
            .map_err(|e| self.map_redis_error(e, "get_transaction_by_id"))?;

        match value {
            Some(json) => {
                let tx =
                    self.deserialize_entity::<TransactionRepoModel>(&json, &id, "transaction")?;
                debug!(tx_id = %id, "successfully fetched transaction");
                Ok(tx)
            }
            None => {
                debug!(tx_id = %id, "transaction not found");
                Err(RepositoryError::NotFound(format!(
                    "Transaction with ID {id} not found"
                )))
            }
        }
    }

    // Unoptimized implementation of list_all. Rarely used. find_by_relayer_id is preferred.
    async fn list_all(&self) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
        let mut conn = self
            .get_connection(self.connections.reader(), "list_all")
            .await?;

        debug!("fetching all transactions sorted by created_at (newest first)");

        // Get all relayer IDs
        let relayer_list_key = self.relayer_list_key();
        let relayer_ids: Vec<String> = conn
            .smembers(&relayer_list_key)
            .await
            .map_err(|e| self.map_redis_error(e, "list_all_relayer_ids"))?;

        debug!(count = %relayer_ids.len(), "found relayers");

        // Collect all transaction IDs from all relayers using their sorted sets
        let mut all_tx_ids = Vec::new();
        for relayer_id in relayer_ids {
            let relayer_sorted_key = self.relayer_tx_by_created_at_key(&relayer_id);
            let tx_ids: Vec<String> = redis::cmd("ZRANGE")
                .arg(&relayer_sorted_key)
                .arg(0)
                .arg(-1)
                .arg("REV")
                .query_async(&mut conn)
                .await
                .map_err(|e| self.map_redis_error(e, "list_all_relayer_sorted"))?;

            all_tx_ids.extend(tx_ids);
        }

        // Release connection before nested call to avoid connection doubling
        drop(conn);

        // Batch fetch all transactions at once
        let batch_result = self.get_transactions_by_ids(&all_tx_ids).await?;
        let mut all_transactions = batch_result.results;

        // Sort all transactions by created_at (newest first)
        all_transactions.sort_by(|a, b| b.created_at.cmp(&a.created_at));

        debug!(count = %all_transactions.len(), "found transactions");
        Ok(all_transactions)
    }

    // Unoptimized implementation of list_paginated. Rarely used. find_by_relayer_id is preferred.
    async fn list_paginated(
        &self,
        query: PaginationQuery,
    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
        if query.per_page == 0 {
            return Err(RepositoryError::InvalidData(
                "per_page must be greater than 0".to_string(),
            ));
        }

        let mut conn = self
            .get_connection(self.connections.reader(), "list_paginated")
            .await?;

        debug!(page = %query.page, per_page = %query.per_page, "fetching paginated transactions sorted by created_at (newest first)");

        // Get all relayer IDs
        let relayer_list_key = self.relayer_list_key();
        let relayer_ids: Vec<String> = conn
            .smembers(&relayer_list_key)
            .await
            .map_err(|e| self.map_redis_error(e, "list_paginated_relayer_ids"))?;

        // Collect all transaction IDs from all relayers using their sorted sets
        let mut all_tx_ids = Vec::new();
        for relayer_id in relayer_ids {
            let relayer_sorted_key = self.relayer_tx_by_created_at_key(&relayer_id);
            let tx_ids: Vec<String> = redis::cmd("ZRANGE")
                .arg(&relayer_sorted_key)
                .arg(0)
                .arg(-1)
                .arg("REV")
                .query_async(&mut conn)
                .await
                .map_err(|e| self.map_redis_error(e, "list_paginated_relayer_sorted"))?;

            all_tx_ids.extend(tx_ids);
        }

        // Release connection before nested call to avoid connection doubling
        drop(conn);

        // Batch fetch all transactions at once
        let batch_result = self.get_transactions_by_ids(&all_tx_ids).await?;
        let mut all_transactions = batch_result.results;

        // Sort all transactions by created_at (newest first)
        all_transactions.sort_by(|a, b| b.created_at.cmp(&a.created_at));

        let total = all_transactions.len() as u64;
        let start = ((query.page.saturating_sub(1)) * query.per_page) as usize;
        let end = (start + query.per_page as usize).min(all_transactions.len());

        if start >= all_transactions.len() {
            debug!(page = %query.page, total = %total, "page is beyond available data");
            return Ok(PaginatedResult {
                items: vec![],
                total,
                page: query.page,
                per_page: query.per_page,
            });
        }

        let items = all_transactions[start..end].to_vec();

        debug!(count = %items.len(), page = %query.page, "successfully fetched transactions for page");

        Ok(PaginatedResult {
            items,
            total,
            page: query.page,
            per_page: query.per_page,
        })
    }
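    // Example: with 60 transactions, page = 2 and per_page = 25 slice indices 25..50,
    // page = 3 slices 50..60, and page = 4 starts at 75 >= 60, so it returns an empty
    // page with total = 60.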

    async fn update(
        &self,
        id: String,
        entity: TransactionRepoModel,
    ) -> Result<TransactionRepoModel, RepositoryError> {
        if id.is_empty() {
            return Err(RepositoryError::InvalidData(
                "Transaction ID cannot be empty".to_string(),
            ));
        }

        debug!(tx_id = %id, "updating transaction");

        // Get the old transaction for index cleanup
        let old_tx = self.get_by_id(id.clone()).await?;

        let key = self.tx_key(&entity.relayer_id, &id);
        let mut conn = self
            .get_connection(self.connections.primary(), "update")
            .await?;

        let value = self.serialize_entity(&entity, |t| &t.id, "transaction")?;

        // Update transaction
        let _: () = conn
            .set(&key, value)
            .await
            .map_err(|e| self.map_redis_error(e, "update_transaction"))?;

        // Update indexes
        self.update_indexes(&entity, Some(&old_tx)).await?;

        debug!(tx_id = %id, "successfully updated transaction");
        Ok(entity)
    }

    async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError> {
        if id.is_empty() {
            return Err(RepositoryError::InvalidData(
                "Transaction ID cannot be empty".to_string(),
            ));
        }

        debug!(tx_id = %id, "deleting transaction");

        // Get transaction first for index cleanup
        let tx = self.get_by_id(id.clone()).await?;

        let key = self.tx_key(&tx.relayer_id, &id);
        let reverse_key = self.tx_to_relayer_key(&id);
        let mut conn = self
            .get_connection(self.connections.primary(), "delete_by_id")
            .await?;

        let mut pipe = redis::pipe();
        pipe.atomic();
        pipe.del(&key);
        pipe.del(&reverse_key);

        pipe.exec_async(&mut conn)
            .await
            .map_err(|e| self.map_redis_error(e, "delete_transaction"))?;

        // Remove indexes (log errors but don't fail the delete)
        if let Err(e) = self.remove_all_indexes(&tx).await {
            error!(tx_id = %id, error = %e, "failed to remove indexes for deleted transaction");
        }

        debug!(tx_id = %id, "successfully deleted transaction");
        Ok(())
    }

    // Unoptimized implementation of count. Rarely used. find_by_relayer_id is preferred.
    async fn count(&self) -> Result<usize, RepositoryError> {
        let mut conn = self
            .get_connection(self.connections.reader(), "count")
            .await?;

        debug!("counting transactions");

        // Get all relayer IDs and sum their sorted set counts
        let relayer_list_key = self.relayer_list_key();
        let relayer_ids: Vec<String> = conn
            .smembers(&relayer_list_key)
            .await
            .map_err(|e| self.map_redis_error(e, "count_relayer_ids"))?;

        let mut total_count = 0usize;
        for relayer_id in relayer_ids {
            let relayer_sorted_key = self.relayer_tx_by_created_at_key(&relayer_id);
            let count: usize = conn
                .zcard(&relayer_sorted_key)
                .await
                .map_err(|e| self.map_redis_error(e, "count_relayer_transactions"))?;
            total_count += count;
        }

        debug!(count = %total_count, "transaction count");
        Ok(total_count)
    }

    async fn has_entries(&self) -> Result<bool, RepositoryError> {
        let mut conn = self
            .get_connection(self.connections.reader(), "has_entries")
            .await?;
        let relayer_list_key = self.relayer_list_key();

        debug!("checking if transaction entries exist");

        let exists: bool = conn
            .exists(&relayer_list_key)
            .await
            .map_err(|e| self.map_redis_error(e, "has_entries_check"))?;

        debug!(exists = %exists, "transaction entries exist");
        Ok(exists)
    }

    async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
        let mut conn = self
            .get_connection(self.connections.primary(), "drop_all_entries")
            .await?;
        let relayer_list_key = self.relayer_list_key();

        debug!("dropping all transaction entries");

        // Get all relayer IDs first
        let relayer_ids: Vec<String> = conn
            .smembers(&relayer_list_key)
            .await
            .map_err(|e| self.map_redis_error(e, "drop_all_entries_get_relayer_ids"))?;

        if relayer_ids.is_empty() {
            debug!("no transaction entries to drop");
            return Ok(());
        }

        // Use pipeline for atomic operations
        let mut pipe = redis::pipe();
        pipe.atomic();

        // Delete all transactions and their indexes for each relayer
        for relayer_id in &relayer_ids {
            // Get all transaction IDs for this relayer
            let pattern = format!(
                "{}:{}:{}:{}:*",
                self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
            );
            let mut cursor = 0;
            let mut tx_ids = Vec::new();

            loop {
                let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
                    .cursor_arg(cursor)
                    .arg("MATCH")
                    .arg(&pattern)
                    .query_async(&mut conn)
                    .await
                    .map_err(|e| self.map_redis_error(e, "drop_all_entries_scan"))?;

                // Extract transaction IDs from keys and delete keys
                for key in keys {
                    pipe.del(&key);
                    if let Some(tx_id) = key.split(':').next_back() {
                        tx_ids.push(tx_id.to_string());
                    }
                }

                cursor = next_cursor;
                if cursor == 0 {
                    break;
                }
            }

            // Delete reverse lookup keys and indexes
            for tx_id in tx_ids {
                let reverse_key = self.tx_to_relayer_key(&tx_id);
                pipe.del(&reverse_key);

                // Delete status indexes (we can't know the specific status, so we'll clean up all possible ones)
                // This ensures complete cleanup even if there are orphaned entries
                for status in &[
                    TransactionStatus::Canceled,
                    TransactionStatus::Pending,
                    TransactionStatus::Sent,
                    TransactionStatus::Submitted,
                    TransactionStatus::Mined,
                    TransactionStatus::Confirmed,
                    TransactionStatus::Failed,
                    TransactionStatus::Expired,
                ] {
                    // Remove from sorted status set (new format)
                    let status_sorted_key = self.relayer_status_sorted_key(relayer_id, status);
                    pipe.zrem(&status_sorted_key, &tx_id);

                    // Remove from legacy status set (for migration cleanup)
                    let status_key = self.relayer_status_key(relayer_id, status);
                    pipe.srem(&status_key, &tx_id);
                }
            }

            // Delete the relayer's sorted set by created_at
            let relayer_sorted_key = self.relayer_tx_by_created_at_key(relayer_id);
            pipe.del(&relayer_sorted_key);
        }

        // Delete the relayer list key
        pipe.del(&relayer_list_key);

        pipe.exec_async(&mut conn)
            .await
            .map_err(|e| self.map_redis_error(e, "drop_all_entries_pipeline"))?;

        debug!(count = %relayer_ids.len(), "dropped all transaction entries for relayers");
        Ok(())
    }
}

#[async_trait]
impl TransactionRepository for RedisTransactionRepository {
    async fn find_by_relayer_id(
        &self,
        relayer_id: &str,
        query: PaginationQuery,
    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
        let mut conn = self
            .get_connection(self.connections.reader(), "find_by_relayer_id")
            .await?;

        debug!(relayer_id = %relayer_id, page = %query.page, per_page = %query.per_page, "fetching transactions for relayer sorted by created_at (newest first)");

        let relayer_sorted_key = self.relayer_tx_by_created_at_key(relayer_id);

        // Get total count from relayer's sorted set
        let sorted_set_count: u64 = conn
            .zcard(&relayer_sorted_key)
            .await
            .map_err(|e| self.map_redis_error(e, "find_by_relayer_id_count"))?;

        // If sorted set is empty, return empty result immediately
        // All new transactions are automatically added to the sorted set
        if sorted_set_count == 0 {
            debug!(relayer_id = %relayer_id, "no transactions found for relayer (sorted set is empty)");
            return Ok(PaginatedResult {
                items: vec![],
                total: 0,
                page: query.page,
                per_page: query.per_page,
            });
        }

        let total = sorted_set_count;

        // Calculate pagination range (0-indexed for Redis ZRANGE with REV)
        let start = ((query.page.saturating_sub(1)) * query.per_page) as isize;
        let end = start + query.per_page as isize - 1;
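        // Example: page = 3 and per_page = 10 give start = 20 and end = 29, i.e.
        // ZRANGE <key> 20 29 REV returns the 21st through 30th newest transaction IDs.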

        if start as u64 >= total {
            debug!(relayer_id = %relayer_id, page = %query.page, total = %total, "page is beyond available data");
            return Ok(PaginatedResult {
                items: vec![],
                total,
                page: query.page,
                per_page: query.per_page,
            });
        }

        // Get page of transaction IDs from sorted set (newest first using ZRANGE with REV)
        let page_ids: Vec<String> = redis::cmd("ZRANGE")
            .arg(&relayer_sorted_key)
            .arg(start)
            .arg(end)
            .arg("REV")
            .query_async(&mut conn)
            .await
            .map_err(|e| self.map_redis_error(e, "find_by_relayer_id_sorted"))?;

        // Release connection before nested call to avoid connection doubling
        drop(conn);

        let items = self.get_transactions_by_ids(&page_ids).await?;

        debug!(relayer_id = %relayer_id, count = %items.results.len(), page = %query.page, "successfully fetched transactions for relayer");

        Ok(PaginatedResult {
            items: items.results,
            total,
            page: query.page,
            per_page: query.per_page,
        })
    }

    // Unoptimized implementation of find_by_status. Rarely used. find_by_status_paginated is preferred.
    async fn find_by_status(
        &self,
        relayer_id: &str,
        statuses: &[TransactionStatus],
    ) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
        // Ensure all status sorted sets are migrated first (releases connection after each)
        for status in statuses {
            self.ensure_status_sorted_set(relayer_id, status).await?;
        }

        // Now get a connection and collect all IDs
        let mut conn = self
            .get_connection(self.connections.reader(), "find_by_status")
            .await?;

        let mut all_ids: Vec<String> = Vec::new();
        for status in statuses {
            // Get IDs from sorted set (already ordered by created_at)
            let sorted_key = self.relayer_status_sorted_key(relayer_id, status);
            let ids: Vec<String> = redis::cmd("ZRANGE")
                .arg(&sorted_key)
                .arg(0)
                .arg(-1)
                .arg("REV") // Newest first
                .query_async(&mut conn)
                .await
                .map_err(|e| self.map_redis_error(e, "find_by_status"))?;

            all_ids.extend(ids);
        }

        // Release connection before nested call to avoid connection doubling
        drop(conn);

        if all_ids.is_empty() {
            return Ok(vec![]);
        }

        // Remove duplicates (can happen if a transaction is in multiple status sets due to partial failures)
        all_ids.sort();
        all_ids.dedup();

        // Fetch all transactions and sort by created_at (newest first)
        let mut transactions = self.get_transactions_by_ids(&all_ids).await?;

        // Sort by created_at descending (newest first)
        transactions
            .results
            .sort_by(|a, b| b.created_at.cmp(&a.created_at));

        Ok(transactions.results)
    }

    async fn find_by_status_paginated(
        &self,
        relayer_id: &str,
        statuses: &[TransactionStatus],
        query: PaginationQuery,
        oldest_first: bool,
    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
        // Ensure all status sorted sets are migrated first (releases connection after each)
        for status in statuses {
            self.ensure_status_sorted_set(relayer_id, status).await?;
        }

        let mut conn = self
            .get_connection(self.connections.reader(), "find_by_status_paginated")
            .await?;

        // For single status, we can paginate directly from the sorted set
        if statuses.len() == 1 {
            let sorted_key = self.relayer_status_sorted_key(relayer_id, &statuses[0]);

            // Get total count
            let total: u64 = conn
                .zcard(&sorted_key)
                .await
                .map_err(|e| self.map_redis_error(e, "find_by_status_paginated_count"))?;

            if total == 0 {
                return Ok(PaginatedResult {
                    items: vec![],
                    total: 0,
                    page: query.page,
                    per_page: query.per_page,
                });
            }

            // Calculate pagination bounds
            let start = ((query.page.saturating_sub(1)) * query.per_page) as isize;
            let end = start + query.per_page as isize - 1;

            // Get page of IDs directly from sorted set
            // REV = newest first (descending), no REV = oldest first (ascending)
            let mut cmd = redis::cmd("ZRANGE");
            cmd.arg(&sorted_key).arg(start).arg(end);
            if !oldest_first {
                cmd.arg("REV");
            }
            let page_ids: Vec<String> = cmd
                .query_async(&mut conn)
                .await
                .map_err(|e| self.map_redis_error(e, "find_by_status_paginated"))?;

            // Release connection before nested call to avoid connection doubling
            drop(conn);

            let transactions = self.get_transactions_by_ids(&page_ids).await?;

            debug!(
                relayer_id = %relayer_id,
                status = %statuses[0],
                total = %total,
                page = %query.page,
                page_size = %transactions.results.len(),
                "fetched paginated transactions by single status"
            );

            return Ok(PaginatedResult {
                items: transactions.results,
                total,
                page: query.page,
                per_page: query.per_page,
            });
        }

        // For multiple statuses, collect all IDs and merge
        let mut all_ids: Vec<(String, f64)> = Vec::new();
        for status in statuses {
            let sorted_key = self.relayer_status_sorted_key(relayer_id, status);

            // Get IDs with scores for proper sorting
            let ids_with_scores: Vec<(String, f64)> = redis::cmd("ZRANGE")
                .arg(&sorted_key)
                .arg(0)
                .arg(-1)
                .arg("WITHSCORES")
                .query_async(&mut conn)
                .await
                .map_err(|e| self.map_redis_error(e, "find_by_status_paginated_multi"))?;

            all_ids.extend(ids_with_scores);
        }

        // Release connection before nested call to avoid connection doubling
        drop(conn);

        // Remove duplicates (keep highest/lowest score based on sort order)
        let mut id_map: std::collections::HashMap<String, f64> = std::collections::HashMap::new();
        for (id, score) in all_ids {
            id_map
                .entry(id)
                .and_modify(|s| {
                    // For oldest_first, keep the lowest score; otherwise keep highest
                    if oldest_first {
                        if score < *s {
                            *s = score
                        }
                    } else if score > *s {
                        *s = score
                    }
                })
                .or_insert(score);
        }

        // Sort by score: descending for newest first, ascending for oldest first
        let mut sorted_ids: Vec<(String, f64)> = id_map.into_iter().collect();
        if oldest_first {
            sorted_ids.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
        } else {
            sorted_ids.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
        }
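        // Example (hypothetical IDs and scores): merging Pending {t1: 100.0, t2: 300.0}
        // with Submitted {t3: 200.0} yields t2, t3, t1 when newest first and t1, t3, t2
        // when oldest_first; pagination below is then applied to that merged ordering.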

        let total = sorted_ids.len() as u64;

        if total == 0 {
            return Ok(PaginatedResult {
                items: vec![],
                total: 0,
                page: query.page,
                per_page: query.per_page,
            });
        }

        // Apply pagination
        let start = ((query.page.saturating_sub(1)) * query.per_page) as usize;
        let page_ids: Vec<String> = sorted_ids
            .into_iter()
            .skip(start)
            .take(query.per_page as usize)
            .map(|(id, _)| id)
            .collect();

        // Fetch only the transactions for this page
        let transactions = self.get_transactions_by_ids(&page_ids).await?;

        debug!(
            relayer_id = %relayer_id,
            total = %total,
            page = %query.page,
            page_size = %transactions.results.len(),
            "fetched paginated transactions by status"
        );

        Ok(PaginatedResult {
            items: transactions.results,
            total,
            page: query.page,
            per_page: query.per_page,
        })
    }

    async fn find_by_nonce(
        &self,
        relayer_id: &str,
        nonce: u64,
    ) -> Result<Option<TransactionRepoModel>, RepositoryError> {
        let mut conn = self
            .get_connection(self.connections.reader(), "find_by_nonce")
            .await?;
        let nonce_key = self.relayer_nonce_key(relayer_id, nonce);

        // Get transaction ID with this nonce for this relayer (should be single value)
        let tx_id: Option<String> = conn
            .get(nonce_key)
            .await
            .map_err(|e| self.map_redis_error(e, "find_by_nonce"))?;

        match tx_id {
            Some(tx_id) => {
                match self.get_by_id(tx_id.clone()).await {
                    Ok(tx) => Ok(Some(tx)),
                    Err(RepositoryError::NotFound(_)) => {
                        // Transaction was deleted but index wasn't cleaned up
                        warn!(relayer_id = %relayer_id, nonce = %nonce, "stale nonce index found for relayer");
                        Ok(None)
                    }
                    Err(e) => Err(e),
                }
            }
            None => Ok(None),
        }
    }

    async fn update_status(
        &self,
        tx_id: String,
        status: TransactionStatus,
    ) -> Result<TransactionRepoModel, RepositoryError> {
        let update = TransactionUpdateRequest {
            status: Some(status),
            ..Default::default()
        };
        self.partial_update(tx_id, update).await
    }

    async fn partial_update(
        &self,
        tx_id: String,
        update: TransactionUpdateRequest,
    ) -> Result<TransactionRepoModel, RepositoryError> {
        const MAX_RETRIES: u32 = 3;
        const BACKOFF_MS: u64 = 100;

        // Optimistic CAS: only apply update if the current stored value still matches the
        // expected pre-update value. This avoids duplicate status metric updates on races.
        let mut original_tx = self.get_by_id(tx_id.clone()).await?;
        let mut updated_tx = original_tx.clone();
        updated_tx.apply_partial_update(update.clone());

        let key = self.tx_key(&updated_tx.relayer_id, &tx_id);
        let mut original_value = self.serialize_entity(&original_tx, |t| &t.id, "transaction")?;
        let mut updated_value = self.serialize_entity(&updated_tx, |t| &t.id, "transaction")?;
        let mut data_updated = false;

        let mut last_error = None;

        for attempt in 0..MAX_RETRIES {
            let mut conn = match self
                .get_connection(self.connections.primary(), "partial_update")
                .await
            {
                Ok(conn) => conn,
                Err(e) => {
                    last_error = Some(e);
                    if attempt < MAX_RETRIES - 1 {
                        tokio::time::sleep(tokio::time::Duration::from_millis(BACKOFF_MS)).await;
                        continue;
                    }
                    return Err(last_error.unwrap());
                }
            };

            if !data_updated {
                let cas_script = Script::new(
                    r#"
                    local current = redis.call('GET', KEYS[1])
                    if not current then
                        return -1
                    end
                    if current == ARGV[1] then
                        redis.call('SET', KEYS[1], ARGV[2])
                        return 1
                    end
                    return 0
                    "#,
                );
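                // The script returns -1 when the key no longer exists (mapped to NotFound
                // below), 0 when the stored JSON differs from the expected pre-update value
                // (another writer won the race, so we re-read and rebase), and 1 when the
                // swap was applied.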
1343
1344                let cas_result: i32 = match cas_script
1345                    .key(&key)
1346                    .arg(&original_value)
1347                    .arg(&updated_value)
1348                    .invoke_async(&mut conn)
1349                    .await
1350                {
1351                    Ok(result) => result,
1352                    Err(e) => {
1353                        if attempt < MAX_RETRIES - 1 {
1354                            warn!(tx_id = %tx_id, attempt = %attempt, error = %e, "failed CAS transaction update, retrying");
1355                            last_error = Some(self.map_redis_error(e, "partial_update_cas"));
1356                            tokio::time::sleep(tokio::time::Duration::from_millis(BACKOFF_MS))
1357                                .await;
1358                            continue;
1359                        }
1360                        return Err(self.map_redis_error(e, "partial_update_cas"));
1361                    }
1362                };
1363
1364                if cas_result == -1 {
1365                    return Err(RepositoryError::NotFound(format!(
1366                        "Transaction with ID {tx_id} not found"
1367                    )));
1368                }
1369
1370                if cas_result == 0 {
1371                    if attempt < MAX_RETRIES - 1 {
1372                        warn!(tx_id = %tx_id, attempt = %attempt, "concurrent transaction update detected, rebasing retry");
1373                        original_tx = self.get_by_id(tx_id.clone()).await?;
1374                        updated_tx = original_tx.clone();
1375                        updated_tx.apply_partial_update(update.clone());
1376                        original_value =
1377                            self.serialize_entity(&original_tx, |t| &t.id, "transaction")?;
1378                        updated_value =
1379                            self.serialize_entity(&updated_tx, |t| &t.id, "transaction")?;
1380                        tokio::time::sleep(tokio::time::Duration::from_millis(BACKOFF_MS)).await;
1381                        continue;
1382                    }
1383                    return Err(RepositoryError::TransactionFailure(format!(
1384                        "Concurrent update conflict for transaction {tx_id}"
1385                    )));
1386                }
1387
1388                data_updated = true;
1389            }
1390
1391            // Update the indexes for the new state, passing the original pre-update
1392            // state so stale index entries are removed even on retry attempts
1393            match self.update_indexes(&updated_tx, Some(&original_tx)).await {
1394                Ok(_) => {
1395                    debug!(tx_id = %tx_id, attempt = %attempt, "successfully updated transaction");
1396
1397                    // Track metrics for transaction state changes
1398                    if let Some(new_status) = &update.status {
1399                        let network_type = format!("{:?}", updated_tx.network_type).to_lowercase();
1400                        let relayer_id = updated_tx.relayer_id.as_str();
1401
1402                        // Track submission (when status changes to Submitted)
1403                        if original_tx.status != TransactionStatus::Submitted
1404                            && *new_status == TransactionStatus::Submitted
1405                        {
1406                            TRANSACTIONS_SUBMITTED
1407                                .with_label_values(&[relayer_id, &network_type])
1408                                .inc();
1409
1410                            // Track processing time: creation to submission
1411                            if let Ok(created_time) =
1412                                chrono::DateTime::parse_from_rfc3339(&updated_tx.created_at)
1413                            {
1414                                let processing_seconds =
1415                                    (Utc::now() - created_time.with_timezone(&Utc)).num_seconds()
1416                                        as f64;
1417                                TRANSACTION_PROCESSING_TIME
1418                                    .with_label_values(&[
1419                                        relayer_id,
1420                                        &network_type,
1421                                        "creation_to_submission",
1422                                    ])
1423                                    .observe(processing_seconds);
1424                            }
1425                        }
1426
1427                        // Track status distribution (update gauge when status changes)
1428                        if original_tx.status != *new_status {
1429                            // Decrement old status and clamp to zero to avoid negative gauges.
1430                            let old_status = &original_tx.status;
1431                            let old_status_str = format!("{old_status:?}").to_lowercase();
1432                            let old_status_gauge = TRANSACTIONS_BY_STATUS.with_label_values(&[
1433                                relayer_id,
1434                                &network_type,
1435                                &old_status_str,
1436                            ]);
1437                            let clamped_value = (old_status_gauge.get() - 1.0).max(0.0);
1438                            old_status_gauge.set(clamped_value);
1439
1440                            // Increment new status
1441                            let new_status_str = format!("{new_status:?}").to_lowercase();
1442                            TRANSACTIONS_BY_STATUS
1443                                .with_label_values(&[relayer_id, &network_type, &new_status_str])
1444                                .inc();
1445                        }
1446
1447                        // Track metrics for final transaction states
1448                        // Only track when status changes from non-final to final state
1449                        let was_final = is_final_state(&original_tx.status);
1450                        let is_final = is_final_state(new_status);
1451
1452                        if !was_final && is_final {
1453                            match new_status {
1454                                TransactionStatus::Confirmed => {
1455                                    TRANSACTIONS_SUCCESS
1456                                        .with_label_values(&[relayer_id, &network_type])
1457                                        .inc();
1458
1459                                    // Track processing time: submission to confirmation
1460                                    if let (Some(sent_at_str), Some(confirmed_at_str)) =
1461                                        (&updated_tx.sent_at, &updated_tx.confirmed_at)
1462                                    {
1463                                        if let (Ok(sent_time), Ok(confirmed_time)) = (
1464                                            chrono::DateTime::parse_from_rfc3339(sent_at_str),
1465                                            chrono::DateTime::parse_from_rfc3339(confirmed_at_str),
1466                                        ) {
1467                                            let processing_seconds = (confirmed_time
1468                                                .with_timezone(&Utc)
1469                                                - sent_time.with_timezone(&Utc))
1470                                            .num_seconds()
1471                                                as f64;
1472                                            TRANSACTION_PROCESSING_TIME
1473                                                .with_label_values(&[
1474                                                    relayer_id,
1475                                                    &network_type,
1476                                                    "submission_to_confirmation",
1477                                                ])
1478                                                .observe(processing_seconds);
1479                                        }
1480                                    }
1481
1482                                    // Track processing time: creation to confirmation
1483                                    if let Ok(created_time) =
1484                                        chrono::DateTime::parse_from_rfc3339(&updated_tx.created_at)
1485                                    {
1486                                        if let Some(confirmed_at_str) = &updated_tx.confirmed_at {
1487                                            if let Ok(confirmed_time) =
1488                                                chrono::DateTime::parse_from_rfc3339(
1489                                                    confirmed_at_str,
1490                                                )
1491                                            {
1492                                                let processing_seconds = (confirmed_time
1493                                                    .with_timezone(&Utc)
1494                                                    - created_time.with_timezone(&Utc))
1495                                                .num_seconds()
1496                                                    as f64;
1497                                                TRANSACTION_PROCESSING_TIME
1498                                                    .with_label_values(&[
1499                                                        relayer_id,
1500                                                        &network_type,
1501                                                        "creation_to_confirmation",
1502                                                    ])
1503                                                    .observe(processing_seconds);
1504                                            }
1505                                        }
1506                                    }
1507                                }
1508                                TransactionStatus::Failed => {
1509                                    // Parse status_reason to determine failure type
1510                                    let failure_reason = updated_tx
1511                                        .status_reason
1512                                        .as_deref()
1513                                        .map(|reason| {
1514                                            if reason.starts_with("Submission failed:") {
1515                                                "submission_failed"
1516                                            } else if reason.starts_with("Preparation failed:") {
1517                                                "preparation_failed"
1518                                            } else {
1519                                                "failed"
1520                                            }
1521                                        })
1522                                        .unwrap_or("failed");
1523                                    TRANSACTIONS_FAILED
1524                                        .with_label_values(&[
1525                                            relayer_id,
1526                                            &network_type,
1527                                            failure_reason,
1528                                        ])
1529                                        .inc();
1530                                }
1531                                TransactionStatus::Expired => {
1532                                    TRANSACTIONS_FAILED
1533                                        .with_label_values(&[relayer_id, &network_type, "expired"])
1534                                        .inc();
1535                                }
1536                                TransactionStatus::Canceled => {
1537                                    TRANSACTIONS_FAILED
1538                                        .with_label_values(&[relayer_id, &network_type, "canceled"])
1539                                        .inc();
1540                                }
1541                                _ => {
1542                                    // Other final states (shouldn't happen, but handle gracefully)
1543                                }
1544                            }
1545                        }
1546                    }
1547                    return Ok(updated_tx);
1548                }
1549                Err(e) if attempt < MAX_RETRIES - 1 => {
1550                    warn!(tx_id = %tx_id, attempt = %attempt, error = %e, "failed to update indexes, retrying");
1551                    last_error = Some(e);
1552                    tokio::time::sleep(tokio::time::Duration::from_millis(BACKOFF_MS)).await;
1553                    continue;
1554                }
1555                Err(e) => return Err(e),
1556            }
1557        }
1558
1559        Err(last_error.unwrap_or_else(|| {
1560            RepositoryError::UnexpectedError("partial_update exhausted retries".to_string())
1561        }))
1562    }
1563
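    /// Convenience wrapper that applies only `network_data` via `partial_update`,
    /// inheriting its CAS/retry semantics.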
1564    async fn update_network_data(
1565        &self,
1566        tx_id: String,
1567        network_data: NetworkTransactionData,
1568    ) -> Result<TransactionRepoModel, RepositoryError> {
1569        let update = TransactionUpdateRequest {
1570            network_data: Some(network_data),
1571            ..Default::default()
1572        };
1573        self.partial_update(tx_id, update).await
1574    }
1575
1576    async fn set_sent_at(
1577        &self,
1578        tx_id: String,
1579        sent_at: String,
1580    ) -> Result<TransactionRepoModel, RepositoryError> {
1581        let update = TransactionUpdateRequest {
1582            sent_at: Some(sent_at),
1583            ..Default::default()
1584        };
1585        self.partial_update(tx_id, update).await
1586    }
1587
1588    async fn set_confirmed_at(
1589        &self,
1590        tx_id: String,
1591        confirmed_at: String,
1592    ) -> Result<TransactionRepoModel, RepositoryError> {
1593        let update = TransactionUpdateRequest {
1594            confirmed_at: Some(confirmed_at),
1595            ..Default::default()
1596        };
1597        self.partial_update(tx_id, update).await
1598    }
1599
1600    /// Count transactions by status using Redis ZCARD (O(1) per sorted set).
1601    /// Much more efficient than find_by_status when you only need the count.
1602    /// Triggers migration from legacy SETs if needed.
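    ///
    /// # Example
    ///
    /// A minimal usage sketch (assumes an already constructed
    /// `repo: RedisTransactionRepository`; not compiled as a doctest):
    ///
    /// ```rust,ignore
    /// let in_flight = repo
    ///     .count_by_status(
    ///         "relayer-1",
    ///         &[TransactionStatus::Pending, TransactionStatus::Submitted],
    ///     )
    ///     .await?;
    /// println!("in-flight transactions: {in_flight}");
    /// ```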
1603    async fn count_by_status(
1604        &self,
1605        relayer_id: &str,
1606        statuses: &[TransactionStatus],
1607    ) -> Result<u64, RepositoryError> {
1608        let mut conn = self
1609            .get_connection(self.connections.reader(), "count_by_status")
1610            .await?;
1611        let mut total_count: u64 = 0;
1612
1613        for status in statuses {
1614            // Ensure sorted set is migrated
1615            self.ensure_status_sorted_set(relayer_id, status).await?;
1616
1617            let sorted_key = self.relayer_status_sorted_key(relayer_id, status);
1618            let count: u64 = conn
1619                .zcard(&sorted_key)
1620                .await
1621                .map_err(|e| self.map_redis_error(e, "count_by_status"))?;
1622            total_count += count;
1623        }
1624
1625        debug!(relayer_id = %relayer_id, count = %total_count, "counted transactions by status");
1626        Ok(total_count)
1627    }
1628
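    /// Batch-deletes transactions by ID. Each transaction is fetched first so its
    /// relayer, status, and nonce indexes can be cleaned up; IDs that cannot be
    /// fetched are reported in the result's `failed` list.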
1629    async fn delete_by_ids(&self, ids: Vec<String>) -> Result<BatchDeleteResult, RepositoryError> {
1630        if ids.is_empty() {
1631            debug!("no transaction IDs provided for batch delete");
1632            return Ok(BatchDeleteResult::default());
1633        }
1634
1635        debug!(count = %ids.len(), "batch deleting transactions by IDs (with fetch)");
1636
1637        // Fetch transactions to get their data for index cleanup
1638        let batch_result = self.get_transactions_by_ids(&ids).await?;
1639
1640        // Convert to delete requests
1641        let requests: Vec<TransactionDeleteRequest> = batch_result
1642            .results
1643            .iter()
1644            .map(|tx| TransactionDeleteRequest {
1645                id: tx.id.clone(),
1646                relayer_id: tx.relayer_id.clone(),
1647                nonce: self.extract_nonce(&tx.network_data),
1648            })
1649            .collect();
1650
1651        // Delete the transactions that were found; missing IDs are reported below
1652        let mut result = self.delete_by_requests(requests).await?;
1653
1654        // Add the IDs that weren't found during fetch
1655        for id in batch_result.failed_ids {
1656            result
1657                .failed
1658                .push((id.clone(), format!("Transaction with ID {id} not found")));
1659        }
1660
1661        Ok(result)
1662    }
1663
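    /// Batch-deletes transactions from pre-built delete requests without fetching the
    /// transaction bodies; index cleanup uses the relayer_id/nonce carried in each
    /// request and clears every possible status index for safety.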
1664    async fn delete_by_requests(
1665        &self,
1666        requests: Vec<TransactionDeleteRequest>,
1667    ) -> Result<BatchDeleteResult, RepositoryError> {
1668        if requests.is_empty() {
1669            debug!("no delete requests provided for batch delete");
1670            return Ok(BatchDeleteResult::default());
1671        }
1672
1673        debug!(count = %requests.len(), "batch deleting transactions by requests (no fetch)");
1674        let mut conn = self
1675            .get_connection(self.connections.primary(), "batch_delete_no_fetch")
1676            .await?;
1677        let mut pipe = redis::pipe();
1678        pipe.atomic();
1679
1680        // All possible statuses for index cleanup
1681        let all_statuses = [
1682            TransactionStatus::Canceled,
1683            TransactionStatus::Pending,
1684            TransactionStatus::Sent,
1685            TransactionStatus::Submitted,
1686            TransactionStatus::Mined,
1687            TransactionStatus::Confirmed,
1688            TransactionStatus::Failed,
1689            TransactionStatus::Expired,
1690        ];
1691
1692        // Build pipeline for all deletions and index removals
1693        for req in &requests {
1694            // Delete transaction data
1695            let tx_key = self.tx_key(&req.relayer_id, &req.id);
1696            pipe.del(&tx_key);
1697
1698            // Delete reverse lookup
1699            let reverse_key = self.tx_to_relayer_key(&req.id);
1700            pipe.del(&reverse_key);
1701
1702            // Remove from all possible status indexes
1703            for status in &all_statuses {
1704                let status_sorted_key = self.relayer_status_sorted_key(&req.relayer_id, status);
1705                pipe.zrem(&status_sorted_key, &req.id);
1706
1707                let status_legacy_key = self.relayer_status_key(&req.relayer_id, status);
1708                pipe.srem(&status_legacy_key, &req.id);
1709            }
1710
1711            // Remove nonce index if it exists
1712            if let Some(nonce) = req.nonce {
1713                let nonce_key = self.relayer_nonce_key(&req.relayer_id, nonce);
1714                pipe.del(&nonce_key);
1715            }
1716
1717            // Remove from per-relayer sorted set by created_at
1718            let relayer_sorted_key = self.relayer_tx_by_created_at_key(&req.relayer_id);
1719            pipe.zrem(&relayer_sorted_key, &req.id);
1720        }
1721
1722        // Execute the entire pipeline in one round-trip
1723        match pipe.exec_async(&mut conn).await {
1724            Ok(_) => {
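                // DEL/ZREM/SREM are no-ops for missing keys or members, so a successful
                // pipeline reports every request as deleted even if some keys no longer
                // existed.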
1725                let deleted_count = requests.len();
1726                debug!(
1727                    deleted_count = %deleted_count,
1728                    "batch delete completed"
1729                );
1730                Ok(BatchDeleteResult {
1731                    deleted_count,
1732                    failed: vec![],
1733                })
1734            }
1735            Err(e) => {
1736                error!(error = %e, "batch delete pipeline failed");
1737                // Mark all requests as failed
1738                let failed: Vec<(String, String)> = requests
1739                    .iter()
1740                    .map(|req| (req.id.clone(), format!("Redis pipeline error: {e}")))
1741                    .collect();
1742                Ok(BatchDeleteResult {
1743                    deleted_count: 0,
1744                    failed,
1745                })
1746            }
1747        }
1748    }
1749}
1750
1751#[cfg(test)]
1752mod tests {
1753    use super::*;
1754    use crate::models::{evm::Speed, EvmTransactionData, NetworkType};
1755    use alloy::primitives::U256;
1756    use deadpool_redis::{Config, Runtime};
1757    use lazy_static::lazy_static;
1758    use std::str::FromStr;
1759    use tokio;
1760    use uuid::Uuid;
1761
1762    use tokio::sync::Mutex;
1763
1764    // Use a mutex to ensure tests don't run in parallel when modifying env vars
1765    lazy_static! {
1766        static ref ENV_MUTEX: Mutex<()> = Mutex::new(());
1767    }
1768
1769    // Helper function to create test transactions
1770    fn create_test_transaction(id: &str) -> TransactionRepoModel {
1771        TransactionRepoModel {
1772            id: id.to_string(),
1773            relayer_id: "relayer-1".to_string(),
1774            status: TransactionStatus::Pending,
1775            status_reason: None,
1776            created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
1777            sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
1778            confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
1779            valid_until: None,
1780            delete_at: None,
1781            network_type: NetworkType::Evm,
1782            priced_at: None,
1783            hashes: vec![],
1784            network_data: NetworkTransactionData::Evm(EvmTransactionData {
1785                gas_price: Some(1000000000),
1786                gas_limit: Some(21000),
1787                nonce: Some(1),
1788                value: U256::from_str("1000000000000000000").unwrap(),
1789                data: Some("0x".to_string()),
1790                from: "0xSender".to_string(),
1791                to: Some("0xRecipient".to_string()),
1792                chain_id: 1,
1793                signature: None,
1794                hash: Some(format!("0x{}", id)),
1795                speed: Some(Speed::Fast),
1796                max_fee_per_gas: None,
1797                max_priority_fee_per_gas: None,
1798                raw: None,
1799            }),
1800            noop_count: None,
1801            is_canceled: Some(false),
1802        }
1803    }
1804
1805    fn create_test_transaction_with_relayer(id: &str, relayer_id: &str) -> TransactionRepoModel {
1806        let mut tx = create_test_transaction(id);
1807        tx.relayer_id = relayer_id.to_string();
1808        tx
1809    }
1810
1811    fn create_test_transaction_with_status(
1812        id: &str,
1813        relayer_id: &str,
1814        status: TransactionStatus,
1815    ) -> TransactionRepoModel {
1816        let mut tx = create_test_transaction_with_relayer(id, relayer_id);
1817        tx.status = status;
1818        tx
1819    }
1820
1821    fn create_test_transaction_with_nonce(
1822        id: &str,
1823        nonce: u64,
1824        relayer_id: &str,
1825    ) -> TransactionRepoModel {
1826        let mut tx = create_test_transaction_with_relayer(id, relayer_id);
1827        if let NetworkTransactionData::Evm(ref mut evm_data) = tx.network_data {
1828            evm_data.nonce = Some(nonce);
1829        }
1830        tx
1831    }
1832
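
    // These tests talk to a real Redis server and are #[ignore]d by default; one way to
    // run them (assuming a Redis instance is reachable locally) is:
    //   REDIS_TEST_URL=redis://127.0.0.1:6379 cargo test -- --ignored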
1833    async fn setup_test_repo() -> RedisTransactionRepository {
1834        // Connect to the Redis instance from REDIS_TEST_URL (defaults to a local instance); these #[ignore]d tests need a reachable Redis server
1835        let redis_url = std::env::var("REDIS_TEST_URL")
1836            .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
1837
1838        let cfg = Config::from_url(&redis_url);
1839        let pool = Arc::new(
1840            cfg.builder()
1841                .expect("Failed to create pool builder")
1842                .max_size(16)
1843                .runtime(Runtime::Tokio1)
1844                .build()
1845                .expect("Failed to build Redis pool"),
1846        );
1847
1848        // Create RedisConnections with same pool for both primary and reader (for testing)
1849        let connections = Arc::new(RedisConnections::new_single_pool(pool));
1850
1851        let random_id = Uuid::new_v4().to_string();
1852        let key_prefix = format!("test_prefix:{}", random_id);
1853
1854        RedisTransactionRepository::new(connections, key_prefix)
1855            .expect("Failed to create RedisTransactionRepository")
1856    }
1857
1858    #[tokio::test]
1859    #[ignore = "Requires active Redis instance"]
1860    async fn test_new_repository_creation() {
1861        let repo = setup_test_repo().await;
1862        assert!(repo.key_prefix.contains("test_prefix"));
1863    }
1864
1865    #[tokio::test]
1866    #[ignore = "Requires active Redis instance"]
1867    async fn test_new_repository_empty_prefix_fails() {
1868        let redis_url = std::env::var("REDIS_TEST_URL")
1869            .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
1870        let cfg = Config::from_url(&redis_url);
1871        let pool = Arc::new(
1872            cfg.builder()
1873                .expect("Failed to create pool builder")
1874                .max_size(16)
1875                .runtime(Runtime::Tokio1)
1876                .build()
1877                .expect("Failed to build Redis pool"),
1878        );
1879        let connections = Arc::new(RedisConnections::new_single_pool(pool));
1880
1881        let result = RedisTransactionRepository::new(connections, "".to_string());
1882        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
1883    }
1884
1885    #[tokio::test]
1886    #[ignore = "Requires active Redis instance"]
1887    async fn test_key_generation() {
1888        let repo = setup_test_repo().await;
1889
1890        assert!(repo
1891            .tx_key("relayer-1", "test-id")
1892            .contains(":relayer:relayer-1:tx:test-id"));
1893        assert!(repo
1894            .tx_to_relayer_key("test-id")
1895            .contains(":relayer:tx_to_relayer:test-id"));
1896        assert!(repo.relayer_list_key().contains(":relayer_list"));
1897        assert!(repo
1898            .relayer_status_key("relayer-1", &TransactionStatus::Pending)
1899            .contains(":relayer:relayer-1:status:Pending"));
1900        assert!(repo
1901            .relayer_nonce_key("relayer-1", 42)
1902            .contains(":relayer:relayer-1:nonce:42"));
1903    }
1904
1905    #[tokio::test]
1906    #[ignore = "Requires active Redis instance"]
1907    async fn test_serialize_deserialize_transaction() {
1908        let repo = setup_test_repo().await;
1909        let tx = create_test_transaction("test-1");
1910
1911        let serialized = repo
1912            .serialize_entity(&tx, |t| &t.id, "transaction")
1913            .expect("Serialization should succeed");
1914        let deserialized: TransactionRepoModel = repo
1915            .deserialize_entity(&serialized, "test-1", "transaction")
1916            .expect("Deserialization should succeed");
1917
1918        assert_eq!(tx.id, deserialized.id);
1919        assert_eq!(tx.relayer_id, deserialized.relayer_id);
1920        assert_eq!(tx.status, deserialized.status);
1921    }
1922
1923    #[tokio::test]
1924    #[ignore = "Requires active Redis instance"]
1925    async fn test_extract_nonce() {
1926        let repo = setup_test_repo().await;
1927        let random_id = Uuid::new_v4().to_string();
1928        let relayer_id = Uuid::new_v4().to_string();
1929        let tx_with_nonce = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
1930
1931        let nonce = repo.extract_nonce(&tx_with_nonce.network_data);
1932        assert_eq!(nonce, Some(42));
1933    }
1934
1935    #[tokio::test]
1936    #[ignore = "Requires active Redis instance"]
1937    async fn test_create_transaction() {
1938        let repo = setup_test_repo().await;
1939        let random_id = Uuid::new_v4().to_string();
1940        let tx = create_test_transaction(&random_id);
1941
1942        let result = repo.create(tx.clone()).await.unwrap();
1943        assert_eq!(result.id, tx.id);
1944    }
1945
1946    #[tokio::test]
1947    #[ignore = "Requires active Redis instance"]
1948    async fn test_get_transaction() {
1949        let repo = setup_test_repo().await;
1950        let random_id = Uuid::new_v4().to_string();
1951        let tx = create_test_transaction(&random_id);
1952
1953        repo.create(tx.clone()).await.unwrap();
1954        let stored = repo.get_by_id(random_id.to_string()).await.unwrap();
1955        assert_eq!(stored.id, tx.id);
1956        assert_eq!(stored.relayer_id, tx.relayer_id);
1957    }
1958
1959    #[tokio::test]
1960    #[ignore = "Requires active Redis instance"]
1961    async fn test_update_transaction() {
1962        let repo = setup_test_repo().await;
1963        let random_id = Uuid::new_v4().to_string();
1964        let mut tx = create_test_transaction(&random_id);
1965
1966        repo.create(tx.clone()).await.unwrap();
1967        tx.status = TransactionStatus::Confirmed;
1968
1969        let updated = repo.update(random_id.to_string(), tx).await.unwrap();
1970        assert!(matches!(updated.status, TransactionStatus::Confirmed));
1971    }
1972
1973    #[tokio::test]
1974    #[ignore = "Requires active Redis instance"]
1975    async fn test_delete_transaction() {
1976        let repo = setup_test_repo().await;
1977        let random_id = Uuid::new_v4().to_string();
1978        let tx = create_test_transaction(&random_id);
1979
1980        repo.create(tx).await.unwrap();
1981        repo.delete_by_id(random_id.to_string()).await.unwrap();
1982
1983        let result = repo.get_by_id(random_id.to_string()).await;
1984        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
1985    }
1986
1987    #[tokio::test]
1988    #[ignore = "Requires active Redis instance"]
1989    async fn test_list_all_transactions() {
1990        let repo = setup_test_repo().await;
1991        let random_id = Uuid::new_v4().to_string();
1992        let random_id2 = Uuid::new_v4().to_string();
1993
1994        let tx1 = create_test_transaction(&random_id);
1995        let tx2 = create_test_transaction(&random_id2);
1996
1997        repo.create(tx1).await.unwrap();
1998        repo.create(tx2).await.unwrap();
1999
2000        let transactions = repo.list_all().await.unwrap();
2001        assert!(transactions.len() >= 2);
2002    }
2003
2004    #[tokio::test]
2005    #[ignore = "Requires active Redis instance"]
2006    async fn test_count_transactions() {
2007        let repo = setup_test_repo().await;
2008        let random_id = Uuid::new_v4().to_string();
2009        let tx = create_test_transaction(&random_id);
2010
2011        let count = repo.count().await.unwrap();
2012        repo.create(tx).await.unwrap();
2013        assert!(repo.count().await.unwrap() > count);
2014    }
2015
2016    #[tokio::test]
2017    #[ignore = "Requires active Redis instance"]
2018    async fn test_get_nonexistent_transaction() {
2019        let repo = setup_test_repo().await;
2020        let result = repo.get_by_id("nonexistent".to_string()).await;
2021        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
2022    }
2023
2024    #[tokio::test]
2025    #[ignore = "Requires active Redis instance"]
2026    async fn test_duplicate_transaction_creation() {
2027        let repo = setup_test_repo().await;
2028        let random_id = Uuid::new_v4().to_string();
2029
2030        let tx = create_test_transaction(&random_id);
2031
2032        repo.create(tx.clone()).await.unwrap();
2033        let result = repo.create(tx).await;
2034
2035        assert!(matches!(
2036            result,
2037            Err(RepositoryError::ConstraintViolation(_))
2038        ));
2039    }
2040
2041    #[tokio::test]
2042    #[ignore = "Requires active Redis instance"]
2043    async fn test_update_nonexistent_transaction() {
2044        let repo = setup_test_repo().await;
2045        let tx = create_test_transaction("test-1");
2046
2047        let result = repo.update("nonexistent".to_string(), tx).await;
2048        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
2049    }
2050
2051    #[tokio::test]
2052    #[ignore = "Requires active Redis instance"]
2053    async fn test_list_paginated() {
2054        let repo = setup_test_repo().await;
2055
2056        // Create multiple transactions
2057        for _ in 1..=10 {
2058            let random_id = Uuid::new_v4().to_string();
2059            let tx = create_test_transaction(&random_id);
2060            repo.create(tx).await.unwrap();
2061        }
2062
2063        // Test first page with 3 items per page
2064        let query = PaginationQuery {
2065            page: 1,
2066            per_page: 3,
2067        };
2068        let result = repo.list_paginated(query).await.unwrap();
2069        assert_eq!(result.items.len(), 3);
2070        assert!(result.total >= 10);
2071        assert_eq!(result.page, 1);
2072        assert_eq!(result.per_page, 3);
2073
2074        // Test empty page (beyond total items)
2075        let query = PaginationQuery {
2076            page: 1000,
2077            per_page: 3,
2078        };
2079        let result = repo.list_paginated(query).await.unwrap();
2080        assert_eq!(result.items.len(), 0);
2081    }
2082
2083    #[tokio::test]
2084    #[ignore = "Requires active Redis instance"]
2085    async fn test_find_by_relayer_id() {
2086        let repo = setup_test_repo().await;
2087        let random_id = Uuid::new_v4().to_string();
2088        let random_id2 = Uuid::new_v4().to_string();
2089        let random_id3 = Uuid::new_v4().to_string();
2090
2091        let tx1 = create_test_transaction_with_relayer(&random_id, "relayer-1");
2092        let tx2 = create_test_transaction_with_relayer(&random_id2, "relayer-1");
2093        let tx3 = create_test_transaction_with_relayer(&random_id3, "relayer-2");
2094
2095        repo.create(tx1).await.unwrap();
2096        repo.create(tx2).await.unwrap();
2097        repo.create(tx3).await.unwrap();
2098
2099        // Test finding transactions for relayer-1
2100        let query = PaginationQuery {
2101            page: 1,
2102            per_page: 10,
2103        };
2104        let result = repo
2105            .find_by_relayer_id("relayer-1", query.clone())
2106            .await
2107            .unwrap();
2108        assert!(result.total >= 2);
2109        assert!(result.items.len() >= 2);
2110        assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-1"));
2111
2112        // Test finding transactions for relayer-2
2113        let result = repo
2114            .find_by_relayer_id("relayer-2", query.clone())
2115            .await
2116            .unwrap();
2117        assert!(result.total >= 1);
2118        assert!(!result.items.is_empty());
2119        assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-2"));
2120
2121        // Test finding transactions for non-existent relayer
2122        let result = repo
2123            .find_by_relayer_id("non-existent", query.clone())
2124            .await
2125            .unwrap();
2126        assert_eq!(result.total, 0);
2127        assert_eq!(result.items.len(), 0);
2128    }
2129
2130    #[tokio::test]
2131    #[ignore = "Requires active Redis instance"]
2132    async fn test_find_by_relayer_id_sorted_by_created_at_newest_first() {
2133        let repo = setup_test_repo().await;
2134        let relayer_id = Uuid::new_v4().to_string();
2135
2136        // Create transactions with different created_at timestamps
2137        let mut tx1 = create_test_transaction_with_relayer("test-1", &relayer_id);
2138        tx1.created_at = "2025-01-27T10:00:00.000000+00:00".to_string(); // Oldest
2139
2140        let mut tx2 = create_test_transaction_with_relayer("test-2", &relayer_id);
2141        tx2.created_at = "2025-01-27T12:00:00.000000+00:00".to_string(); // Middle
2142
2143        let mut tx3 = create_test_transaction_with_relayer("test-3", &relayer_id);
2144        tx3.created_at = "2025-01-27T14:00:00.000000+00:00".to_string(); // Newest
2145
2146        // Create transactions in non-chronological order to ensure sorting works
2147        repo.create(tx2.clone()).await.unwrap(); // Middle first
2148        repo.create(tx1.clone()).await.unwrap(); // Oldest second
2149        repo.create(tx3.clone()).await.unwrap(); // Newest last
2150
2151        let query = PaginationQuery {
2152            page: 1,
2153            per_page: 10,
2154        };
2155        let result = repo.find_by_relayer_id(&relayer_id, query).await.unwrap();
2156
2157        assert_eq!(result.total, 3);
2158        assert_eq!(result.items.len(), 3);
2159
2160        // Verify transactions are sorted by created_at descending (newest first)
2161        assert_eq!(
2162            result.items[0].id, "test-3",
2163            "First item should be newest (test-3)"
2164        );
2165        assert_eq!(
2166            result.items[0].created_at,
2167            "2025-01-27T14:00:00.000000+00:00"
2168        );
2169
2170        assert_eq!(
2171            result.items[1].id, "test-2",
2172            "Second item should be middle (test-2)"
2173        );
2174        assert_eq!(
2175            result.items[1].created_at,
2176            "2025-01-27T12:00:00.000000+00:00"
2177        );
2178
2179        assert_eq!(
2180            result.items[2].id, "test-1",
2181            "Third item should be oldest (test-1)"
2182        );
2183        assert_eq!(
2184            result.items[2].created_at,
2185            "2025-01-27T10:00:00.000000+00:00"
2186        );
2187    }
2188
2189    #[tokio::test]
2190    #[ignore = "Requires active Redis instance"]
2191    async fn test_find_by_relayer_id_migration_from_old_index() {
2192        let repo = setup_test_repo().await;
2193        let relayer_id = Uuid::new_v4().to_string();
2194
2195        // Create transactions with different created_at timestamps
2196        let mut tx1 = create_test_transaction_with_relayer("migrate-test-1", &relayer_id);
2197        tx1.created_at = "2025-01-27T10:00:00.000000+00:00".to_string(); // Oldest
2198
2199        let mut tx2 = create_test_transaction_with_relayer("migrate-test-2", &relayer_id);
2200        tx2.created_at = "2025-01-27T12:00:00.000000+00:00".to_string(); // Middle
2201
2202        let mut tx3 = create_test_transaction_with_relayer("migrate-test-3", &relayer_id);
2203        tx3.created_at = "2025-01-27T14:00:00.000000+00:00".to_string(); // Newest
2204
2205        // Create transactions directly in Redis WITHOUT adding to sorted set
2206        // This simulates old transactions created before the sorted set index existed
2207        let mut conn = repo.connections.primary().get().await.unwrap();
2208        let relayer_list_key = repo.relayer_list_key();
2209        let _: () = conn.sadd(&relayer_list_key, &relayer_id).await.unwrap();
2210
2211        for tx in &[&tx1, &tx2, &tx3] {
2212            let key = repo.tx_key(&tx.relayer_id, &tx.id);
2213            let reverse_key = repo.tx_to_relayer_key(&tx.id);
2214            let value = repo.serialize_entity(tx, |t| &t.id, "transaction").unwrap();
2215
2216            let mut pipe = redis::pipe();
2217            pipe.atomic();
2218            pipe.set(&key, &value);
2219            pipe.set(&reverse_key, &tx.relayer_id);
2220
2221            // Add to status index (but NOT to sorted set)
2222            let status_key = repo.relayer_status_key(&tx.relayer_id, &tx.status);
2223            pipe.sadd(&status_key, &tx.id);
2224
2225            pipe.exec_async(&mut conn).await.unwrap();
2226        }
2227
2228        // Verify sorted set is empty (transactions were created without sorted set index)
2229        let relayer_sorted_key = repo.relayer_tx_by_created_at_key(&relayer_id);
2230        let count: u64 = conn.zcard(&relayer_sorted_key).await.unwrap();
2231        assert_eq!(count, 0, "Sorted set should be empty for old transactions");
2232
2233        // Call find_by_relayer_id - this should trigger migration
2234        let query = PaginationQuery {
2235            page: 1,
2236            per_page: 10,
2237        };
2238        let result = repo
2239            .find_by_relayer_id(&relayer_id, query.clone())
2240            .await
2241            .unwrap();
2242
2243        // Verify migration happened - sorted set should now have entries
2244        let count_after: u64 = conn.zcard(&relayer_sorted_key).await.unwrap();
2245        assert_eq!(
2246            count_after, 3,
2247            "Sorted set should be populated after migration"
2248        );
2249
2250        // Verify results are correct and sorted (newest first)
2251        assert_eq!(result.total, 3);
2252        assert_eq!(result.items.len(), 3);
2253
2254        assert_eq!(
2255            result.items[0].id, "migrate-test-3",
2256            "First item should be newest after migration"
2257        );
2258        assert_eq!(
2259            result.items[0].created_at,
2260            "2025-01-27T14:00:00.000000+00:00"
2261        );
2262
2263        assert_eq!(
2264            result.items[1].id, "migrate-test-2",
2265            "Second item should be middle after migration"
2266        );
2267        assert_eq!(
2268            result.items[1].created_at,
2269            "2025-01-27T12:00:00.000000+00:00"
2270        );
2271
2272        assert_eq!(
2273            result.items[2].id, "migrate-test-1",
2274            "Third item should be oldest after migration"
2275        );
2276        assert_eq!(
2277            result.items[2].created_at,
2278            "2025-01-27T10:00:00.000000+00:00"
2279        );
2280
2281        // Verify second call uses sorted set (no migration needed)
2282        let result2 = repo.find_by_relayer_id(&relayer_id, query).await.unwrap();
2283        assert_eq!(result2.total, 3);
2284        assert_eq!(result2.items.len(), 3);
2285        // Results should be identical since sorted set is now populated
2286        assert_eq!(result.items[0].id, result2.items[0].id);
2287    }
2288
2289    #[tokio::test]
2290    #[ignore = "Requires active Redis instance"]
2291    async fn test_find_by_status() {
2292        let repo = setup_test_repo().await;
2293        let random_id = Uuid::new_v4().to_string();
2294        let random_id2 = Uuid::new_v4().to_string();
2295        let random_id3 = Uuid::new_v4().to_string();
2296        let relayer_id = Uuid::new_v4().to_string();
2297        let tx1 = create_test_transaction_with_status(
2298            &random_id,
2299            &relayer_id,
2300            TransactionStatus::Pending,
2301        );
2302        let tx2 =
2303            create_test_transaction_with_status(&random_id2, &relayer_id, TransactionStatus::Sent);
2304        let tx3 = create_test_transaction_with_status(
2305            &random_id3,
2306            &relayer_id,
2307            TransactionStatus::Confirmed,
2308        );
2309
2310        repo.create(tx1).await.unwrap();
2311        repo.create(tx2).await.unwrap();
2312        repo.create(tx3).await.unwrap();
2313
2314        // Test finding pending transactions
2315        let result = repo
2316            .find_by_status(&relayer_id, &[TransactionStatus::Pending])
2317            .await
2318            .unwrap();
2319        assert_eq!(result.len(), 1);
2320        assert_eq!(result[0].status, TransactionStatus::Pending);
2321
2322        // Test finding multiple statuses
2323        let result = repo
2324            .find_by_status(
2325                &relayer_id,
2326                &[TransactionStatus::Pending, TransactionStatus::Sent],
2327            )
2328            .await
2329            .unwrap();
2330        assert_eq!(result.len(), 2);
2331
2332        // Test finding non-existent status
2333        let result = repo
2334            .find_by_status(&relayer_id, &[TransactionStatus::Failed])
2335            .await
2336            .unwrap();
2337        assert_eq!(result.len(), 0);
2338    }
2339
2340    #[tokio::test]
2341    #[ignore = "Requires active Redis instance"]
2342    async fn test_find_by_status_paginated() {
2343        let repo = setup_test_repo().await;
2344        let relayer_id = Uuid::new_v4().to_string();
2345
2346        // Create 5 pending transactions with different timestamps
2347        for i in 1..=5 {
2348            let tx_id = Uuid::new_v4().to_string();
2349            let mut tx = create_test_transaction_with_status(
2350                &tx_id,
2351                &relayer_id,
2352                TransactionStatus::Pending,
2353            );
2354            tx.created_at = format!("2025-01-27T{:02}:00:00.000000+00:00", 10 + i);
2355            repo.create(tx).await.unwrap();
2356        }
2357
2358        // Create 2 confirmed transactions
2359        for i in 6..=7 {
2360            let tx_id = Uuid::new_v4().to_string();
2361            let mut tx = create_test_transaction_with_status(
2362                &tx_id,
2363                &relayer_id,
2364                TransactionStatus::Confirmed,
2365            );
2366            tx.created_at = format!("2025-01-27T{:02}:00:00.000000+00:00", 10 + i);
2367            repo.create(tx).await.unwrap();
2368        }
2369
2370        // Test first page (2 items per page)
2371        let query = PaginationQuery {
2372            page: 1,
2373            per_page: 2,
2374        };
2375        let result = repo
2376            .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
2377            .await
2378            .unwrap();
2379
2380        assert_eq!(result.total, 5);
2381        assert_eq!(result.items.len(), 2);
2382        assert_eq!(result.page, 1);
2383        assert_eq!(result.per_page, 2);
2384
2385        // Test second page
2386        let query = PaginationQuery {
2387            page: 2,
2388            per_page: 2,
2389        };
2390        let result = repo
2391            .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
2392            .await
2393            .unwrap();
2394
2395        assert_eq!(result.total, 5);
2396        assert_eq!(result.items.len(), 2);
2397        assert_eq!(result.page, 2);
2398
2399        // Test last page (partial)
2400        let query = PaginationQuery {
2401            page: 3,
2402            per_page: 2,
2403        };
2404        let result = repo
2405            .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
2406            .await
2407            .unwrap();
2408
2409        assert_eq!(result.total, 5);
2410        assert_eq!(result.items.len(), 1);
2411
2412        // Test multiple statuses
2413        let query = PaginationQuery {
2414            page: 1,
2415            per_page: 10,
2416        };
2417        let result = repo
2418            .find_by_status_paginated(
2419                &relayer_id,
2420                &[TransactionStatus::Pending, TransactionStatus::Confirmed],
2421                query,
2422                false,
2423            )
2424            .await
2425            .unwrap();
2426
2427        assert_eq!(result.total, 7);
2428        assert_eq!(result.items.len(), 7);
2429
2430        // Test empty result
2431        let query = PaginationQuery {
2432            page: 1,
2433            per_page: 10,
2434        };
2435        let result = repo
2436            .find_by_status_paginated(&relayer_id, &[TransactionStatus::Failed], query, false)
2437            .await
2438            .unwrap();
2439
2440        assert_eq!(result.total, 0);
2441        assert_eq!(result.items.len(), 0);
2442    }
2443
2444    #[tokio::test]
2445    #[ignore = "Requires active Redis instance"]
2446    async fn test_find_by_status_paginated_oldest_first() {
2447        let repo = setup_test_repo().await;
2448        let relayer_id = Uuid::new_v4().to_string();
2449
2450        // Create 5 pending transactions with ascending timestamps
2451        for i in 1..=5 {
2452            let tx_id = format!("tx{}-{}", i, Uuid::new_v4());
2453            let mut tx = create_test_transaction(&tx_id);
2454            tx.relayer_id = relayer_id.clone();
2455            tx.status = TransactionStatus::Pending;
2456            tx.created_at = format!("2025-01-27T{:02}:00:00.000000+00:00", 10 + i);
2457            repo.create(tx).await.unwrap();
2458        }
2459
2460        // Test oldest_first: true - should return oldest transactions first
2461        let query = PaginationQuery {
2462            page: 1,
2463            per_page: 3,
2464        };
2465        let result = repo
2466            .find_by_status_paginated(
2467                &relayer_id,
2468                &[TransactionStatus::Pending],
2469                query.clone(),
2470                true,
2471            )
2472            .await
2473            .unwrap();
2474
2475        assert_eq!(result.total, 5);
2476        assert_eq!(result.items.len(), 3);
2477        // Verify ordering: oldest first (11:00, 12:00, 13:00)
2478        assert!(
2479            result.items[0].created_at < result.items[1].created_at,
2480            "First item should be older than second"
2481        );
2482        assert!(
2483            result.items[1].created_at < result.items[2].created_at,
2484            "Second item should be older than third"
2485        );
2486
2487        // Contrast with oldest_first: false - should return newest first
2488        let result_newest = repo
2489            .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
2490            .await
2491            .unwrap();
2492
2493        assert_eq!(result_newest.items.len(), 3);
2494        // Verify ordering: newest first (15:00, 14:00, 13:00)
2495        assert!(
2496            result_newest.items[0].created_at > result_newest.items[1].created_at,
2497            "First item should be newer than second"
2498        );
2499        assert!(
2500            result_newest.items[1].created_at > result_newest.items[2].created_at,
2501            "Second item should be newer than third"
2502        );
2503    }
2504
2505    #[tokio::test]
2506    #[ignore = "Requires active Redis instance"]
2507    async fn test_find_by_status_paginated_oldest_first_single_item() {
2508        let repo = setup_test_repo().await;
2509        let relayer_id = Uuid::new_v4().to_string();
2510
2511        // Create transactions with specific timestamps
2512        let timestamps = [
2513            "2025-01-27T08:00:00.000000+00:00", // oldest
2514            "2025-01-27T10:00:00.000000+00:00", // middle
2515            "2025-01-27T12:00:00.000000+00:00", // newest
2516        ];
2517
2518        let mut oldest_id = String::new();
2519        let mut newest_id = String::new();
2520
2521        for (i, timestamp) in timestamps.iter().enumerate() {
2522            let tx_id = format!("tx-{}-{}", i, Uuid::new_v4());
2523            if i == 0 {
2524                oldest_id = tx_id.clone();
2525            }
2526            if i == 2 {
2527                newest_id = tx_id.clone();
2528            }
2529            let mut tx = create_test_transaction(&tx_id);
2530            tx.relayer_id = relayer_id.clone();
2531            tx.status = TransactionStatus::Pending;
2532            tx.created_at = timestamp.to_string();
2533            repo.create(tx).await.unwrap();
2534        }
2535
2536        // Request just 1 item with oldest_first: true
2537        let query = PaginationQuery {
2538            page: 1,
2539            per_page: 1,
2540        };
2541        let result = repo
2542            .find_by_status_paginated(
2543                &relayer_id,
2544                &[TransactionStatus::Pending],
2545                query.clone(),
2546                true,
2547            )
2548            .await
2549            .unwrap();
2550
2551        assert_eq!(result.total, 3);
2552        assert_eq!(result.items.len(), 1);
2553        assert_eq!(
2554            result.items[0].id, oldest_id,
2555            "With oldest_first=true and per_page=1, should return the oldest transaction"
2556        );
2557
2558        // Contrast with oldest_first: false
2559        let result = repo
2560            .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
2561            .await
2562            .unwrap();
2563
2564        assert_eq!(result.items.len(), 1);
2565        assert_eq!(
2566            result.items[0].id, newest_id,
2567            "With oldest_first=false and per_page=1, should return the newest transaction"
2568        );
2569    }
2570
2571    #[tokio::test]
2572    #[ignore = "Requires active Redis instance"]
2573    async fn test_find_by_nonce() {
2574        let repo = setup_test_repo().await;
2575        let random_id = Uuid::new_v4().to_string();
2576        let random_id2 = Uuid::new_v4().to_string();
2577        let relayer_id = Uuid::new_v4().to_string();
2578
2579        let tx1 = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
2580        let tx2 = create_test_transaction_with_nonce(&random_id2, 43, &relayer_id);
2581
2582        repo.create(tx1.clone()).await.unwrap();
2583        repo.create(tx2).await.unwrap();
2584
2585        // Test finding existing nonce
2586        let result = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
2587        assert!(result.is_some());
2588        assert_eq!(result.unwrap().id, random_id);
2589
2590        // Test finding non-existent nonce
2591        let result = repo.find_by_nonce(&relayer_id, 99).await.unwrap();
2592        assert!(result.is_none());
2593
2594        // Test finding nonce for non-existent relayer
2595        let result = repo.find_by_nonce("non-existent", 42).await.unwrap();
2596        assert!(result.is_none());
2597    }
2598
2599    #[tokio::test]
2600    #[ignore = "Requires active Redis instance"]
2601    async fn test_update_status() {
2602        let repo = setup_test_repo().await;
2603        let random_id = Uuid::new_v4().to_string();
2604        let tx = create_test_transaction(&random_id);
2605
2606        repo.create(tx).await.unwrap();
2607        let updated = repo
2608            .update_status(random_id.to_string(), TransactionStatus::Confirmed)
2609            .await
2610            .unwrap();
2611        assert_eq!(updated.status, TransactionStatus::Confirmed);
2612    }
2613
2614    #[tokio::test]
2615    #[ignore = "Requires active Redis instance"]
2616    async fn test_partial_update() {
2617        let repo = setup_test_repo().await;
2618        let random_id = Uuid::new_v4().to_string();
2619        let tx = create_test_transaction(&random_id);
2620
2621        repo.create(tx).await.unwrap();
2622
2623        let update = TransactionUpdateRequest {
2624            status: Some(TransactionStatus::Sent),
2625            status_reason: Some("Transaction sent".to_string()),
2626            sent_at: Some("2025-01-27T16:00:00.000000+00:00".to_string()),
2627            confirmed_at: None,
2628            network_data: None,
2629            hashes: None,
2630            is_canceled: None,
2631            priced_at: None,
2632            noop_count: None,
2633            delete_at: None,
2634        };
2635
2636        let updated = repo
2637            .partial_update(random_id.to_string(), update)
2638            .await
2639            .unwrap();
2640        assert_eq!(updated.status, TransactionStatus::Sent);
2641        assert_eq!(updated.status_reason, Some("Transaction sent".to_string()));
2642        assert_eq!(
2643            updated.sent_at,
2644            Some("2025-01-27T16:00:00.000000+00:00".to_string())
2645        );
2646    }
2647
2648    #[tokio::test]
2649    #[ignore = "Requires active Redis instance"]
2650    async fn test_set_sent_at() {
2651        let repo = setup_test_repo().await;
2652        let random_id = Uuid::new_v4().to_string();
2653        let tx = create_test_transaction(&random_id);
2654
2655        repo.create(tx).await.unwrap();
2656        let updated = repo
2657            .set_sent_at(
2658                random_id.to_string(),
2659                "2025-01-27T16:00:00.000000+00:00".to_string(),
2660            )
2661            .await
2662            .unwrap();
2663        assert_eq!(
2664            updated.sent_at,
2665            Some("2025-01-27T16:00:00.000000+00:00".to_string())
2666        );
2667    }
2668
2669    #[tokio::test]
2670    #[ignore = "Requires active Redis instance"]
2671    async fn test_set_confirmed_at() {
2672        let repo = setup_test_repo().await;
2673        let random_id = Uuid::new_v4().to_string();
2674        let tx = create_test_transaction(&random_id);
2675
2676        repo.create(tx).await.unwrap();
2677        let updated = repo
2678            .set_confirmed_at(
2679                random_id.to_string(),
2680                "2025-01-27T16:00:00.000000+00:00".to_string(),
2681            )
2682            .await
2683            .unwrap();
2684        assert_eq!(
2685            updated.confirmed_at,
2686            Some("2025-01-27T16:00:00.000000+00:00".to_string())
2687        );
2688    }
2689
2690    #[tokio::test]
2691    #[ignore = "Requires active Redis instance"]
2692    async fn test_update_network_data() {
2693        let repo = setup_test_repo().await;
2694        let random_id = Uuid::new_v4().to_string();
2695        let tx = create_test_transaction(&random_id);
2696
2697        repo.create(tx).await.unwrap();
2698
2699        let new_network_data = NetworkTransactionData::Evm(EvmTransactionData {
2700            gas_price: Some(2000000000),
2701            gas_limit: Some(42000),
2702            nonce: Some(2),
2703            value: U256::from_str("2000000000000000000").unwrap(),
2704            data: Some("0x1234".to_string()),
2705            from: "0xNewSender".to_string(),
2706            to: Some("0xNewRecipient".to_string()),
2707            chain_id: 1,
2708            signature: None,
2709            hash: Some("0xnewhash".to_string()),
2710            speed: Some(Speed::SafeLow),
2711            max_fee_per_gas: None,
2712            max_priority_fee_per_gas: None,
2713            raw: None,
2714        });
2715
2716        let updated = repo
2717            .update_network_data(random_id.to_string(), new_network_data.clone())
2718            .await
2719            .unwrap();
2720        assert_eq!(
2721            updated
2722                .network_data
2723                .get_evm_transaction_data()
2724                .unwrap()
2725                .hash,
2726            new_network_data.get_evm_transaction_data().unwrap().hash
2727        );
2728    }
2729
    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_debug_implementation() {
        let repo = setup_test_repo().await;
        let debug_str = format!("{:?}", repo);
        assert!(debug_str.contains("RedisTransactionRepository"));
        assert!(debug_str.contains("test_prefix"));
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_error_handling_empty_id() {
        let repo = setup_test_repo().await;

        let result = repo.get_by_id("".to_string()).await;
        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));

        let result = repo
            .update("".to_string(), create_test_transaction("test"))
            .await;
        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));

        let result = repo.delete_by_id("".to_string()).await;
        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_pagination_validation() {
        let repo = setup_test_repo().await;

        let query = PaginationQuery {
            page: 1,
            per_page: 0,
        };
        let result = repo.list_paginated(query).await;
        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
    }

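    // Note for the next test: updating a transaction's nonce via `update` is
    // expected to move the nonce index entry, so the old nonce stops resolving
    // and the new one does.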
    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_index_consistency() {
        let repo = setup_test_repo().await;
        let random_id = Uuid::new_v4().to_string();
        let relayer_id = Uuid::new_v4().to_string();
        let tx = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);

        // Create transaction
        repo.create(tx.clone()).await.unwrap();

        // Verify it can be found by nonce
        let found = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
        assert!(found.is_some());

        // Update the transaction with a new nonce
        let mut updated_tx = tx.clone();
        if let NetworkTransactionData::Evm(ref mut evm_data) = updated_tx.network_data {
            evm_data.nonce = Some(43);
        }

        repo.update(random_id.to_string(), updated_tx)
            .await
            .unwrap();

        // Verify old nonce index is cleaned up
        let old_nonce_result = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
        assert!(old_nonce_result.is_none());

        // Verify new nonce index works
        let new_nonce_result = repo.find_by_nonce(&relayer_id, 43).await.unwrap();
        assert!(new_nonce_result.is_some());
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_has_entries() {
        let repo = setup_test_repo().await;
        assert!(!repo.has_entries().await.unwrap());

        let tx_id = uuid::Uuid::new_v4().to_string();
        let tx = create_test_transaction(&tx_id);
        repo.create(tx.clone()).await.unwrap();

        assert!(repo.has_entries().await.unwrap());
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_drop_all_entries() {
        let repo = setup_test_repo().await;
        let tx_id = uuid::Uuid::new_v4().to_string();
        let tx = create_test_transaction(&tx_id);
        repo.create(tx.clone()).await.unwrap();
        assert!(repo.has_entries().await.unwrap());

        repo.drop_all_entries().await.unwrap();
        assert!(!repo.has_entries().await.unwrap());
    }

    // Tests for delete_at field setting on final status updates
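    //
    // These tests assume that moving a transaction into a final status stamps
    // `delete_at` with roughly "now + TRANSACTION_EXPIRATION_HOURS". The
    // helper below is only an illustrative sketch of that expectation (it is
    // not part of the repository API and is not used by the tests themselves).
    #[allow(dead_code)]
    fn expected_delete_at_from_now(expiration_hours: i64) -> String {
        // RFC3339 timestamp `expiration_hours` hours in the future; the tests
        // accept a few minutes of tolerance around this value.
        (chrono::Utc::now() + chrono::Duration::hours(expiration_hours)).to_rfc3339()
    }
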
    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_update_status_sets_delete_at_for_final_statuses() {
        let _lock = ENV_MUTEX.lock().await;

        use chrono::{DateTime, Duration, Utc};
        use std::env;

        // ENV_MUTEX serializes env access; set the expiration window for this test
        env::set_var("TRANSACTION_EXPIRATION_HOURS", "6");

        let repo = setup_test_repo().await;

        let final_statuses = [
            TransactionStatus::Canceled,
            TransactionStatus::Confirmed,
            TransactionStatus::Failed,
            TransactionStatus::Expired,
        ];

        for (i, status) in final_statuses.iter().enumerate() {
            let tx_id = format!("test-final-{}-{}", i, Uuid::new_v4());
            let mut tx = create_test_transaction(&tx_id);

            // Ensure transaction has no delete_at initially and is in pending state
            tx.delete_at = None;
            tx.status = TransactionStatus::Pending;

            repo.create(tx).await.unwrap();

            let before_update = Utc::now();

            // Update to final status
            let updated = repo
                .update_status(tx_id.clone(), status.clone())
                .await
                .unwrap();

            // Should have delete_at set
            assert!(
                updated.delete_at.is_some(),
                "delete_at should be set for status: {:?}",
                status
            );

            // Verify the timestamp is reasonable (approximately 6 hours from now)
            let delete_at_str = updated.delete_at.unwrap();
            let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
                .expect("delete_at should be valid RFC3339")
                .with_timezone(&Utc);

            let duration_from_before = delete_at.signed_duration_since(before_update);
            let expected_duration = Duration::hours(6);
            let tolerance = Duration::minutes(5);

            assert!(
                duration_from_before >= expected_duration - tolerance
                    && duration_from_before <= expected_duration + tolerance,
                "delete_at should be approximately 6 hours from now for status: {:?}. Duration: {:?}",
                status,
                duration_from_before
            );
        }

        // Cleanup
        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_update_status_does_not_set_delete_at_for_non_final_statuses() {
        let _lock = ENV_MUTEX.lock().await;

        use std::env;

        env::set_var("TRANSACTION_EXPIRATION_HOURS", "4");

        let repo = setup_test_repo().await;

        let non_final_statuses = [
            TransactionStatus::Pending,
            TransactionStatus::Sent,
            TransactionStatus::Submitted,
            TransactionStatus::Mined,
        ];

        for (i, status) in non_final_statuses.iter().enumerate() {
            let tx_id = format!("test-non-final-{}-{}", i, Uuid::new_v4());
            let mut tx = create_test_transaction(&tx_id);
            tx.delete_at = None;
            tx.status = TransactionStatus::Pending;

            repo.create(tx).await.unwrap();

            // Update to non-final status
            let updated = repo
                .update_status(tx_id.clone(), status.clone())
                .await
                .unwrap();

            // Should NOT have delete_at set
            assert!(
                updated.delete_at.is_none(),
                "delete_at should NOT be set for status: {:?}",
                status
            );
        }

        // Cleanup
        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_partial_update_sets_delete_at_for_final_statuses() {
        let _lock = ENV_MUTEX.lock().await;

        use chrono::{DateTime, Duration, Utc};
        use std::env;

        env::set_var("TRANSACTION_EXPIRATION_HOURS", "8");

        let repo = setup_test_repo().await;
        let tx_id = format!("test-partial-final-{}", Uuid::new_v4());
        let mut tx = create_test_transaction(&tx_id);
        tx.delete_at = None;
        tx.status = TransactionStatus::Pending;

        repo.create(tx).await.unwrap();

        let before_update = Utc::now();

        // Use partial_update to set status to Confirmed (final status)
        let update = TransactionUpdateRequest {
            status: Some(TransactionStatus::Confirmed),
            status_reason: Some("Transaction completed".to_string()),
            confirmed_at: Some("2023-01-01T12:05:00Z".to_string()),
            ..Default::default()
        };

        let updated = repo.partial_update(tx_id.clone(), update).await.unwrap();

        // Should have delete_at set
        assert!(
            updated.delete_at.is_some(),
            "delete_at should be set when updating to Confirmed status"
        );

        // Verify the timestamp is reasonable (approximately 8 hours from now)
        let delete_at_str = updated.delete_at.unwrap();
        let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
            .expect("delete_at should be valid RFC3339")
            .with_timezone(&Utc);

        let duration_from_before = delete_at.signed_duration_since(before_update);
        let expected_duration = Duration::hours(8);
        let tolerance = Duration::minutes(5);

        assert!(
            duration_from_before >= expected_duration - tolerance
                && duration_from_before <= expected_duration + tolerance,
            "delete_at should be approximately 8 hours from now. Duration: {:?}",
            duration_from_before
        );

        // Also verify other fields were updated
        assert_eq!(updated.status, TransactionStatus::Confirmed);
        assert_eq!(
            updated.status_reason,
            Some("Transaction completed".to_string())
        );
        assert_eq!(
            updated.confirmed_at,
            Some("2023-01-01T12:05:00Z".to_string())
        );

        // Cleanup
        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_update_status_preserves_existing_delete_at() {
        let _lock = ENV_MUTEX.lock().await;

        use std::env;

        env::set_var("TRANSACTION_EXPIRATION_HOURS", "2");

        let repo = setup_test_repo().await;
        let tx_id = format!("test-preserve-delete-at-{}", Uuid::new_v4());
        let mut tx = create_test_transaction(&tx_id);

        // Set an existing delete_at value
        let existing_delete_at = "2025-01-01T12:00:00Z".to_string();
        tx.delete_at = Some(existing_delete_at.clone());
        tx.status = TransactionStatus::Pending;

        repo.create(tx).await.unwrap();

        // Update to final status
        let updated = repo
            .update_status(tx_id.clone(), TransactionStatus::Confirmed)
            .await
            .unwrap();

        // Should preserve the existing delete_at value
        assert_eq!(
            updated.delete_at,
            Some(existing_delete_at),
            "Existing delete_at should be preserved when updating to final status"
        );

        // Cleanup
        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_partial_update_without_status_change_preserves_delete_at() {
        let _lock = ENV_MUTEX.lock().await;

        use std::env;

        env::set_var("TRANSACTION_EXPIRATION_HOURS", "3");

        let repo = setup_test_repo().await;
        let tx_id = format!("test-preserve-no-status-{}", Uuid::new_v4());
        let mut tx = create_test_transaction(&tx_id);
        tx.delete_at = None;
        tx.status = TransactionStatus::Pending;

        repo.create(tx).await.unwrap();

        // First, update to final status to set delete_at
        let updated1 = repo
            .update_status(tx_id.clone(), TransactionStatus::Confirmed)
            .await
            .unwrap();

        assert!(updated1.delete_at.is_some());
        let original_delete_at = updated1.delete_at.clone();

        // Now update other fields without changing status
        let update = TransactionUpdateRequest {
            status: None, // No status change
            status_reason: Some("Updated reason".to_string()),
            confirmed_at: Some("2023-01-01T12:10:00Z".to_string()),
            ..Default::default()
        };

        let updated2 = repo.partial_update(tx_id.clone(), update).await.unwrap();

        // delete_at should be preserved
        assert_eq!(
            updated2.delete_at, original_delete_at,
            "delete_at should be preserved when status is not updated"
        );

        // Other fields should be updated
        assert_eq!(updated2.status, TransactionStatus::Confirmed); // Unchanged
        assert_eq!(updated2.status_reason, Some("Updated reason".to_string()));
        assert_eq!(
            updated2.confirmed_at,
            Some("2023-01-01T12:10:00Z".to_string())
        );

        // Cleanup
        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
    }

    // Tests for delete_by_ids batch delete functionality

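    // The tests below assume the following `delete_by_ids` contract, inferred
    // from the returned `BatchDeleteResult`:
    //   - `deleted_count` counts transactions that were actually removed,
    //   - `failed` collects (id, error) pairs for ids that could not be
    //     deleted (e.g. unknown ids) instead of failing the whole batch,
    //   - an empty input list is a successful no-op.
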
    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_delete_by_ids_empty_list() {
        let repo = setup_test_repo().await;
        let tx_id = format!("test-empty-{}", Uuid::new_v4());

        // Create a transaction to ensure repo is not empty
        let tx = create_test_transaction(&tx_id);
        repo.create(tx).await.unwrap();

        // Delete with empty list should succeed and not affect existing data
        let result = repo.delete_by_ids(vec![]).await.unwrap();

        assert_eq!(result.deleted_count, 0);
        assert!(result.failed.is_empty());

        // Original transaction should still exist
        assert!(repo.get_by_id(tx_id).await.is_ok());
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_delete_by_ids_single_transaction() {
        let repo = setup_test_repo().await;
        let tx_id = format!("test-single-{}", Uuid::new_v4());

        let tx = create_test_transaction(&tx_id);
        repo.create(tx).await.unwrap();

        let result = repo.delete_by_ids(vec![tx_id.clone()]).await.unwrap();

        assert_eq!(result.deleted_count, 1);
        assert!(result.failed.is_empty());

        // Verify transaction was deleted
        assert!(repo.get_by_id(tx_id).await.is_err());
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_delete_by_ids_multiple_transactions() {
        let repo = setup_test_repo().await;
        let base_id = Uuid::new_v4();

        // Create multiple transactions
        let mut created_ids = Vec::new();
        for i in 1..=5 {
            let tx_id = format!("test-multi-{}-{}", base_id, i);
            let tx = create_test_transaction(&tx_id);
            repo.create(tx).await.unwrap();
            created_ids.push(tx_id);
        }

        // Delete 3 of them
        let ids_to_delete = vec![
            created_ids[0].clone(),
            created_ids[2].clone(),
            created_ids[4].clone(),
        ];
        let result = repo.delete_by_ids(ids_to_delete).await.unwrap();

        assert_eq!(result.deleted_count, 3);
        assert!(result.failed.is_empty());

        // Verify correct transactions were deleted
        assert!(repo.get_by_id(created_ids[0].clone()).await.is_err());
        assert!(repo.get_by_id(created_ids[1].clone()).await.is_ok()); // Not deleted
        assert!(repo.get_by_id(created_ids[2].clone()).await.is_err());
        assert!(repo.get_by_id(created_ids[3].clone()).await.is_ok()); // Not deleted
        assert!(repo.get_by_id(created_ids[4].clone()).await.is_err());
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_delete_by_ids_nonexistent_transactions() {
        let repo = setup_test_repo().await;
        let base_id = Uuid::new_v4();

        // Try to delete transactions that don't exist
        let ids_to_delete = vec![
            format!("nonexistent-{}-1", base_id),
            format!("nonexistent-{}-2", base_id),
        ];
        let result = repo.delete_by_ids(ids_to_delete.clone()).await.unwrap();

        assert_eq!(result.deleted_count, 0);
        assert_eq!(result.failed.len(), 2);

        // Verify the failed results include both requested IDs
        let failed_ids: Vec<&String> = result.failed.iter().map(|(id, _)| id).collect();
        assert!(failed_ids.contains(&&ids_to_delete[0]));
        assert!(failed_ids.contains(&&ids_to_delete[1]));
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_delete_by_ids_mixed_existing_and_nonexistent() {
        let repo = setup_test_repo().await;
        let base_id = Uuid::new_v4();

        // Create some transactions
        let existing_ids: Vec<String> = (1..=3)
            .map(|i| format!("test-mixed-existing-{}-{}", base_id, i))
            .collect();

        for id in &existing_ids {
            let tx = create_test_transaction(id);
            repo.create(tx).await.unwrap();
        }

        let nonexistent_ids: Vec<String> = (1..=2)
            .map(|i| format!("test-mixed-nonexistent-{}-{}", base_id, i))
            .collect();

        // Try to delete a mix of existing and non-existing IDs
        let ids_to_delete = vec![
            existing_ids[0].clone(),
            nonexistent_ids[0].clone(),
            existing_ids[1].clone(),
            nonexistent_ids[1].clone(),
        ];
        let result = repo.delete_by_ids(ids_to_delete).await.unwrap();

        assert_eq!(result.deleted_count, 2);
        assert_eq!(result.failed.len(), 2);

        // Verify existing transactions were deleted
        assert!(repo.get_by_id(existing_ids[0].clone()).await.is_err());
        assert!(repo.get_by_id(existing_ids[1].clone()).await.is_err());

        // Verify remaining transaction still exists
        assert!(repo.get_by_id(existing_ids[2].clone()).await.is_ok());
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_delete_by_ids_removes_all_indexes() {
        let repo = setup_test_repo().await;
        let relayer_id = format!("relayer-{}", Uuid::new_v4());
        let tx_id = format!("test-indexes-{}", Uuid::new_v4());

        // Create a transaction with a specific status
        let mut tx = create_test_transaction(&tx_id);
        tx.relayer_id = relayer_id.clone();
        tx.status = TransactionStatus::Confirmed;
        repo.create(tx).await.unwrap();

        // Verify transaction exists and is indexed
        let found = repo
            .find_by_status(&relayer_id, &[TransactionStatus::Confirmed])
            .await
            .unwrap();
        assert!(found.iter().any(|t| t.id == tx_id));

        // Delete the transaction
        let result = repo.delete_by_ids(vec![tx_id.clone()]).await.unwrap();
        assert_eq!(result.deleted_count, 1);

        // Verify transaction is no longer in status index
        let found_after = repo
            .find_by_status(&relayer_id, &[TransactionStatus::Confirmed])
            .await
            .unwrap();
        assert!(!found_after.iter().any(|t| t.id == tx_id));

        // Verify transaction cannot be found
        assert!(repo.get_by_id(tx_id).await.is_err());
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_delete_by_ids_removes_nonce_index() {
        let repo = setup_test_repo().await;
        let relayer_id = format!("relayer-{}", Uuid::new_v4());
        let tx_id = format!("test-nonce-{}", Uuid::new_v4());
        let nonce = 12345u64;

        // Create a transaction with a specific nonce
        let tx = create_test_transaction_with_nonce(&tx_id, nonce, &relayer_id);
        repo.create(tx).await.unwrap();

        // Verify nonce index works
        let found = repo.find_by_nonce(&relayer_id, nonce).await.unwrap();
        assert!(found.is_some());
        assert_eq!(found.unwrap().id, tx_id);

        // Delete the transaction
        let result = repo.delete_by_ids(vec![tx_id.clone()]).await.unwrap();
        assert_eq!(result.deleted_count, 1);

        // Verify nonce index was cleaned up
        let found_after = repo.find_by_nonce(&relayer_id, nonce).await.unwrap();
        assert!(found_after.is_none());
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_delete_by_ids_large_batch() {
        let repo = setup_test_repo().await;
        let base_id = Uuid::new_v4();

        // Create many transactions to exercise a larger batch delete
        let count = 50;
        let mut created_ids = Vec::new();

        for i in 0..count {
            let tx_id = format!("test-large-{}-{}", base_id, i);
            let tx = create_test_transaction(&tx_id);
            repo.create(tx).await.unwrap();
            created_ids.push(tx_id);
        }

        // Delete all of them in one batch
        let result = repo.delete_by_ids(created_ids.clone()).await.unwrap();

        assert_eq!(result.deleted_count, count);
        assert!(result.failed.is_empty());

        // Verify all were deleted
        for id in created_ids {
            assert!(repo.get_by_id(id).await.is_err());
        }
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_delete_by_ids_preserves_other_relayer_transactions() {
        let repo = setup_test_repo().await;
        let relayer_1 = format!("relayer-1-{}", Uuid::new_v4());
        let relayer_2 = format!("relayer-2-{}", Uuid::new_v4());
        let tx_id_1 = format!("tx-relayer-1-{}", Uuid::new_v4());
        let tx_id_2 = format!("tx-relayer-2-{}", Uuid::new_v4());

        // Create transactions for different relayers
        let tx1 = create_test_transaction_with_relayer(&tx_id_1, &relayer_1);
        let tx2 = create_test_transaction_with_relayer(&tx_id_2, &relayer_2);

        repo.create(tx1).await.unwrap();
        repo.create(tx2).await.unwrap();

        // Delete only relayer-1's transaction
        let result = repo.delete_by_ids(vec![tx_id_1.clone()]).await.unwrap();

        assert_eq!(result.deleted_count, 1);

        // relayer-1's transaction should be deleted
        assert!(repo.get_by_id(tx_id_1).await.is_err());

        // relayer-2's transaction should still exist
        let remaining = repo.get_by_id(tx_id_2).await.unwrap();
        assert_eq!(remaining.relayer_id, relayer_2);
    }
}