openzeppelin_relayer/jobs/handlers/
notification_handler.rs

1//! Notification handling worker implementation.
2//!
3//! This module implements the notification handling worker that processes
4//! notification jobs from the queue.
5
6use actix_web::web::ThinData;
7use apalis::prelude::{Attempt, Data, TaskId, *};
8use eyre::Result;
9use tracing::{debug, instrument};
10
11use crate::{
12    constants::WORKER_NOTIFICATION_SENDER_RETRIES,
13    jobs::{handle_result, Job, NotificationSend},
14    models::DefaultAppState,
15    observability::request_id::set_request_id,
16    repositories::Repository,
17    services::WebhookNotificationService,
18};
19
20/// Handles incoming notification jobs from the queue.
21///
22/// # Arguments
23/// * `job` - The notification job containing recipient and message details
24/// * `context` - Application state containing notification services
25///
26/// # Returns
27/// * `Result<(), Error>` - Success or failure of notification processing
28#[instrument(
29    level = "debug",
30    skip(job, context),
31    fields(
32        request_id = ?job.request_id,
33        job_id = %job.message_id,
34        job_type = %job.job_type.to_string(),
35        attempt = %attempt.current(),
36        task_id = %task_id.to_string(),
37        notification_id = %job.data.notification_id,
38    )
39)]
40pub async fn notification_handler(
41    job: Job<NotificationSend>,
42    context: Data<ThinData<DefaultAppState>>,
43    attempt: Attempt,
44    task_id: TaskId,
45) -> Result<(), Error> {
46    if let Some(request_id) = job.request_id.clone() {
47        set_request_id(request_id);
48    }
49
50    debug!(
51        notification_id = %job.data.notification_id,
52        "handling notification"
53    );
54
55    let result = handle_request(job.data, context).await;
56
57    handle_result(
58        result,
59        attempt,
60        "Notification",
61        WORKER_NOTIFICATION_SENDER_RETRIES,
62    )
63}
64
65async fn handle_request(
66    request: NotificationSend,
67    context: Data<ThinData<DefaultAppState>>,
68) -> Result<()> {
69    debug!(
70        notification_id = %request.notification_id,
71        "sending notification"
72    );
73    let notification = context
74        .notification_repository
75        .get_by_id(request.notification_id.clone())
76        .await?;
77
78    let notification_service =
79        WebhookNotificationService::new(notification.url, notification.signing_key);
80
81    notification_service
82        .send_notification(request.notification)
83        .await?;
84
85    debug!(
86        notification_id = %request.notification_id,
87        "notification sent successfully"
88    );
89
90    Ok(())
91}
92
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{
        EvmTransactionResponse, NetworkType, RelayerDisabledPayload, RelayerEvmPolicy,
        RelayerNetworkPolicyResponse, RelayerResponse, TransactionResponse, TransactionStatus,
        WebhookNotification, WebhookPayload, U256,
    };

    /// Builds the confirmed EVM transaction payload shared by the tests below,
    /// so the fixture is defined in exactly one place.
    fn confirmed_evm_transaction_payload() -> WebhookPayload {
        WebhookPayload::Transaction(TransactionResponse::Evm(Box::new(
            EvmTransactionResponse {
                id: "tx123".to_string(),
                hash: Some("0x123".to_string()),
                status: TransactionStatus::Confirmed,
                status_reason: None,
                created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
                sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
                confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
                gas_price: Some(1000000000),
                gas_limit: Some(21000),
                nonce: Some(1),
                value: U256::from(1000000000000000000_u64),
                from: "0xabc".to_string(),
                to: Some("0xdef".to_string()),
                relayer_id: "relayer-1".to_string(),
                data: None,
                max_fee_per_gas: None,
                max_priority_fee_per_gas: None,
                signature: None,
                speed: None,
            },
        )))
    }

    /// Builds a relayer-disabled payload for an unpaused EVM relayer.
    fn relayer_disabled_payload() -> WebhookPayload {
        WebhookPayload::RelayerDisabled(Box::new(RelayerDisabledPayload {
            relayer: RelayerResponse {
                id: "relayer-1".to_string(),
                name: "relayer-1".to_string(),
                network: "ethereum".to_string(),
                network_type: NetworkType::Evm,
                paused: false,
                policies: Some(RelayerNetworkPolicyResponse::Evm(
                    RelayerEvmPolicy {
                        gas_price_cap: None,
                        whitelist_receivers: None,
                        eip1559_pricing: None,
                        private_transactions: Some(false),
                        min_balance: Some(0),
                        gas_limit_estimation: None,
                    }
                    .into(),
                )),
                signer_id: "signer-1".to_string(),
                notification_id: None,
                custom_rpc_urls: None,
                address: Some("0xabc".to_string()),
                system_disabled: Some(false),
                ..Default::default()
            },
            disable_reason: "test".to_string(),
        }))
    }

    /// A `Job` wrapping a `NotificationSend` keeps the notification id and event.
    #[tokio::test]
    async fn test_notification_job_creation() {
        let notification = WebhookNotification::new(
            "test_event".to_string(),
            confirmed_evm_transaction_payload(),
        );
        // The notification is moved straight into the job; it is not used
        // again, so no clone is needed.
        let notification_job = NotificationSend::new("notification-1".to_string(), notification);

        let job = Job::new(crate::jobs::JobType::NotificationSend, notification_job);

        assert_eq!(job.data.notification_id, "notification-1");
        assert_eq!(job.data.notification.event, "test_event");
    }

    /// `NotificationSend` preserves the event name for each payload variant.
    #[tokio::test]
    async fn test_notification_job_with_different_payloads() {
        // Transaction payload variant.
        let transaction_notification = WebhookNotification::new(
            "transaction_payload".to_string(),
            confirmed_evm_transaction_payload(),
        );
        let job = NotificationSend::new(
            "notification-string".to_string(),
            transaction_notification,
        );
        assert_eq!(job.notification.event, "transaction_payload");

        // Relayer-disabled payload variant.
        let relayer_disabled_notification =
            WebhookNotification::new("object_event".to_string(), relayer_disabled_payload());
        let job = NotificationSend::new(
            "notification-object".to_string(),
            relayer_disabled_notification,
        );
        assert_eq!(job.notification.event, "object_event");
    }
}