openzeppelin_relayer/jobs/handlers/
system_cleanup_handler.rs

1//! System cleanup worker implementation for Redis queue metadata.
2//!
3//! This module implements a cleanup worker that removes stale job metadata from Redis.
4//! The job queue library stores job metadata in Redis that never gets automatically cleaned up.
5//! When jobs complete, keys accumulate in:
6//! - `{namespace}:done` - Sorted set of completed job IDs (score = timestamp)
7//! - `{namespace}:data` - Hash storing job payloads (field = job_id)
8//! - `{namespace}:result` - Hash storing job results (field = job_id)
9//! - `{namespace}:failed` - Sorted set of failed jobs
10//! - `{namespace}:dead` - Sorted set of dead-letter jobs
11//!
12//! This worker runs every 15 minutes to clean up this metadata and prevent Redis memory from growing
13//! indefinitely.
14//!
15//! ## Distributed Lock
16//!
17//! Since this runs on multiple service instances simultaneously (each with its own
18//! CronStream), a distributed lock is used to ensure only one instance processes
19//! the cleanup at a time. The lock has a 14-minute TTL (the cron runs every 15 minutes),
20//! ensuring the lock expires before the next scheduled run.
21
22use actix_web::web::ThinData;
23use apalis::prelude::{Attempt, Data, *};
24use deadpool_redis::Pool;
25use eyre::Result;
26use std::env;
27use std::sync::Arc;
28use std::time::Duration;
29use tracing::{debug, error, info, instrument, warn};
30
31use crate::{
32    constants::WORKER_SYSTEM_CLEANUP_RETRIES, jobs::handle_result, models::DefaultAppState,
33    utils::DistributedLock,
34};
35
/// Distributed lock name for queue cleanup.
/// Only one instance across the cluster should run cleanup at a time.
const SYSTEM_CLEANUP_LOCK_NAME: &str = "system_queue_cleanup";

/// TTL for the distributed lock (14 minutes).
///
/// This value should be:
/// 1. Greater than the worst-case cleanup runtime to prevent concurrent execution
/// 2. Less than the cron interval (15 minutes) to ensure availability for the next run
const SYSTEM_CLEANUP_LOCK_TTL_SECS: u64 = 14 * 60;

/// Age threshold for job metadata cleanup (10 minutes).
/// Jobs older than this threshold will be cleaned up.
/// Signed (`i64`) because it is subtracted from `chrono::Utc::now().timestamp()`.
const JOB_AGE_THRESHOLD_SECS: i64 = 10 * 60;

/// Batch size for cleanup operations.
/// Processing in batches prevents memory issues with large datasets.
/// Used as the COUNT of `ZRANGEBYSCORE ... LIMIT 0 <count>` per iteration.
const CLEANUP_BATCH_SIZE: isize = 500;

/// Queue names to clean up.
/// These are the queue namespaces used by the relayer.
/// NOTE(review): must stay in sync with the queue names registered by the
/// job producers elsewhere in the codebase — confirm when adding a queue.
const QUEUE_NAMES: &[&str] = &[
    "transaction_request_queue",
    "transaction_submission_queue",
    "transaction_status_queue",
    "transaction_status_queue_evm",
    "transaction_status_queue_stellar",
    "notification_queue",
    "token_swap_request_queue",
    "relayer_health_check_queue",
];

