openzeppelin_relayer/jobs/handlers/
transaction_request_handler.rs

//! Transaction request handler for processing incoming transaction jobs.
//!
//! Handles the validation and preparation of transactions before they are
//! submitted to the network.
5use actix_web::web::ThinData;
6use apalis::prelude::{Attempt, Context, Data, TaskId, Worker, *};
7use apalis_redis::RedisContext;
8use tracing::instrument;
9
10use crate::{
11    constants::WORKER_TRANSACTION_REQUEST_RETRIES,
12    domain::{get_relayer_transaction, get_transaction_by_id, Transaction},
13    jobs::{handle_result, Job, TransactionRequest},
14    models::DefaultAppState,
15    observability::request_id::set_request_id,
16};
17
18#[instrument(
19    level = "debug",
20    skip(job, state, _worker, _ctx),
21    fields(
22        request_id = ?job.request_id,
23        job_id = %job.message_id,
24        job_type = %job.job_type.to_string(),
25        attempt = %attempt.current(),
26        tx_id = %job.data.transaction_id,
27        relayer_id = %job.data.relayer_id,
28        task_id = %task_id.to_string(),
29    )
30)]
31pub async fn transaction_request_handler(
32    job: Job<TransactionRequest>,
33    state: Data<ThinData<DefaultAppState>>,
34    attempt: Attempt,
35    _worker: Worker<Context>,
36    task_id: TaskId,
37    _ctx: RedisContext,
38) -> Result<(), Error> {
39    if let Some(request_id) = job.request_id.clone() {
40        set_request_id(request_id);
41    }
42
43    tracing::debug!(
44        tx_id = %job.data.transaction_id,
45        relayer_id = %job.data.relayer_id,
46        "handling transaction request"
47    );
48
49    let result = handle_request(job.data, state.clone()).await;
50
51    handle_result(
52        result,
53        attempt,
54        "Transaction Request",
55        WORKER_TRANSACTION_REQUEST_RETRIES,
56    )
57}
58
59async fn handle_request(
60    request: TransactionRequest,
61    state: Data<ThinData<DefaultAppState>>,
62) -> eyre::Result<()> {
63    let relayer_transaction = get_relayer_transaction(request.relayer_id, &state).await?;
64
65    let transaction = get_transaction_by_id(request.transaction_id.clone(), &state).await?;
66
67    tracing::debug!(
68        tx_id = %transaction.id,
69        relayer_id = %transaction.relayer_id,
70        status = ?transaction.status,
71        "preparing transaction"
72    );
73
74    let prepared = relayer_transaction.prepare_transaction(transaction).await?;
75
76    tracing::debug!(
77        tx_id = %prepared.id,
78        relayer_id = %prepared.relayer_id,
79        status = ?prepared.status,
80        "transaction prepared"
81    );
82
83    Ok(())
84}
85
#[cfg(test)]
mod tests {
    use super::*;
    use apalis::prelude::Attempt;

    /// Sanity-checks the fixtures a handler invocation would be given.
    ///
    /// Despite its name, this test calls neither `handle_result` nor
    /// `transaction_request_handler`: it only verifies that a job and an
    /// attempt can be constructed with the expected field values. Nothing is
    /// awaited, so a plain synchronous test is sufficient.
    #[test]
    fn test_handler_result_processing() {
        // Build a minimal job around a transaction request.
        let request = TransactionRequest::new("tx123", "relayer-1");
        let job = Job::new(crate::jobs::JobType::TransactionRequest, request);

        // A default attempt is expected to start at zero.
        let attempt = Attempt::default();

        assert_eq!(job.data.transaction_id, "tx123");
        assert_eq!(job.data.relayer_id, "relayer-1");
        assert_eq!(attempt.current(), 0);
    }

    // Note: Fully testing the handler would require either:
    // 1. Dependency injection for all external dependencies
    // 2. Feature flags to enable mock implementations
    // 3. Integration tests with a real or test database
    //
    // Until that infrastructure exists, the test above is a placeholder.
}