openzeppelin_relayer/jobs/handlers/
system_cleanup_handler.rs1use actix_web::web::ThinData;
23use apalis::prelude::{Attempt, Data, *};
24use deadpool_redis::Pool;
25use eyre::Result;
26use std::env;
27use std::sync::Arc;
28use std::time::Duration;
29use tracing::{debug, error, info, instrument, warn};
30
31use crate::{
32 constants::WORKER_SYSTEM_CLEANUP_RETRIES, jobs::handle_result, models::DefaultAppState,
33 utils::DistributedLock,
34};
35
/// Name component of the distributed lock key; the full key is built as
/// `{key_prefix}:lock:{SYSTEM_CLEANUP_LOCK_NAME}`.
const SYSTEM_CLEANUP_LOCK_NAME: &str = "system_queue_cleanup";

/// Time-to-live handed to the distributed lock, in seconds (14 minutes).
const SYSTEM_CLEANUP_LOCK_TTL_SECS: u64 = 14 * 60;

/// Age threshold in seconds (10 minutes). It is subtracted from the current
/// Unix timestamp to obtain the cutoff score used when selecting entries.
const JOB_AGE_THRESHOLD_SECS: i64 = 10 * 60;

/// Maximum number of job IDs fetched (and then removed) per round trip; used
/// as the `LIMIT` count of the `ZRANGEBYSCORE` query.
const CLEANUP_BATCH_SIZE: isize = 500;
54
/// Queues whose job metadata is swept by the cleanup handler.
///
/// Each name is combined with the optional `{REDIS_KEY_PREFIX}:queue:` prefix
/// to form the queue namespace under which the Redis keys are looked up.
const QUEUE_NAMES: &[&str] = &[
    "transaction_request_queue",
    "transaction_submission_queue",
    "transaction_status_queue",
    "transaction_status_queue_evm",
    "transaction_status_queue_stellar",
    "notification_queue",
    "token_swap_request_queue",
    "relayer_health_check_queue",
];
67
/// Suffixes appended to a queue namespace to obtain the sorted-set keys that
/// are scanned for stale job IDs.
const SORTED_SET_SUFFIXES: &[&str] = &[":done", ":failed", ":dead"];
70
/// Field-less job payload accepted by `system_cleanup_handler`.
///
/// NOTE(review): the name suggests it is emitted on a cron schedule; confirm
/// at the worker registration site.
#[derive(Clone, Debug, Default)]
pub struct SystemCleanupCronReminder();
74
75#[instrument(
91 level = "debug",
92 skip(job, data),
93 fields(
94 job_type = "system_cleanup",
95 attempt = %attempt.current(),
96 ),
97 err
98)]
99pub async fn system_cleanup_handler(
100 job: SystemCleanupCronReminder,
101 data: Data<ThinData<DefaultAppState>>,
102 attempt: Attempt,
103) -> Result<(), Error> {
104 let result = handle_cleanup_request(job, data, attempt.clone()).await;
105
106 handle_result(
107 result,
108 attempt,
109 "SystemCleanup",
110 WORKER_SYSTEM_CLEANUP_RETRIES,
111 )
112}
113
114async fn handle_cleanup_request(
123 _job: SystemCleanupCronReminder,
124 data: Data<ThinData<DefaultAppState>>,
125 _attempt: Attempt,
126) -> Result<()> {
127 let (pool, key_prefix) = match data.transaction_repository().connection_info() {
128 Some((pool, prefix)) => (pool, prefix.to_string()),
129 None => {
130 debug!("in-memory repository detected, skipping system cleanup");
131 return Ok(());
132 }
133 };
134
135 let lock_key = format!("{key_prefix}:lock:{SYSTEM_CLEANUP_LOCK_NAME}");
136
137 let lock = DistributedLock::new(
138 pool.clone(),
139 &lock_key,
140 Duration::from_secs(SYSTEM_CLEANUP_LOCK_TTL_SECS),
141 );
142
143 let _lock_guard = match lock.try_acquire().await {
144 Ok(Some(guard)) => {
145 debug!(lock_key = %lock_key, "acquired distributed lock for system cleanup");
146 guard
147 }
148 Ok(None) => {
149 info!(lock_key = %lock_key, "system cleanup skipped - another instance is processing");
150 return Ok(());
151 }
152 Err(e) => {
153 warn!(
156 error = %e,
157 lock_key = %lock_key,
158 "failed to acquire distributed lock, skipping cleanup"
159 );
160 return Ok(());
161 }
162 };
163
164 info!("executing queue metadata cleanup");
165
166 let redis_key_prefix = env::var("REDIS_KEY_PREFIX")
170 .ok()
171 .filter(|v| !v.is_empty())
172 .map(|value| format!("{value}:queue:"))
173 .unwrap_or_default();
174
175 let cutoff_timestamp = chrono::Utc::now().timestamp() - JOB_AGE_THRESHOLD_SECS;
176
177 let mut total_cleaned = 0usize;
178 let mut total_errors = 0usize;
179
180 for queue_name in QUEUE_NAMES {
182 let namespace = format!("{redis_key_prefix}{queue_name}");
183
184 match cleanup_queue(&pool, &namespace, cutoff_timestamp).await {
185 Ok(cleaned) => {
186 if cleaned > 0 {
187 debug!(
188 queue = %queue_name,
189 cleaned_count = cleaned,
190 "cleaned up stale job metadata"
191 );
192 }
193 total_cleaned += cleaned;
194 }
195 Err(e) => {
196 error!(
197 queue = %queue_name,
198 error = %e,
199 "failed to cleanup queue"
200 );
201 total_errors += 1;
202 }
203 }
204 }
205
206 info!(
207 total_cleaned,
208 total_errors,
209 queues_processed = QUEUE_NAMES.len(),
210 "system cleanup completed"
211 );
212
213 if total_errors > 0 {
214 Err(eyre::eyre!(
215 "System cleanup completed with {} errors",
216 total_errors
217 ))
218 } else {
219 Ok(())
220 }
221}
222
223async fn cleanup_queue(pool: &Arc<Pool>, namespace: &str, cutoff_timestamp: i64) -> Result<usize> {
233 let mut total_cleaned = 0usize;
234 let data_key = format!("{namespace}:data");
235 let result_key = format!("{data_key}::result");
236
237 for suffix in SORTED_SET_SUFFIXES {
239 let sorted_set_key = format!("{namespace}{suffix}");
240 let cleaned = cleanup_sorted_set_and_hashes(
241 pool,
242 &sorted_set_key,
243 &data_key,
244 &result_key,
245 cutoff_timestamp,
246 )
247 .await?;
248 total_cleaned += cleaned;
249 }
250
251 Ok(total_cleaned)
252}
253
254async fn cleanup_sorted_set_and_hashes(
269 pool: &Arc<Pool>,
270 sorted_set_key: &str,
271 data_key: &str,
272 result_key: &str,
273 cutoff_timestamp: i64,
274) -> Result<usize> {
275 let mut total_cleaned = 0usize;
276 let mut conn = pool.get().await?;
277
278 loop {
279 let job_ids: Vec<String> = redis::cmd("ZRANGEBYSCORE")
282 .arg(sorted_set_key)
283 .arg("-inf")
284 .arg(cutoff_timestamp)
285 .arg("LIMIT")
286 .arg(0)
287 .arg(CLEANUP_BATCH_SIZE)
288 .query_async(&mut conn)
289 .await?;
290
291 if job_ids.is_empty() {
292 break;
293 }
294
295 let batch_size = job_ids.len();
296
297 let mut pipe = redis::pipe();
299
300 pipe.cmd("ZREM").arg(sorted_set_key);
302 for job_id in &job_ids {
303 pipe.arg(job_id);
304 }
305 pipe.ignore();
306
307 pipe.cmd("HDEL").arg(data_key);
309 for job_id in &job_ids {
310 pipe.arg(job_id);
311 }
312 pipe.ignore();
313
314 pipe.cmd("HDEL").arg(result_key);
316 for job_id in &job_ids {
317 pipe.arg(job_id);
318 }
319 pipe.ignore();
320
321 pipe.query_async::<()>(&mut conn).await?;
322
323 total_cleaned += batch_size;
324
325 if batch_size < CLEANUP_BATCH_SIZE as usize {
327 break;
328 }
329 }
330
331 Ok(total_cleaned)
332}
333
/// Unit tests for the constants and the Redis key formats used by the cleanup
/// handler. The key-format tests rebuild the keys with the same `format!`
/// expressions as the production code, so they must be kept in sync with it.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_queue_names_not_empty() {
        assert!(!QUEUE_NAMES.is_empty());
    }

    #[test]
    fn test_sorted_set_suffixes() {
        assert!(SORTED_SET_SUFFIXES.contains(&":done"));
        assert!(SORTED_SET_SUFFIXES.contains(&":failed"));
        assert!(SORTED_SET_SUFFIXES.contains(&":dead"));
    }

    #[test]
    fn test_constants() {
        assert_eq!(SYSTEM_CLEANUP_LOCK_TTL_SECS, 14 * 60);
        assert_eq!(JOB_AGE_THRESHOLD_SECS, 10 * 60);
        assert_eq!(CLEANUP_BATCH_SIZE, 500);
    }

    #[test]
    fn test_namespace_format_without_prefix() {
        let redis_key_prefix = "";
        let queue_name = "transaction_request_queue";
        let namespace = format!("{redis_key_prefix}{queue_name}");
        assert_eq!(namespace, "transaction_request_queue");
    }

    #[test]
    fn test_namespace_format_with_prefix() {
        let redis_key_prefix = "oz-relayer:queue:";
        let queue_name = "transaction_request_queue";
        let namespace = format!("{redis_key_prefix}{queue_name}");
        assert_eq!(namespace, "oz-relayer:queue:transaction_request_queue");
    }

    #[test]
    fn test_sorted_set_key_format() {
        let namespace = "transaction_request_queue";
        let sorted_set_key = format!("{namespace}:done");
        assert_eq!(sorted_set_key, "transaction_request_queue:done");

        let namespace_with_prefix = "oz-relayer:queue:transaction_request_queue";
        let sorted_set_key_with_prefix = format!("{namespace_with_prefix}:done");
        assert_eq!(
            sorted_set_key_with_prefix,
            "oz-relayer:queue:transaction_request_queue:done"
        );
    }

    #[test]
    fn test_data_key_format() {
        let namespace = "transaction_request_queue";
        let data_key = format!("{namespace}:data");
        assert_eq!(data_key, "transaction_request_queue:data");

        let namespace_with_prefix = "oz-relayer:queue:transaction_request_queue";
        let data_key_with_prefix = format!("{namespace_with_prefix}:data");
        assert_eq!(
            data_key_with_prefix,
            "oz-relayer:queue:transaction_request_queue:data"
        );
    }

    #[test]
    fn test_result_key_format() {
        // Mirrors `cleanup_queue`: the result hash key is derived from the data
        // key with a double-colon separator, not directly from the namespace.
        let namespace = "transaction_request_queue";
        let data_key = format!("{namespace}:data");
        let result_key = format!("{data_key}::result");
        assert_eq!(result_key, "transaction_request_queue:data::result");

        let namespace_with_prefix = "oz-relayer:queue:transaction_request_queue";
        let data_key_with_prefix = format!("{namespace_with_prefix}:data");
        let result_key_with_prefix = format!("{data_key_with_prefix}::result");
        assert_eq!(
            result_key_with_prefix,
            "oz-relayer:queue:transaction_request_queue:data::result"
        );
    }

    #[test]
    fn test_lock_key_format() {
        let prefix = "oz-relayer";
        let lock_key = format!("{prefix}:lock:{SYSTEM_CLEANUP_LOCK_NAME}");
        assert_eq!(lock_key, "oz-relayer:lock:system_queue_cleanup");
    }

    #[test]
    fn test_cutoff_timestamp_calculation() {
        let now = chrono::Utc::now().timestamp();
        let cutoff = now - JOB_AGE_THRESHOLD_SECS;
        assert!(cutoff < now);
        assert_eq!(now - cutoff, JOB_AGE_THRESHOLD_SECS);
    }
}