/// Sorted set suffixes that contain job IDs to clean up.
/// Each maps to a `{namespace}{suffix}` sorted set whose score is a timestamp.
const SORTED_SET_SUFFIXES: &[&str] = &[":done", ":failed", ":dead"];
70
/// Represents a cron reminder job for triggering system cleanup operations.
/// Unit struct: the cron tick itself carries no payload.
#[derive(Default, Debug, Clone)]
pub struct SystemCleanupCronReminder();
74
75/// Handles periodic queue metadata cleanup jobs.
76///
77/// This function processes stale job metadata by:
78/// 1. Acquiring a distributed lock to prevent concurrent cleanup
79/// 2. Iterating through all queue namespaces
80/// 3. For each queue, finding and removing job IDs older than threshold
81/// 4. Cleaning up associated data from the `:data` hash
82///
83/// # Arguments
84/// * `job` - The cron reminder job triggering the cleanup
85/// * `data` - Application state containing repositories
86/// * `attempt` - Current attempt number for retry logic
87///
88/// # Returns
89/// * `Result<(), Error>` - Success or failure of cleanup processing
90#[instrument(
91    level = "debug",
92    skip(job, data),
93    fields(
94        job_type = "system_cleanup",
95        attempt = %attempt.current(),
96    ),
97    err
98)]
99pub async fn system_cleanup_handler(
100    job: SystemCleanupCronReminder,
101    data: Data<ThinData<DefaultAppState>>,
102    attempt: Attempt,
103) -> Result<(), Error> {
104    let result = handle_cleanup_request(job, data, attempt.clone()).await;
105
106    handle_result(
107        result,
108        attempt,
109        "SystemCleanup",
110        WORKER_SYSTEM_CLEANUP_RETRIES,
111    )
112}
113
114/// Handles the actual system cleanup request logic.
115///
116/// This function first attempts to acquire a distributed lock to ensure only
117/// one instance processes cleanup at a time. If the lock is already held by
118/// another instance, this returns early without doing any work.
119///
120/// Note: Queue metadata cleanup only runs when using Redis storage.
121/// In-memory mode skips cleanup since distributed locking is not needed.
122async fn handle_cleanup_request(
123    _job: SystemCleanupCronReminder,
124    data: Data<ThinData<DefaultAppState>>,
125    _attempt: Attempt,
126) -> Result<()> {
127    let (pool, key_prefix) = match data.transaction_repository().connection_info() {
128        Some((pool, prefix)) => (pool, prefix.to_string()),
129        None => {
130            debug!("in-memory repository detected, skipping system cleanup");
131            return Ok(());
132        }
133    };
134
135    let lock_key = format!("{key_prefix}:lock:{SYSTEM_CLEANUP_LOCK_NAME}");
136
137    let lock = DistributedLock::new(
138        pool.clone(),
139        &lock_key,
140        Duration::from_secs(SYSTEM_CLEANUP_LOCK_TTL_SECS),
141    );
142
143    let _lock_guard = match lock.try_acquire().await {
144        Ok(Some(guard)) => {
145            debug!(lock_key = %lock_key, "acquired distributed lock for system cleanup");
146            guard
147        }
148        Ok(None) => {
149            info!(lock_key = %lock_key, "system cleanup skipped - another instance is processing");
150            return Ok(());
151        }
152        Err(e) => {
153            // If we can't communicate with Redis for locking, skip cleanup to avoid
154            // potential concurrent execution across multiple instances
155            warn!(
156                error = %e,
157                lock_key = %lock_key,
158                "failed to acquire distributed lock, skipping cleanup"
159            );
160            return Ok(());
161        }
162    };
163
164    info!("executing queue metadata cleanup");
165
166    // Queue keys use REDIS_KEY_PREFIX if set, with ":queue:" suffix
167    // Format: {REDIS_KEY_PREFIX}:queue:{queue_name}:done, etc.
168    // If REDIS_KEY_PREFIX is not set, keys are just {queue_name}:done, etc.
169    let redis_key_prefix = env::var("REDIS_KEY_PREFIX")
170        .ok()
171        .filter(|v| !v.is_empty())
172        .map(|value| format!("{value}:queue:"))
173        .unwrap_or_default();
174
175    let cutoff_timestamp = chrono::Utc::now().timestamp() - JOB_AGE_THRESHOLD_SECS;
176
177    let mut total_cleaned = 0usize;
178    let mut total_errors = 0usize;
179
180    // Process each queue
181    for queue_name in QUEUE_NAMES {
182        let namespace = format!("{redis_key_prefix}{queue_name}");
183
184        match cleanup_queue(&pool, &namespace, cutoff_timestamp).await {
185            Ok(cleaned) => {
186                if cleaned > 0 {
187                    debug!(
188                        queue = %queue_name,
189                        cleaned_count = cleaned,
190                        "cleaned up stale job metadata"
191                    );
192                }
193                total_cleaned += cleaned;
194            }
195            Err(e) => {
196                error!(
197                    queue = %queue_name,
198                    error = %e,
199                    "failed to cleanup queue"
200                );
201                total_errors += 1;
202            }
203        }
204    }
205
206    info!(
207        total_cleaned,
208        total_errors,
209        queues_processed = QUEUE_NAMES.len(),
210        "system cleanup completed"
211    );
212
213    if total_errors > 0 {
214        Err(eyre::eyre!(
215            "System cleanup completed with {} errors",
216            total_errors
217        ))
218    } else {
219        Ok(())
220    }
221}
222
223/// Cleans up stale job metadata for a single queue namespace.
224///
225/// # Arguments
226/// * `pool` - Redis connection pool
227/// * `namespace` - The queue namespace (e.g., "oz-relayer:queue:transaction_request_queue")
228/// * `cutoff_timestamp` - Unix timestamp; jobs older than this will be cleaned up
229///
230/// # Returns
231/// * `Result<usize>` - Number of jobs cleaned up
232async fn cleanup_queue(pool: &Arc<Pool>, namespace: &str, cutoff_timestamp: i64) -> Result<usize> {
233    let mut total_cleaned = 0usize;
234    let data_key = format!("{namespace}:data");
235    let result_key = format!("{data_key}::result");
236
237    // Clean up each sorted set (done, failed, dead) and associated hash entries
238    for suffix in SORTED_SET_SUFFIXES {
239        let sorted_set_key = format!("{namespace}{suffix}");
240        let cleaned = cleanup_sorted_set_and_hashes(
241            pool,
242            &sorted_set_key,
243            &data_key,
244            &result_key,
245            cutoff_timestamp,
246        )
247        .await?;
248        total_cleaned += cleaned;
249    }
250
251    Ok(total_cleaned)
252}
253
254/// Cleans up job IDs from a sorted set and their associated data/result hashes.
255///
256/// Uses ZRANGEBYSCORE to find old job IDs, then removes them from the
257/// sorted set and both the data and result hashes in a pipeline for efficiency.
258///
259/// # Arguments
260/// * `pool` - Redis connection pool
261/// * `sorted_set_key` - Key of the sorted set (e.g., "queue:transaction_request_queue:done")
262/// * `data_key` - Key of the data hash (e.g., "queue:transaction_request_queue:data")
263/// * `result_key` - Key of the result hash (e.g., "queue:transaction_request_queue:result")
264/// * `cutoff_timestamp` - Unix timestamp; jobs with score older than this will be cleaned up
265///
266/// # Returns
267/// * `Result<usize>` - Number of jobs cleaned up
268async fn cleanup_sorted_set_and_hashes(
269    pool: &Arc<Pool>,
270    sorted_set_key: &str,
271    data_key: &str,
272    result_key: &str,
273    cutoff_timestamp: i64,
274) -> Result<usize> {
275    let mut total_cleaned = 0usize;
276    let mut conn = pool.get().await?;
277
278    loop {
279        // Get batch of old job IDs from sorted set
280        // ZRANGEBYSCORE key -inf cutoff LIMIT 0 batch_size
281        let job_ids: Vec<String> = redis::cmd("ZRANGEBYSCORE")
282            .arg(sorted_set_key)
283            .arg("-inf")
284            .arg(cutoff_timestamp)
285            .arg("LIMIT")
286            .arg(0)
287            .arg(CLEANUP_BATCH_SIZE)
288            .query_async(&mut conn)
289            .await?;
290
291        if job_ids.is_empty() {
292            break;
293        }
294
295        let batch_size = job_ids.len();
296
297        // Use pipeline to remove from sorted set and both hashes atomically
298        let mut pipe = redis::pipe();
299
300        // ZREM sorted_set_key job_id1 job_id2 ...
301        pipe.cmd("ZREM").arg(sorted_set_key);
302        for job_id in &job_ids {
303            pipe.arg(job_id);
304        }
305        pipe.ignore();
306
307        // HDEL data_key job_id1 job_id2 ...
308        pipe.cmd("HDEL").arg(data_key);
309        for job_id in &job_ids {
310            pipe.arg(job_id);
311        }
312        pipe.ignore();
313
314        // HDEL result_key job_id1 job_id2 ...
315        pipe.cmd("HDEL").arg(result_key);
316        for job_id in &job_ids {
317            pipe.arg(job_id);
318        }
319        pipe.ignore();
320
321        pipe.query_async::<()>(&mut conn).await?;
322
323        total_cleaned += batch_size;
324
325        // If we got fewer than batch size, we're done
326        if batch_size < CLEANUP_BATCH_SIZE as usize {
327            break;
328        }
329    }
330
331    Ok(total_cleaned)
332}
333
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_queue_names_not_empty() {
        // The cleanup loop iterates this list; an empty list would be a no-op.
        assert!(!QUEUE_NAMES.is_empty());
    }

    #[test]
    fn test_sorted_set_suffixes() {
        // Every terminal job state must have its sorted set cleaned.
        for suffix in [":done", ":failed", ":dead"] {
            assert!(SORTED_SET_SUFFIXES.contains(&suffix));
        }
    }

    #[test]
    fn test_constants() {
        assert_eq!(CLEANUP_BATCH_SIZE, 500);
        assert_eq!(JOB_AGE_THRESHOLD_SECS, 10 * 60); // 10 minutes
        assert_eq!(SYSTEM_CLEANUP_LOCK_TTL_SECS, 14 * 60); // 14 minutes
    }

    #[test]
    fn test_namespace_format_without_prefix() {
        // When REDIS_KEY_PREFIX is not set, queue keys are at root level
        let namespace = format!("{}{}", "", "transaction_request_queue");
        assert_eq!(namespace, "transaction_request_queue");
    }

    #[test]
    fn test_namespace_format_with_prefix() {
        // When REDIS_KEY_PREFIX is set, queue keys include the prefix
        let namespace = format!("{}{}", "oz-relayer:queue:", "transaction_request_queue");
        assert_eq!(namespace, "oz-relayer:queue:transaction_request_queue");
    }

    #[test]
    fn test_sorted_set_key_format() {
        // (namespace, expected key) pairs — with and without REDIS_KEY_PREFIX.
        let cases = [
            ("transaction_request_queue", "transaction_request_queue:done"),
            (
                "oz-relayer:queue:transaction_request_queue",
                "oz-relayer:queue:transaction_request_queue:done",
            ),
        ];
        for (namespace, expected) in cases {
            assert_eq!(format!("{namespace}:done"), expected);
        }
    }

    #[test]
    fn test_data_key_format() {
        // (namespace, expected key) pairs — with and without REDIS_KEY_PREFIX.
        let cases = [
            ("transaction_request_queue", "transaction_request_queue:data"),
            (
                "oz-relayer:queue:transaction_request_queue",
                "oz-relayer:queue:transaction_request_queue:data",
            ),
        ];
        for (namespace, expected) in cases {
            assert_eq!(format!("{namespace}:data"), expected);
        }
    }

    #[test]
    fn test_result_key_format() {
        // (namespace, expected key) pairs — with and without REDIS_KEY_PREFIX.
        let cases = [
            ("transaction_request_queue", "transaction_request_queue:result"),
            (
                "oz-relayer:queue:transaction_request_queue",
                "oz-relayer:queue:transaction_request_queue:result",
            ),
        ];
        for (namespace, expected) in cases {
            assert_eq!(format!("{namespace}:result"), expected);
        }
    }

    #[test]
    fn test_lock_key_format() {
        let lock_key = format!("{}:lock:{SYSTEM_CLEANUP_LOCK_NAME}", "oz-relayer");
        assert_eq!(lock_key, "oz-relayer:lock:system_queue_cleanup");
    }

    #[test]
    fn test_cutoff_timestamp_calculation() {
        let now = chrono::Utc::now().timestamp();
        let cutoff = now - JOB_AGE_THRESHOLD_SECS;
        assert!(cutoff < now);
        assert_eq!(now - cutoff, JOB_AGE_THRESHOLD_SECS);
    }
}
437}