openzeppelin_relayer/jobs/handlers/
notification_handler.rs1use actix_web::web::ThinData;
7use apalis::prelude::{Attempt, Data, TaskId, *};
8use eyre::Result;
9use tracing::{debug, instrument};
10
11use crate::{
12 constants::WORKER_NOTIFICATION_SENDER_RETRIES,
13 jobs::{handle_result, Job, NotificationSend},
14 models::DefaultAppState,
15 observability::request_id::set_request_id,
16 repositories::Repository,
17 services::WebhookNotificationService,
18};
19
20#[instrument(
29 level = "debug",
30 skip(job, context),
31 fields(
32 request_id = ?job.request_id,
33 job_id = %job.message_id,
34 job_type = %job.job_type.to_string(),
35 attempt = %attempt.current(),
36 task_id = %task_id.to_string(),
37 notification_id = %job.data.notification_id,
38 )
39)]
40pub async fn notification_handler(
41 job: Job<NotificationSend>,
42 context: Data<ThinData<DefaultAppState>>,
43 attempt: Attempt,
44 task_id: TaskId,
45) -> Result<(), Error> {
46 if let Some(request_id) = job.request_id.clone() {
47 set_request_id(request_id);
48 }
49
50 debug!(
51 notification_id = %job.data.notification_id,
52 "handling notification"
53 );
54
55 let result = handle_request(job.data, context).await;
56
57 handle_result(
58 result,
59 attempt,
60 "Notification",
61 WORKER_NOTIFICATION_SENDER_RETRIES,
62 )
63}
64
65async fn handle_request(
66 request: NotificationSend,
67 context: Data<ThinData<DefaultAppState>>,
68) -> Result<()> {
69 debug!(
70 notification_id = %request.notification_id,
71 "sending notification"
72 );
73 let notification = context
74 .notification_repository
75 .get_by_id(request.notification_id.clone())
76 .await?;
77
78 let notification_service =
79 WebhookNotificationService::new(notification.url, notification.signing_key);
80
81 notification_service
82 .send_notification(request.notification)
83 .await?;
84
85 debug!(
86 notification_id = %request.notification_id,
87 "notification sent successfully"
88 );
89
90 Ok(())
91}
92
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{
        EvmTransactionResponse, NetworkType, RelayerDisabledPayload, RelayerEvmPolicy,
        RelayerNetworkPolicyResponse, RelayerResponse, TransactionResponse, TransactionStatus,
        WebhookNotification, WebhookPayload, U256,
    };

    /// Builds the confirmed EVM transaction payload shared by the tests in
    /// this module, so the fixture is defined in exactly one place.
    fn confirmed_evm_transaction_payload() -> WebhookPayload {
        WebhookPayload::Transaction(TransactionResponse::Evm(Box::new(
            EvmTransactionResponse {
                id: "tx123".to_string(),
                hash: Some("0x123".to_string()),
                status: TransactionStatus::Confirmed,
                status_reason: None,
                created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
                sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
                confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
                gas_price: Some(1000000000),
                gas_limit: Some(21000),
                nonce: Some(1),
                value: U256::from(1000000000000000000_u64),
                from: "0xabc".to_string(),
                to: Some("0xdef".to_string()),
                relayer_id: "relayer-1".to_string(),
                data: None,
                max_fee_per_gas: None,
                max_priority_fee_per_gas: None,
                signature: None,
                speed: None,
            },
        )))
    }

    /// Builds a `RelayerDisabled` payload for an EVM relayer with the
    /// disable reason `"test"`.
    fn relayer_disabled_payload() -> WebhookPayload {
        WebhookPayload::RelayerDisabled(Box::new(RelayerDisabledPayload {
            relayer: RelayerResponse {
                id: "relayer-1".to_string(),
                name: "relayer-1".to_string(),
                network: "ethereum".to_string(),
                network_type: NetworkType::Evm,
                paused: false,
                policies: Some(RelayerNetworkPolicyResponse::Evm(
                    RelayerEvmPolicy {
                        gas_price_cap: None,
                        whitelist_receivers: None,
                        eip1559_pricing: None,
                        private_transactions: Some(false),
                        min_balance: Some(0),
                        gas_limit_estimation: None,
                    }
                    .into(),
                )),
                signer_id: "signer-1".to_string(),
                notification_id: None,
                custom_rpc_urls: None,
                address: Some("0xabc".to_string()),
                system_disabled: Some(false),
                ..Default::default()
            },
            disable_reason: "test".to_string(),
        }))
    }

    /// A `NotificationSend` wrapped in a `Job` keeps both the notification id
    /// and the event name it was created with.
    #[tokio::test]
    async fn test_notification_job_creation() {
        let notification = WebhookNotification::new(
            "test_event".to_string(),
            confirmed_evm_transaction_payload(),
        );
        // The notification is moved into the job; it is not needed afterwards,
        // so no clone is required.
        let notification_job = NotificationSend::new("notification-1".to_string(), notification);

        let job = Job::new(crate::jobs::JobType::NotificationSend, notification_job);

        assert_eq!(job.data.notification_id, "notification-1");
        assert_eq!(job.data.notification.event, "test_event");
    }

    /// `NotificationSend` can be built from each payload variant exercised
    /// here and preserves the id and event name for both.
    #[tokio::test]
    async fn test_notification_job_with_different_payloads() {
        // Transaction payload variant.
        let transaction_notification = WebhookNotification::new(
            "transaction_payload".to_string(),
            confirmed_evm_transaction_payload(),
        );
        let job = NotificationSend::new(
            "notification-string".to_string(),
            transaction_notification,
        );
        assert_eq!(job.notification_id, "notification-string");
        assert_eq!(job.notification.event, "transaction_payload");

        // Relayer-disabled payload variant.
        let relayer_disabled_notification =
            WebhookNotification::new("object_event".to_string(), relayer_disabled_payload());
        let job = NotificationSend::new(
            "notification-object".to_string(),
            relayer_disabled_notification,
        );
        assert_eq!(job.notification_id, "notification-object");
        assert_eq!(job.notification.event, "object_event");
    }
}