openzeppelin_relayer/jobs/handlers/
transaction_request_handler.rs1use actix_web::web::ThinData;
6use apalis::prelude::{Attempt, Context, Data, TaskId, Worker, *};
7use apalis_redis::RedisContext;
8use tracing::instrument;
9
10use crate::{
11 constants::WORKER_TRANSACTION_REQUEST_RETRIES,
12 domain::{get_relayer_transaction, get_transaction_by_id, Transaction},
13 jobs::{handle_result, Job, TransactionRequest},
14 models::DefaultAppState,
15 observability::request_id::set_request_id,
16};
17
18#[instrument(
19 level = "debug",
20 skip(job, state, _worker, _ctx),
21 fields(
22 request_id = ?job.request_id,
23 job_id = %job.message_id,
24 job_type = %job.job_type.to_string(),
25 attempt = %attempt.current(),
26 tx_id = %job.data.transaction_id,
27 relayer_id = %job.data.relayer_id,
28 task_id = %task_id.to_string(),
29 )
30)]
31pub async fn transaction_request_handler(
32 job: Job<TransactionRequest>,
33 state: Data<ThinData<DefaultAppState>>,
34 attempt: Attempt,
35 _worker: Worker<Context>,
36 task_id: TaskId,
37 _ctx: RedisContext,
38) -> Result<(), Error> {
39 if let Some(request_id) = job.request_id.clone() {
40 set_request_id(request_id);
41 }
42
43 tracing::debug!(
44 tx_id = %job.data.transaction_id,
45 relayer_id = %job.data.relayer_id,
46 "handling transaction request"
47 );
48
49 let result = handle_request(job.data, state.clone()).await;
50
51 handle_result(
52 result,
53 attempt,
54 "Transaction Request",
55 WORKER_TRANSACTION_REQUEST_RETRIES,
56 )
57}
58
59async fn handle_request(
60 request: TransactionRequest,
61 state: Data<ThinData<DefaultAppState>>,
62) -> eyre::Result<()> {
63 let relayer_transaction = get_relayer_transaction(request.relayer_id, &state).await?;
64
65 let transaction = get_transaction_by_id(request.transaction_id.clone(), &state).await?;
66
67 tracing::debug!(
68 tx_id = %transaction.id,
69 relayer_id = %transaction.relayer_id,
70 status = ?transaction.status,
71 "preparing transaction"
72 );
73
74 let prepared = relayer_transaction.prepare_transaction(transaction).await?;
75
76 tracing::debug!(
77 tx_id = %prepared.id,
78 relayer_id = %prepared.relayer_id,
79 status = ?prepared.status,
80 "transaction prepared"
81 );
82
83 Ok(())
84}
85
86#[cfg(test)]
87mod tests {
88 use super::*;
89 use apalis::prelude::Attempt;
90
91 #[tokio::test]
92 async fn test_handler_result_processing() {
93 let request = TransactionRequest::new("tx123", "relayer-1");
98 let job = Job::new(crate::jobs::JobType::TransactionRequest, request);
99
100 let attempt = Attempt::default();
102
103 assert_eq!(job.data.transaction_id, "tx123");
106 assert_eq!(job.data.relayer_id, "relayer-1");
107 assert_eq!(attempt.current(), 0);
108 }
109
110 }