fc_rpc/
eth_pubsub.rs

1// This file is part of Frontier.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use std::{marker::PhantomData, sync::Arc};
20
21use ethereum::TransactionV3 as EthereumTransaction;
22use futures::{future, FutureExt as _, StreamExt as _};
23use jsonrpsee::{core::traits::IdProvider, server::PendingSubscriptionSink};
24// Substrate
25use sc_client_api::{
26	backend::{Backend, StorageProvider},
27	client::BlockchainEvents,
28};
29use sc_network_sync::SyncingService;
30use sc_rpc::{
31	utils::{BoundedVecDeque, PendingSubscription, Subscription},
32	SubscriptionTaskExecutor,
33};
34use sc_service::config::RpcSubscriptionIdProvider;
35use sc_transaction_pool_api::{InPoolTransaction, TransactionPool, TxHash};
36use sp_api::{ApiExt, ProvideRuntimeApi};
37use sp_blockchain::HeaderBackend;
38use sp_consensus::SyncOracle;
39use sp_runtime::traits::{Block as BlockT, UniqueSaturatedInto};
40// Frontier
41use fc_mapping_sync::{EthereumBlockNotification, EthereumBlockNotificationSinks};
42use fc_rpc_core::{
43	types::{
44		pubsub::{Kind, Params, PubSubResult, PubSubSyncing, SyncingStatus},
45		FilteredParams,
46	},
47	EthPubSubApiServer,
48};
49use fc_storage::StorageOverride;
50use fp_rpc::EthereumRuntimeRPCApi;
51
52#[derive(Clone, Debug)]
53pub struct EthereumSubIdProvider;
54impl IdProvider for EthereumSubIdProvider {
55	fn next_id(&self) -> jsonrpsee::types::SubscriptionId<'static> {
56		format!("0x{}", hex::encode(rand::random::<u128>().to_le_bytes())).into()
57	}
58}
59impl RpcSubscriptionIdProvider for EthereumSubIdProvider {}
60
61/// Eth pub-sub API implementation.
62pub struct EthPubSub<B: BlockT, P, C, BE> {
63	pool: Arc<P>,
64	client: Arc<C>,
65	sync: Arc<SyncingService<B>>,
66	executor: SubscriptionTaskExecutor,
67	storage_override: Arc<dyn StorageOverride<B>>,
68	starting_block: u64,
69	pubsub_notification_sinks: Arc<EthereumBlockNotificationSinks<EthereumBlockNotification<B>>>,
70	_marker: PhantomData<BE>,
71}
72
73impl<B: BlockT, P, C, BE> Clone for EthPubSub<B, P, C, BE> {
74	fn clone(&self) -> Self {
75		Self {
76			pool: self.pool.clone(),
77			client: self.client.clone(),
78			sync: self.sync.clone(),
79			executor: self.executor.clone(),
80			storage_override: self.storage_override.clone(),
81			starting_block: self.starting_block,
82			pubsub_notification_sinks: self.pubsub_notification_sinks.clone(),
83			_marker: PhantomData::<BE>,
84		}
85	}
86}
87
88impl<B: BlockT, P, C, BE> EthPubSub<B, P, C, BE>
89where
90	P: TransactionPool<Block = B, Hash = B::Hash> + 'static,
91	C: ProvideRuntimeApi<B>,
92	C::Api: EthereumRuntimeRPCApi<B>,
93	C: HeaderBackend<B> + StorageProvider<B, BE>,
94	BE: Backend<B> + 'static,
95{
96	pub fn new(
97		pool: Arc<P>,
98		client: Arc<C>,
99		sync: Arc<SyncingService<B>>,
100		executor: SubscriptionTaskExecutor,
101		storage_override: Arc<dyn StorageOverride<B>>,
102		pubsub_notification_sinks: Arc<
103			EthereumBlockNotificationSinks<EthereumBlockNotification<B>>,
104		>,
105	) -> Self {
106		// Capture the best block as seen on initialization. Used for syncing subscriptions.
107		let best_number = client.info().best_number;
108		let starting_block = UniqueSaturatedInto::<u64>::unique_saturated_into(best_number);
109		Self {
110			pool,
111			client,
112			sync,
113			executor,
114			storage_override,
115			starting_block,
116			pubsub_notification_sinks,
117			_marker: PhantomData,
118		}
119	}
120
121	fn notify_header(
122		&self,
123		notification: EthereumBlockNotification<B>,
124	) -> future::Ready<Option<PubSubResult>> {
125		let res = if notification.is_new_best {
126			self.storage_override.current_block(notification.hash)
127		} else {
128			None
129		};
130		future::ready(res.map(PubSubResult::header))
131	}
132
133	fn notify_logs(
134		&self,
135		notification: EthereumBlockNotification<B>,
136		params: &FilteredParams,
137	) -> future::Ready<Option<impl Iterator<Item = PubSubResult>>> {
138		let res = if notification.is_new_best {
139			let substrate_hash = notification.hash;
140
141			let block = self.storage_override.current_block(substrate_hash);
142			let receipts = self.storage_override.current_receipts(substrate_hash);
143
144			match (block, receipts) {
145				(Some(block), Some(receipts)) => Some((block, receipts)),
146				_ => None,
147			}
148		} else {
149			None
150		};
151		future::ready(res.map(|(block, receipts)| PubSubResult::logs(block, receipts, params)))
152	}
153
154	fn pending_transactions(&self, hash: &TxHash<P>) -> future::Ready<Option<PubSubResult>> {
155		let res = if let Some(xt) = self.pool.ready_transaction(hash) {
156			let best_block = self.client.info().best_hash;
157
158			let api = self.client.runtime_api();
159
160			let api_version = if let Ok(Some(api_version)) =
161				api.api_version::<dyn EthereumRuntimeRPCApi<B>>(best_block)
162			{
163				api_version
164			} else {
165				return future::ready(None);
166			};
167
168			let xts = vec![xt.data().as_ref().clone()];
169
170			let txs: Option<Vec<EthereumTransaction>> = if api_version > 1 {
171				api.extrinsic_filter(best_block, xts).ok()
172			} else {
173				#[allow(deprecated)]
174				if let Ok(legacy) = api.extrinsic_filter_before_version_2(best_block, xts) {
175					Some(legacy.into_iter().map(|tx| tx.into()).collect())
176				} else {
177					None
178				}
179			};
180
181			match txs {
182				Some(txs) => {
183					if txs.len() == 1 {
184						Some(txs[0].clone())
185					} else {
186						None
187					}
188				}
189				_ => None,
190			}
191		} else {
192			None
193		};
194		future::ready(res.map(|tx| PubSubResult::transaction_hash(&tx)))
195	}
196
197	async fn syncing_status(&self) -> PubSubSyncing {
198		if self.sync.is_major_syncing() {
199			// Best imported block.
200			let current_number = self.client.info().best_number;
201			// Get the target block to sync.
202			let highest_number = self
203				.sync
204				.status()
205				.await
206				.ok()
207				.and_then(|status| status.best_seen_block);
208
209			PubSubSyncing::Syncing(SyncingStatus {
210				starting_block: self.starting_block,
211				current_block: UniqueSaturatedInto::<u64>::unique_saturated_into(current_number),
212				highest_block: highest_number
213					.map(UniqueSaturatedInto::<u64>::unique_saturated_into),
214			})
215		} else {
216			PubSubSyncing::Synced(false)
217		}
218	}
219}
220
221impl<B: BlockT, P, C, BE> EthPubSubApiServer for EthPubSub<B, P, C, BE>
222where
223	B: BlockT,
224	P: TransactionPool<Block = B, Hash = B::Hash> + 'static,
225	C: ProvideRuntimeApi<B>,
226	C::Api: EthereumRuntimeRPCApi<B>,
227	C: BlockchainEvents<B> + 'static,
228	C: HeaderBackend<B> + StorageProvider<B, BE>,
229	BE: Backend<B> + 'static,
230{
231	fn subscribe(&self, pending: PendingSubscriptionSink, kind: Kind, params: Option<Params>) {
232		let filtered_params = match params {
233			Some(Params::Logs(filter)) => FilteredParams::new(Some(filter)),
234			_ => FilteredParams::default(),
235		};
236
237		let pubsub = self.clone();
238		// Everytime a new subscription is created, a new mpsc channel is added to the sink pool.
239		let (inner_sink, block_notification_stream) =
240			sc_utils::mpsc::tracing_unbounded("pubsub_notification_stream", 100_000);
241		self.pubsub_notification_sinks.lock().push(inner_sink);
242
243		let fut = async move {
244			match kind {
245				Kind::NewHeads => {
246					let stream = block_notification_stream
247						.filter_map(move |notification| pubsub.notify_header(notification));
248					PendingSubscription::from(pending)
249						.pipe_from_stream(stream, BoundedVecDeque::new(16))
250						.await
251				}
252				Kind::Logs => {
253					let stream = block_notification_stream
254						.filter_map(move |notification| {
255							pubsub.notify_logs(notification, &filtered_params)
256						})
257						.flat_map(futures::stream::iter);
258					PendingSubscription::from(pending)
259						.pipe_from_stream(stream, BoundedVecDeque::new(16))
260						.await
261				}
262				Kind::NewPendingTransactions => {
263					let pool = pubsub.pool.clone();
264					let stream = pool
265						.import_notification_stream()
266						.filter_map(move |hash| pubsub.pending_transactions(&hash));
267					PendingSubscription::from(pending)
268						.pipe_from_stream(stream, BoundedVecDeque::new(16))
269						.await;
270				}
271				Kind::Syncing => {
272					let Ok(sink) = pending.accept().await else {
273						return;
274					};
275					// On connection subscriber expects a value.
276					// Because import notifications are only emitted when the node is synced or
277					// in case of reorg, the first event is emitted right away.
278					let syncing_status = pubsub.syncing_status().await;
279					let subscription = Subscription::from(sink);
280					let _ = subscription
281						.send(&PubSubResult::SyncingStatus(syncing_status))
282						.await;
283
284					// When the node is not under a major syncing (i.e. from genesis), react
285					// normally to import notifications.
286					//
287					// Only send new notifications down the pipe when the syncing status changed.
288					let mut stream = pubsub.client.import_notification_stream();
289					let mut last_syncing_status = pubsub.sync.is_major_syncing();
290					while (stream.next().await).is_some() {
291						let syncing_status = pubsub.sync.is_major_syncing();
292						if syncing_status != last_syncing_status {
293							let syncing_status = pubsub.syncing_status().await;
294							let _ = subscription
295								.send(&PubSubResult::SyncingStatus(syncing_status))
296								.await;
297						}
298						last_syncing_status = syncing_status;
299					}
300				}
301			}
302		}
303		.boxed();
304
305		self.executor
306			.spawn("frontier-rpc-subscription", Some("rpc"), fut);
307	}
308}