fc_rpc/cache/
mod.rs

1// This file is part of Frontier.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19mod lru_cache;
20
21use std::{
22	collections::{BTreeMap, HashMap},
23	marker::PhantomData,
24	sync::{Arc, Mutex},
25};
26
27use ethereum::BlockV3 as EthereumBlock;
28use ethereum_types::U256;
29use futures::StreamExt;
30use tokio::sync::{mpsc, oneshot};
31// Substrate
32use sc_client_api::{
33	backend::{Backend, StorageProvider},
34	client::BlockchainEvents,
35};
36use sc_service::SpawnTaskHandle;
37use sp_api::ProvideRuntimeApi;
38use sp_blockchain::HeaderBackend;
39use sp_runtime::traits::{Block as BlockT, Header as HeaderT, UniqueSaturatedInto};
40// Frontier
41use fc_rpc_core::types::*;
42use fc_storage::StorageOverride;
43use fp_rpc::{EthereumRuntimeRPCApi, TransactionStatus};
44
45use self::lru_cache::LRUCacheByteLimited;
46
47type WaitList<Hash, T> = HashMap<Hash, Vec<oneshot::Sender<Option<T>>>>;
48
49enum EthBlockDataCacheMessage<B: BlockT> {
50	RequestCurrentBlock {
51		block_hash: B::Hash,
52		response_tx: oneshot::Sender<Option<EthereumBlock>>,
53	},
54	FetchedCurrentBlock {
55		block_hash: B::Hash,
56		block: Option<EthereumBlock>,
57	},
58
59	RequestCurrentTransactionStatuses {
60		block_hash: B::Hash,
61		response_tx: oneshot::Sender<Option<Vec<TransactionStatus>>>,
62	},
63	FetchedCurrentTransactionStatuses {
64		block_hash: B::Hash,
65		statuses: Option<Vec<TransactionStatus>>,
66	},
67}
68
/// Manages LRU caches for block data and their transaction statuses.
/// These are large and take a lot of time to fetch from the database.
/// Storing them in an LRU cache reduces database accesses when many
/// subsequent requests relate to the same blocks.
///
/// The value itself only wraps the sending half of the channel used to talk
/// to the spawned cache task; the caches live inside that task.
pub struct EthBlockDataCacheTask<B: BlockT>(mpsc::Sender<EthBlockDataCacheMessage<B>>);
74
75impl<B: BlockT> EthBlockDataCacheTask<B> {
76	pub fn new(
77		spawn_handle: SpawnTaskHandle,
78		storage_override: Arc<dyn StorageOverride<B>>,
79		blocks_cache_max_size: usize,
80		statuses_cache_max_size: usize,
81		prometheus_registry: Option<prometheus_endpoint::Registry>,
82	) -> Self {
83		let (task_tx, mut task_rx) = mpsc::channel(100);
84		let outer_task_tx = task_tx.clone();
85		let outer_spawn_handle = spawn_handle.clone();
86
87		outer_spawn_handle.spawn("EthBlockDataCacheTask", None, async move {
88			let mut blocks_cache = LRUCacheByteLimited::<B::Hash, EthereumBlock>::new(
89				"blocks_cache",
90				blocks_cache_max_size as u64,
91				prometheus_registry.clone(),
92			);
93			let mut statuses_cache = LRUCacheByteLimited::<B::Hash, Vec<TransactionStatus>>::new(
94				"statuses_cache",
95				statuses_cache_max_size as u64,
96				prometheus_registry,
97			);
98
99			let mut awaiting_blocks =
100				HashMap::<B::Hash, Vec<oneshot::Sender<Option<EthereumBlock>>>>::new();
101			let mut awaiting_statuses =
102				HashMap::<B::Hash, Vec<oneshot::Sender<Option<Vec<TransactionStatus>>>>>::new();
103
104			// Handle all incoming messages.
105			// Exits when there are no more senders.
106			// Any long computation should be spawned in a separate task
107			// to keep this task handle messages as soon as possible.
108			while let Some(message) = task_rx.recv().await {
109				use EthBlockDataCacheMessage::*;
110				match message {
111					RequestCurrentBlock {
112						block_hash,
113						response_tx,
114					} => Self::request_current(
115						&spawn_handle,
116						&mut blocks_cache,
117						&mut awaiting_blocks,
118						storage_override.clone(),
119						block_hash,
120						response_tx,
121						task_tx.clone(),
122						move |handler| FetchedCurrentBlock {
123							block_hash,
124							block: handler.current_block(block_hash),
125						},
126					),
127					FetchedCurrentBlock { block_hash, block } => {
128						if let Some(wait_list) = awaiting_blocks.remove(&block_hash) {
129							for sender in wait_list {
130								let _ = sender.send(block.clone());
131							}
132						}
133
134						if let Some(block) = block {
135							blocks_cache.put(block_hash, block);
136						}
137					}
138
139					RequestCurrentTransactionStatuses {
140						block_hash,
141						response_tx,
142					} => Self::request_current(
143						&spawn_handle,
144						&mut statuses_cache,
145						&mut awaiting_statuses,
146						storage_override.clone(),
147						block_hash,
148						response_tx,
149						task_tx.clone(),
150						move |handler| FetchedCurrentTransactionStatuses {
151							block_hash,
152							statuses: handler.current_transaction_statuses(block_hash),
153						},
154					),
155					FetchedCurrentTransactionStatuses {
156						block_hash,
157						statuses,
158					} => {
159						if let Some(wait_list) = awaiting_statuses.remove(&block_hash) {
160							for sender in wait_list {
161								let _ = sender.send(statuses.clone());
162							}
163						}
164
165						if let Some(statuses) = statuses {
166							statuses_cache.put(block_hash, statuses);
167						}
168					}
169				}
170			}
171		});
172
173		Self(outer_task_tx)
174	}
175
176	fn request_current<T, F>(
177		spawn_handle: &SpawnTaskHandle,
178		cache: &mut LRUCacheByteLimited<B::Hash, T>,
179		wait_list: &mut WaitList<B::Hash, T>,
180		storage_override: Arc<dyn StorageOverride<B>>,
181		block_hash: B::Hash,
182		response_tx: oneshot::Sender<Option<T>>,
183		task_tx: mpsc::Sender<EthBlockDataCacheMessage<B>>,
184		handler_call: F,
185	) where
186		T: Clone + scale_codec::Encode,
187		F: FnOnce(&dyn StorageOverride<B>) -> EthBlockDataCacheMessage<B>,
188		F: Send + 'static,
189	{
190		// Data is cached, we respond immediately.
191		if let Some(data) = cache.get(&block_hash).cloned() {
192			let _ = response_tx.send(Some(data));
193			return;
194		}
195
196		// Another request already triggered caching but the
197		// response is not known yet, we add the sender to the waiting
198		// list.
199		if let Some(waiting) = wait_list.get_mut(&block_hash) {
200			waiting.push(response_tx);
201			return;
202		}
203
204		// Data is neither cached nor already requested, so we start fetching
205		// the data.
206		wait_list.insert(block_hash, vec![response_tx]);
207
208		spawn_handle.spawn("EthBlockDataCacheTask Worker", None, async move {
209			let message = handler_call(&*storage_override);
210			let _ = task_tx.send(message).await;
211		});
212	}
213
214	/// Cache for `handler.current_block`.
215	pub async fn current_block(&self, block_hash: B::Hash) -> Option<EthereumBlock> {
216		let (response_tx, response_rx) = oneshot::channel();
217
218		self.0
219			.send(EthBlockDataCacheMessage::RequestCurrentBlock {
220				block_hash,
221				response_tx,
222			})
223			.await
224			.ok()?;
225
226		response_rx.await.ok()?
227	}
228
229	/// Cache for `handler.current_transaction_statuses`.
230	pub async fn current_transaction_statuses(
231		&self,
232		block_hash: B::Hash,
233	) -> Option<Vec<TransactionStatus>> {
234		let (response_tx, response_rx) = oneshot::channel();
235
236		self.0
237			.send(
238				EthBlockDataCacheMessage::RequestCurrentTransactionStatuses {
239					block_hash,
240					response_tx,
241				},
242			)
243			.await
244			.ok()?;
245
246		response_rx.await.ok()?
247	}
248}
249
/// Marker type grouping the long-running maintenance tasks
/// (`filter_pool_task`, `fee_history_task`) under one set of generic
/// parameters. It stores no data besides `PhantomData`.
pub struct EthTask<B, C, BE>(PhantomData<(B, C, BE)>);
251
252impl<B, C, BE> EthTask<B, C, BE>
253where
254	B: BlockT,
255	C: ProvideRuntimeApi<B>,
256	C::Api: EthereumRuntimeRPCApi<B>,
257	C: BlockchainEvents<B> + 'static,
258	C: HeaderBackend<B> + StorageProvider<B, BE>,
259	BE: Backend<B> + 'static,
260{
261	pub async fn filter_pool_task(
262		client: Arc<C>,
263		filter_pool: Arc<Mutex<BTreeMap<U256, FilterPoolItem>>>,
264		retain_threshold: u64,
265	) {
266		let mut notification_st = client.import_notification_stream();
267
268		while let Some(notification) = notification_st.next().await {
269			if let Ok(filter_pool) = &mut filter_pool.lock() {
270				let imported_number: u64 = UniqueSaturatedInto::<u64>::unique_saturated_into(
271					*notification.header.number(),
272				);
273
274				filter_pool.retain(|_, v| v.at_block + retain_threshold > imported_number);
275			}
276		}
277	}
278
279	pub async fn fee_history_task(
280		client: Arc<C>,
281		storage_override: Arc<dyn StorageOverride<B>>,
282		fee_history_cache: FeeHistoryCache,
283		block_limit: u64,
284	) {
285		struct TransactionHelper {
286			gas_used: u64,
287			effective_reward: u64,
288		}
289		// Calculates the cache for a single block
290		#[rustfmt::skip]
291			let fee_history_cache_item = |hash: B::Hash| -> (
292			FeeHistoryCacheItem,
293			Option<u64>
294		) {
295			// Evenly spaced percentile list from 0.0 to 100.0 with a 0.5 resolution.
296			// This means we cache 200 percentile points.
297			// Later in request handling we will approximate by rounding percentiles that
298			// fall in between with `(round(n*2)/2)`.
299			let reward_percentiles: Vec<f64> = {
300				let mut percentile: f64 = 0.0;
301				(0..201)
302					.map(|_| {
303						let val = percentile;
304						percentile += 0.5;
305						val
306					})
307					.collect()
308			};
309
310			let block = storage_override.current_block(hash);
311			let mut block_number: Option<u64> = None;
312			let base_fee = client.runtime_api().gas_price(hash).unwrap_or_default();
313			let receipts = storage_override.current_receipts(hash);
314			let mut result = FeeHistoryCacheItem {
315				base_fee: UniqueSaturatedInto::<u64>::unique_saturated_into(base_fee),
316				gas_used_ratio: 0f64,
317				rewards: Vec::new(),
318			};
319			if let (Some(block), Some(receipts)) = (block, receipts) {
320				block_number = Some(UniqueSaturatedInto::<u64>::unique_saturated_into(block.header.number));
321				let gas_used = UniqueSaturatedInto::<u64>::unique_saturated_into(block.header.gas_used) as f64;
322				let gas_limit = UniqueSaturatedInto::<u64>::unique_saturated_into(block.header.gas_limit) as f64;
323				result.gas_used_ratio = gas_used / gas_limit;
324
325				let mut previous_cumulative_gas = U256::zero();
326				let used_gas = |current: U256, previous: &mut U256| -> u64 {
327					let r = UniqueSaturatedInto::<u64>::unique_saturated_into(current.saturating_sub(*previous));
328					*previous = current;
329					r
330				};
331				// Build a list of relevant transaction information.
332				let mut transactions: Vec<TransactionHelper> = receipts
333					.iter()
334					.enumerate()
335					.map(|(i, receipt)| TransactionHelper {
336						gas_used: match receipt {
337							ethereum::ReceiptV4::Legacy(d) | ethereum::ReceiptV4::EIP2930(d) | ethereum::ReceiptV4::EIP1559(d) | ethereum::ReceiptV4::EIP7702(d) => used_gas(d.used_gas, &mut previous_cumulative_gas),
338						},
339						effective_reward: match block.transactions.get(i) {
340							Some(ethereum::TransactionV3::Legacy(t)) => {
341								UniqueSaturatedInto::<u64>::unique_saturated_into(t.gas_price.saturating_sub(base_fee))
342							}
343							Some(ethereum::TransactionV3::EIP2930(t)) => {
344								UniqueSaturatedInto::<u64>::unique_saturated_into(t.gas_price.saturating_sub(base_fee))
345							}
346							Some(ethereum::TransactionV3::EIP1559(t)) => UniqueSaturatedInto::<u64>::unique_saturated_into(
347									t
348										.max_priority_fee_per_gas
349										.min(t.max_fee_per_gas.saturating_sub(base_fee))
350							),
351							Some(ethereum::TransactionV3::EIP7702(t)) => UniqueSaturatedInto::<u64>::unique_saturated_into(
352									t
353										.max_priority_fee_per_gas
354										.min(t.max_fee_per_gas.saturating_sub(base_fee))
355							),
356							None => 0,
357						},
358					})
359					.collect();
360				// Sort ASC by effective reward.
361				transactions.sort_by(|a, b| a.effective_reward.cmp(&b.effective_reward));
362
363				// Calculate percentile rewards.
364				result.rewards = reward_percentiles
365					.into_iter()
366					.filter_map(|p| {
367						let target_gas = (p * gas_used / 100f64) as u64;
368						let mut sum_gas = 0;
369						for tx in &transactions {
370							sum_gas += tx.gas_used;
371							if target_gas <= sum_gas {
372								return Some(tx.effective_reward);
373							}
374						}
375						None
376					})
377					.collect();
378			} else {
379				result.rewards = reward_percentiles.iter().map(|_| 0).collect();
380			}
381			(result, block_number)
382		};
383
384		// Commits the result to cache
385		let commit_if_any = |item: FeeHistoryCacheItem, key: Option<u64>| {
386			if let (Some(block_number), Ok(fee_history_cache)) =
387				(key, &mut fee_history_cache.lock())
388			{
389				fee_history_cache.insert(block_number, item);
390				// We want to remain within the configured cache bounds.
391				// The first key out of bounds.
392				let first_out = block_number.saturating_sub(block_limit);
393				// Out of bounds size.
394				let to_remove = (fee_history_cache.len() as u64).saturating_sub(block_limit);
395				// Remove all cache data before `block_limit`.
396				for i in 0..to_remove {
397					// Cannot overflow.
398					let key = first_out - i;
399					fee_history_cache.remove(&key);
400				}
401			}
402		};
403
404		let mut notification_st = client.import_notification_stream();
405
406		while let Some(notification) = notification_st.next().await {
407			if notification.is_new_best {
408				// In case a re-org happened on import.
409				if let Some(tree_route) = notification.tree_route {
410					if let Ok(fee_history_cache) = &mut fee_history_cache.lock() {
411						// Remove retracted.
412						let _lock = tree_route.retracted().iter().map(|hash_and_number| {
413							let n = UniqueSaturatedInto::<u64>::unique_saturated_into(
414								hash_and_number.number,
415							);
416							fee_history_cache.remove(&n);
417						});
418						// Insert enacted.
419						let _ = tree_route.enacted().iter().map(|hash_and_number| {
420							let (result, block_number) =
421								fee_history_cache_item(hash_and_number.hash);
422							commit_if_any(result, block_number);
423						});
424					}
425				}
426				// Cache the imported block.
427				let (result, block_number) = fee_history_cache_item(notification.hash);
428				commit_if_any(result, block_number);
429			}
430		}
431	}
432}