1mod lru_cache;
20
21use std::{
22 collections::{BTreeMap, HashMap},
23 marker::PhantomData,
24 sync::{Arc, Mutex},
25};
26
27use ethereum::BlockV3 as EthereumBlock;
28use ethereum_types::U256;
29use futures::StreamExt;
30use tokio::sync::{mpsc, oneshot};
31use sc_client_api::{
33 backend::{Backend, StorageProvider},
34 client::BlockchainEvents,
35};
36use sc_service::SpawnTaskHandle;
37use sp_api::ProvideRuntimeApi;
38use sp_blockchain::HeaderBackend;
39use sp_runtime::traits::{Block as BlockT, Header as HeaderT, UniqueSaturatedInto};
40use fc_rpc_core::types::*;
42use fc_storage::StorageOverride;
43use fp_rpc::{EthereumRuntimeRPCApi, TransactionStatus};
44
45use self::lru_cache::LRUCacheByteLimited;
46
47type WaitList<Hash, T> = HashMap<Hash, Vec<oneshot::Sender<Option<T>>>>;
48
49enum EthBlockDataCacheMessage<B: BlockT> {
50 RequestCurrentBlock {
51 block_hash: B::Hash,
52 response_tx: oneshot::Sender<Option<EthereumBlock>>,
53 },
54 FetchedCurrentBlock {
55 block_hash: B::Hash,
56 block: Option<EthereumBlock>,
57 },
58
59 RequestCurrentTransactionStatuses {
60 block_hash: B::Hash,
61 response_tx: oneshot::Sender<Option<Vec<TransactionStatus>>>,
62 },
63 FetchedCurrentTransactionStatuses {
64 block_hash: B::Hash,
65 statuses: Option<Vec<TransactionStatus>>,
66 },
67}
68
69pub struct EthBlockDataCacheTask<B: BlockT>(mpsc::Sender<EthBlockDataCacheMessage<B>>);
74
75impl<B: BlockT> EthBlockDataCacheTask<B> {
76 pub fn new(
77 spawn_handle: SpawnTaskHandle,
78 storage_override: Arc<dyn StorageOverride<B>>,
79 blocks_cache_max_size: usize,
80 statuses_cache_max_size: usize,
81 prometheus_registry: Option<prometheus_endpoint::Registry>,
82 ) -> Self {
83 let (task_tx, mut task_rx) = mpsc::channel(100);
84 let outer_task_tx = task_tx.clone();
85 let outer_spawn_handle = spawn_handle.clone();
86
87 outer_spawn_handle.spawn("EthBlockDataCacheTask", None, async move {
88 let mut blocks_cache = LRUCacheByteLimited::<B::Hash, EthereumBlock>::new(
89 "blocks_cache",
90 blocks_cache_max_size as u64,
91 prometheus_registry.clone(),
92 );
93 let mut statuses_cache = LRUCacheByteLimited::<B::Hash, Vec<TransactionStatus>>::new(
94 "statuses_cache",
95 statuses_cache_max_size as u64,
96 prometheus_registry,
97 );
98
99 let mut awaiting_blocks =
100 HashMap::<B::Hash, Vec<oneshot::Sender<Option<EthereumBlock>>>>::new();
101 let mut awaiting_statuses =
102 HashMap::<B::Hash, Vec<oneshot::Sender<Option<Vec<TransactionStatus>>>>>::new();
103
104 while let Some(message) = task_rx.recv().await {
109 use EthBlockDataCacheMessage::*;
110 match message {
111 RequestCurrentBlock {
112 block_hash,
113 response_tx,
114 } => Self::request_current(
115 &spawn_handle,
116 &mut blocks_cache,
117 &mut awaiting_blocks,
118 storage_override.clone(),
119 block_hash,
120 response_tx,
121 task_tx.clone(),
122 move |handler| FetchedCurrentBlock {
123 block_hash,
124 block: handler.current_block(block_hash),
125 },
126 ),
127 FetchedCurrentBlock { block_hash, block } => {
128 if let Some(wait_list) = awaiting_blocks.remove(&block_hash) {
129 for sender in wait_list {
130 let _ = sender.send(block.clone());
131 }
132 }
133
134 if let Some(block) = block {
135 blocks_cache.put(block_hash, block);
136 }
137 }
138
139 RequestCurrentTransactionStatuses {
140 block_hash,
141 response_tx,
142 } => Self::request_current(
143 &spawn_handle,
144 &mut statuses_cache,
145 &mut awaiting_statuses,
146 storage_override.clone(),
147 block_hash,
148 response_tx,
149 task_tx.clone(),
150 move |handler| FetchedCurrentTransactionStatuses {
151 block_hash,
152 statuses: handler.current_transaction_statuses(block_hash),
153 },
154 ),
155 FetchedCurrentTransactionStatuses {
156 block_hash,
157 statuses,
158 } => {
159 if let Some(wait_list) = awaiting_statuses.remove(&block_hash) {
160 for sender in wait_list {
161 let _ = sender.send(statuses.clone());
162 }
163 }
164
165 if let Some(statuses) = statuses {
166 statuses_cache.put(block_hash, statuses);
167 }
168 }
169 }
170 }
171 });
172
173 Self(outer_task_tx)
174 }
175
176 fn request_current<T, F>(
177 spawn_handle: &SpawnTaskHandle,
178 cache: &mut LRUCacheByteLimited<B::Hash, T>,
179 wait_list: &mut WaitList<B::Hash, T>,
180 storage_override: Arc<dyn StorageOverride<B>>,
181 block_hash: B::Hash,
182 response_tx: oneshot::Sender<Option<T>>,
183 task_tx: mpsc::Sender<EthBlockDataCacheMessage<B>>,
184 handler_call: F,
185 ) where
186 T: Clone + scale_codec::Encode,
187 F: FnOnce(&dyn StorageOverride<B>) -> EthBlockDataCacheMessage<B>,
188 F: Send + 'static,
189 {
190 if let Some(data) = cache.get(&block_hash).cloned() {
192 let _ = response_tx.send(Some(data));
193 return;
194 }
195
196 if let Some(waiting) = wait_list.get_mut(&block_hash) {
200 waiting.push(response_tx);
201 return;
202 }
203
204 wait_list.insert(block_hash, vec![response_tx]);
207
208 spawn_handle.spawn("EthBlockDataCacheTask Worker", None, async move {
209 let message = handler_call(&*storage_override);
210 let _ = task_tx.send(message).await;
211 });
212 }
213
214 pub async fn current_block(&self, block_hash: B::Hash) -> Option<EthereumBlock> {
216 let (response_tx, response_rx) = oneshot::channel();
217
218 self.0
219 .send(EthBlockDataCacheMessage::RequestCurrentBlock {
220 block_hash,
221 response_tx,
222 })
223 .await
224 .ok()?;
225
226 response_rx.await.ok()?
227 }
228
229 pub async fn current_transaction_statuses(
231 &self,
232 block_hash: B::Hash,
233 ) -> Option<Vec<TransactionStatus>> {
234 let (response_tx, response_rx) = oneshot::channel();
235
236 self.0
237 .send(
238 EthBlockDataCacheMessage::RequestCurrentTransactionStatuses {
239 block_hash,
240 response_tx,
241 },
242 )
243 .await
244 .ok()?;
245
246 response_rx.await.ok()?
247 }
248}
249
250pub struct EthTask<B, C, BE>(PhantomData<(B, C, BE)>);
251
252impl<B, C, BE> EthTask<B, C, BE>
253where
254 B: BlockT,
255 C: ProvideRuntimeApi<B>,
256 C::Api: EthereumRuntimeRPCApi<B>,
257 C: BlockchainEvents<B> + 'static,
258 C: HeaderBackend<B> + StorageProvider<B, BE>,
259 BE: Backend<B> + 'static,
260{
261 pub async fn filter_pool_task(
262 client: Arc<C>,
263 filter_pool: Arc<Mutex<BTreeMap<U256, FilterPoolItem>>>,
264 retain_threshold: u64,
265 ) {
266 let mut notification_st = client.import_notification_stream();
267
268 while let Some(notification) = notification_st.next().await {
269 if let Ok(filter_pool) = &mut filter_pool.lock() {
270 let imported_number: u64 = UniqueSaturatedInto::<u64>::unique_saturated_into(
271 *notification.header.number(),
272 );
273
274 filter_pool.retain(|_, v| v.at_block + retain_threshold > imported_number);
275 }
276 }
277 }
278
279 pub async fn fee_history_task(
280 client: Arc<C>,
281 storage_override: Arc<dyn StorageOverride<B>>,
282 fee_history_cache: FeeHistoryCache,
283 block_limit: u64,
284 ) {
285 struct TransactionHelper {
286 gas_used: u64,
287 effective_reward: u64,
288 }
289 #[rustfmt::skip]
291 let fee_history_cache_item = |hash: B::Hash| -> (
292 FeeHistoryCacheItem,
293 Option<u64>
294 ) {
295 let reward_percentiles: Vec<f64> = {
300 let mut percentile: f64 = 0.0;
301 (0..201)
302 .map(|_| {
303 let val = percentile;
304 percentile += 0.5;
305 val
306 })
307 .collect()
308 };
309
310 let block = storage_override.current_block(hash);
311 let mut block_number: Option<u64> = None;
312 let base_fee = client.runtime_api().gas_price(hash).unwrap_or_default();
313 let receipts = storage_override.current_receipts(hash);
314 let mut result = FeeHistoryCacheItem {
315 base_fee: UniqueSaturatedInto::<u64>::unique_saturated_into(base_fee),
316 gas_used_ratio: 0f64,
317 rewards: Vec::new(),
318 };
319 if let (Some(block), Some(receipts)) = (block, receipts) {
320 block_number = Some(UniqueSaturatedInto::<u64>::unique_saturated_into(block.header.number));
321 let gas_used = UniqueSaturatedInto::<u64>::unique_saturated_into(block.header.gas_used) as f64;
322 let gas_limit = UniqueSaturatedInto::<u64>::unique_saturated_into(block.header.gas_limit) as f64;
323 result.gas_used_ratio = gas_used / gas_limit;
324
325 let mut previous_cumulative_gas = U256::zero();
326 let used_gas = |current: U256, previous: &mut U256| -> u64 {
327 let r = UniqueSaturatedInto::<u64>::unique_saturated_into(current.saturating_sub(*previous));
328 *previous = current;
329 r
330 };
331 let mut transactions: Vec<TransactionHelper> = receipts
333 .iter()
334 .enumerate()
335 .map(|(i, receipt)| TransactionHelper {
336 gas_used: match receipt {
337 ethereum::ReceiptV4::Legacy(d) | ethereum::ReceiptV4::EIP2930(d) | ethereum::ReceiptV4::EIP1559(d) | ethereum::ReceiptV4::EIP7702(d) => used_gas(d.used_gas, &mut previous_cumulative_gas),
338 },
339 effective_reward: match block.transactions.get(i) {
340 Some(ethereum::TransactionV3::Legacy(t)) => {
341 UniqueSaturatedInto::<u64>::unique_saturated_into(t.gas_price.saturating_sub(base_fee))
342 }
343 Some(ethereum::TransactionV3::EIP2930(t)) => {
344 UniqueSaturatedInto::<u64>::unique_saturated_into(t.gas_price.saturating_sub(base_fee))
345 }
346 Some(ethereum::TransactionV3::EIP1559(t)) => UniqueSaturatedInto::<u64>::unique_saturated_into(
347 t
348 .max_priority_fee_per_gas
349 .min(t.max_fee_per_gas.saturating_sub(base_fee))
350 ),
351 Some(ethereum::TransactionV3::EIP7702(t)) => UniqueSaturatedInto::<u64>::unique_saturated_into(
352 t
353 .max_priority_fee_per_gas
354 .min(t.max_fee_per_gas.saturating_sub(base_fee))
355 ),
356 None => 0,
357 },
358 })
359 .collect();
360 transactions.sort_by(|a, b| a.effective_reward.cmp(&b.effective_reward));
362
363 result.rewards = reward_percentiles
365 .into_iter()
366 .filter_map(|p| {
367 let target_gas = (p * gas_used / 100f64) as u64;
368 let mut sum_gas = 0;
369 for tx in &transactions {
370 sum_gas += tx.gas_used;
371 if target_gas <= sum_gas {
372 return Some(tx.effective_reward);
373 }
374 }
375 None
376 })
377 .collect();
378 } else {
379 result.rewards = reward_percentiles.iter().map(|_| 0).collect();
380 }
381 (result, block_number)
382 };
383
384 let commit_if_any = |item: FeeHistoryCacheItem, key: Option<u64>| {
386 if let (Some(block_number), Ok(fee_history_cache)) =
387 (key, &mut fee_history_cache.lock())
388 {
389 fee_history_cache.insert(block_number, item);
390 let first_out = block_number.saturating_sub(block_limit);
393 let to_remove = (fee_history_cache.len() as u64).saturating_sub(block_limit);
395 for i in 0..to_remove {
397 let key = first_out - i;
399 fee_history_cache.remove(&key);
400 }
401 }
402 };
403
404 let mut notification_st = client.import_notification_stream();
405
406 while let Some(notification) = notification_st.next().await {
407 if notification.is_new_best {
408 if let Some(tree_route) = notification.tree_route {
410 if let Ok(fee_history_cache) = &mut fee_history_cache.lock() {
411 let _lock = tree_route.retracted().iter().map(|hash_and_number| {
413 let n = UniqueSaturatedInto::<u64>::unique_saturated_into(
414 hash_and_number.number,
415 );
416 fee_history_cache.remove(&n);
417 });
418 let _ = tree_route.enacted().iter().map(|hash_and_number| {
420 let (result, block_number) =
421 fee_history_cache_item(hash_and_number.hash);
422 commit_if_any(result, block_number);
423 });
424 }
425 }
426 let (result, block_number) = fee_history_cache_item(notification.hash);
428 commit_if_any(result, block_number);
429 }
430 }
431 }
432}