openzeppelin_relayer/jobs/handlers/
token_swap_request_handler.rs

1//! Unified swap request handling worker implementation.
2//!
3//! This module implements the token swap request handling worker that processes
4//! swap jobs from the queue for all supported networks (Solana and Stellar).
5
6use actix_web::web::ThinData;
7use apalis::prelude::{Attempt, Data, Error, TaskId};
8use eyre::Result as EyreResult;
9use tracing::{debug, info, instrument};
10
11use crate::{
12    constants::WORKER_TOKEN_SWAP_REQUEST_RETRIES,
13    domain::get_network_relayer,
14    jobs::{handle_result, Job, TokenSwapRequest},
15    models::DefaultAppState,
16    observability::request_id::set_request_id,
17};
18
19/// Handles incoming swap jobs from the queue.
20///
21/// # Arguments
22/// * `job` - The swap job containing relayer ID
23/// * `context` - Application state containing services
24///
25/// # Returns
26/// * `Result<(), Error>` - Success or failure of swap processing
27#[instrument(
28    level = "debug",
29    skip(job, context),
30    fields(
31        request_id = ?job.request_id,
32        job_id = %job.message_id,
33        job_type = %job.job_type.to_string(),
34        attempt = %attempt.current(),
35        relayer_id = %job.data.relayer_id,
36        task_id = %task_id.to_string(),
37    )
38)]
39pub async fn token_swap_request_handler(
40    job: Job<TokenSwapRequest>,
41    context: Data<ThinData<DefaultAppState>>,
42    attempt: Attempt,
43    task_id: TaskId,
44) -> std::result::Result<(), Error> {
45    if let Some(request_id) = job.request_id.clone() {
46        set_request_id(request_id);
47    }
48
49    debug!(
50        relayer_id = %job.data.relayer_id,
51        "handling token swap request"
52    );
53
54    let result = handle_request(job.data, context).await;
55
56    handle_result(
57        result,
58        attempt,
59        "TokenSwapRequest",
60        WORKER_TOKEN_SWAP_REQUEST_RETRIES,
61    )
62}
63
/// Data-less job type received by `token_swap_cron_handler`.
///
/// It has no fields; the handler ignores the value itself.
#[derive(Clone, Debug, Default)]
pub struct TokenSwapCronReminder();
66
67/// Handles incoming swap jobs from the cron queue.
68#[instrument(
69    level = "info",
70    skip(_job, data, relayer_id),
71    fields(
72        job_type = "token_swap_cron",
73        attempt = %attempt.current(),
74    ),
75    err
76)]
77pub async fn token_swap_cron_handler(
78    _job: TokenSwapCronReminder,
79    relayer_id: Data<String>,
80    data: Data<ThinData<DefaultAppState>>,
81    attempt: Attempt,
82) -> std::result::Result<(), Error> {
83    info!(
84        relayer_id = %*relayer_id,
85        "handling token swap cron request"
86    );
87
88    let result = handle_request(
89        TokenSwapRequest {
90            relayer_id: relayer_id.to_string(),
91        },
92        data,
93    )
94    .await;
95
96    handle_result(
97        result,
98        attempt,
99        "TokenSwapRequest",
100        WORKER_TOKEN_SWAP_REQUEST_RETRIES,
101    )
102}
103
104async fn handle_request(
105    request: TokenSwapRequest,
106    context: Data<ThinData<DefaultAppState>>,
107) -> EyreResult<()> {
108    debug!(
109        relayer_id = %request.relayer_id,
110        "processing token swap"
111    );
112
113    let relayer = get_network_relayer(request.relayer_id.clone(), &context).await?;
114
115    relayer
116        .handle_token_swap_request(request.relayer_id.clone())
117        .await
118        .map_err(|e| eyre::eyre!("Failed to handle token swap request: {}", e))?;
119
120    debug!(
121        relayer_id = %request.relayer_id,
122        "token swap request completed"
123    );
124
125    Ok(())
126}
127
#[cfg(test)]
mod tests {
    // No unit tests are defined for this module yet.
}