fc_rpc/
eth_pubsub.rs

1// This file is part of Frontier.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use std::{marker::PhantomData, sync::Arc};
20
21use ethereum::TransactionV3 as EthereumTransaction;
22use futures::{future, FutureExt as _, StreamExt as _};
23use jsonrpsee::{core::traits::IdProvider, server::PendingSubscriptionSink};
24// Substrate
25use sc_client_api::{
26	backend::{Backend, StorageProvider},
27	client::BlockchainEvents,
28};
29use sc_network_sync::SyncingService;
30use sc_rpc::{
31	utils::{BoundedVecDeque, PendingSubscription, Subscription},
32	SubscriptionTaskExecutor,
33};
34use sc_service::config::RpcSubscriptionIdProvider;
35use sc_transaction_pool_api::{InPoolTransaction, TransactionPool, TxHash};
36use sp_api::{ApiExt, ProvideRuntimeApi};
37use sp_blockchain::HeaderBackend;
38use sp_consensus::SyncOracle;
39use sp_runtime::traits::{Block as BlockT, UniqueSaturatedInto};
40// Frontier
41use fc_mapping_sync::{EthereumBlockNotification, EthereumBlockNotificationSinks};
42use fc_rpc_core::{
43	types::{
44		pubsub::{Kind, Params, PubSubResult, PubSubSyncing, SyncingStatus},
45		FilteredParams,
46	},
47	EthPubSubApiServer,
48};
49use fc_storage::StorageOverride;
50use fp_rpc::EthereumRuntimeRPCApi;
51
52#[derive(Clone, Debug)]
53pub struct EthereumSubIdProvider;
54impl IdProvider for EthereumSubIdProvider {
55	fn next_id(&self) -> jsonrpsee::types::SubscriptionId<'static> {
56		format!("0x{}", hex::encode(rand::random::<u128>().to_le_bytes())).into()
57	}
58}
59impl RpcSubscriptionIdProvider for EthereumSubIdProvider {}
60
61/// Eth pub-sub API implementation.
62pub struct EthPubSub<B: BlockT, P, C, BE> {
63	pool: Arc<P>,
64	client: Arc<C>,
65	sync: Arc<SyncingService<B>>,
66	executor: SubscriptionTaskExecutor,
67	storage_override: Arc<dyn StorageOverride<B>>,
68	starting_block: u64,
69	pubsub_notification_sinks: Arc<EthereumBlockNotificationSinks<EthereumBlockNotification<B>>>,
70	_marker: PhantomData<BE>,
71}
72
73impl<B: BlockT, P, C, BE> Clone for EthPubSub<B, P, C, BE> {
74	fn clone(&self) -> Self {
75		Self {
76			pool: self.pool.clone(),
77			client: self.client.clone(),
78			sync: self.sync.clone(),
79			executor: self.executor.clone(),
80			storage_override: self.storage_override.clone(),
81			starting_block: self.starting_block,
82			pubsub_notification_sinks: self.pubsub_notification_sinks.clone(),
83			_marker: PhantomData::<BE>,
84		}
85	}
86}
87
88impl<B: BlockT, P, C, BE> EthPubSub<B, P, C, BE>
89where
90	P: TransactionPool<Block = B, Hash = B::Hash> + 'static,
91	C: ProvideRuntimeApi<B>,
92	C::Api: EthereumRuntimeRPCApi<B>,
93	C: HeaderBackend<B> + StorageProvider<B, BE>,
94	BE: Backend<B> + 'static,
95{
96	pub fn new(
97		pool: Arc<P>,
98		client: Arc<C>,
99		sync: Arc<SyncingService<B>>,
100		executor: SubscriptionTaskExecutor,
101		storage_override: Arc<dyn StorageOverride<B>>,
102		pubsub_notification_sinks: Arc<
103			EthereumBlockNotificationSinks<EthereumBlockNotification<B>>,
104		>,
105	) -> Self {
106		// Capture the best block as seen on initialization. Used for syncing subscriptions.
107		let best_number = client.info().best_number;
108		let starting_block = UniqueSaturatedInto::<u64>::unique_saturated_into(best_number);
109		Self {
110			pool,
111			client,
112			sync,
113			executor,
114			storage_override,
115			starting_block,
116			pubsub_notification_sinks,
117			_marker: PhantomData,
118		}
119	}
120
121	fn notify_header(
122		&self,
123		notification: EthereumBlockNotification<B>,
124	) -> future::Ready<Option<PubSubResult>> {
125		let res = if notification.is_new_best {
126			self.storage_override.current_block(notification.hash)
127		} else {
128			None
129		};
130		future::ready(res.map(PubSubResult::header))
131	}
132
133	fn notify_logs(
134		&self,
135		notification: EthereumBlockNotification<B>,
136		params: &FilteredParams,
137	) -> future::Ready<Option<impl Iterator<Item = PubSubResult>>> {
138		let res = if notification.is_new_best {
139			let substrate_hash = notification.hash;
140
141			let block = self.storage_override.current_block(substrate_hash);
142			let statuses = self
143				.storage_override
144				.current_transaction_statuses(substrate_hash);
145
146			match (block, statuses) {
147				(Some(block), Some(statuses)) => Some((block, statuses)),
148				_ => None,
149			}
150		} else {
151			None
152		};
153
154		future::ready(res.map(|(block, statuses)| {
155			let logs = crate::eth::filter::filter_block_logs(&params.filter, block, statuses);
156
157			logs.clone()
158				.into_iter()
159				.map(|log| PubSubResult::Log(Box::new(log.clone())))
160		}))
161	}
162
163	fn pending_transactions(&self, hash: &TxHash<P>) -> future::Ready<Option<PubSubResult>> {
164		let res = if let Some(xt) = self.pool.ready_transaction(hash) {
165			let best_block = self.client.info().best_hash;
166
167			let api = self.client.runtime_api();
168
169			let api_version = if let Ok(Some(api_version)) =
170				api.api_version::<dyn EthereumRuntimeRPCApi<B>>(best_block)
171			{
172				api_version
173			} else {
174				return future::ready(None);
175			};
176
177			let xts = vec![xt.data().as_ref().clone()];
178
179			let txs: Option<Vec<EthereumTransaction>> = if api_version > 1 {
180				api.extrinsic_filter(best_block, xts).ok()
181			} else {
182				#[allow(deprecated)]
183				if let Ok(legacy) = api.extrinsic_filter_before_version_2(best_block, xts) {
184					Some(legacy.into_iter().map(|tx| tx.into()).collect())
185				} else {
186					None
187				}
188			};
189
190			match txs {
191				Some(txs) => {
192					if txs.len() == 1 {
193						Some(txs[0].clone())
194					} else {
195						None
196					}
197				}
198				_ => None,
199			}
200		} else {
201			None
202		};
203		future::ready(res.map(|tx| PubSubResult::transaction_hash(&tx)))
204	}
205
206	async fn syncing_status(&self) -> PubSubSyncing {
207		if self.sync.is_major_syncing() {
208			// Best imported block.
209			let current_number = self.client.info().best_number;
210			// Get the target block to sync.
211			let highest_number = self
212				.sync
213				.status()
214				.await
215				.ok()
216				.and_then(|status| status.best_seen_block);
217
218			PubSubSyncing::Syncing(SyncingStatus {
219				starting_block: self.starting_block,
220				current_block: UniqueSaturatedInto::<u64>::unique_saturated_into(current_number),
221				highest_block: highest_number
222					.map(UniqueSaturatedInto::<u64>::unique_saturated_into),
223			})
224		} else {
225			PubSubSyncing::Synced(false)
226		}
227	}
228}
229
230impl<B: BlockT, P, C, BE> EthPubSubApiServer for EthPubSub<B, P, C, BE>
231where
232	B: BlockT,
233	P: TransactionPool<Block = B, Hash = B::Hash> + 'static,
234	C: ProvideRuntimeApi<B>,
235	C::Api: EthereumRuntimeRPCApi<B>,
236	C: BlockchainEvents<B> + 'static,
237	C: HeaderBackend<B> + StorageProvider<B, BE>,
238	BE: Backend<B> + 'static,
239{
240	fn subscribe(&self, pending: PendingSubscriptionSink, kind: Kind, params: Option<Params>) {
241		let filtered_params = match params {
242			Some(Params::Logs(filter)) => FilteredParams::new(filter),
243			_ => FilteredParams::default(),
244		};
245
246		let pubsub = self.clone();
247		// Everytime a new subscription is created, a new mpsc channel is added to the sink pool.
248		let (inner_sink, block_notification_stream) =
249			sc_utils::mpsc::tracing_unbounded("pubsub_notification_stream", 100_000);
250		self.pubsub_notification_sinks.lock().push(inner_sink);
251
252		let fut = async move {
253			match kind {
254				Kind::NewHeads => {
255					let stream = block_notification_stream
256						.filter_map(move |notification| pubsub.notify_header(notification));
257					PendingSubscription::from(pending)
258						.pipe_from_stream(stream, BoundedVecDeque::new(16))
259						.await
260				}
261				Kind::Logs => {
262					let stream = block_notification_stream
263						.filter_map(move |notification| {
264							pubsub.notify_logs(notification, &filtered_params)
265						})
266						.flat_map(futures::stream::iter);
267					PendingSubscription::from(pending)
268						.pipe_from_stream(stream, BoundedVecDeque::new(16))
269						.await
270				}
271				Kind::NewPendingTransactions => {
272					let pool = pubsub.pool.clone();
273					let stream = pool
274						.import_notification_stream()
275						.filter_map(move |hash| pubsub.pending_transactions(&hash));
276					PendingSubscription::from(pending)
277						.pipe_from_stream(stream, BoundedVecDeque::new(16))
278						.await;
279				}
280				Kind::Syncing => {
281					let Ok(sink) = pending.accept().await else {
282						return;
283					};
284					// On connection subscriber expects a value.
285					// Because import notifications are only emitted when the node is synced or
286					// in case of reorg, the first event is emitted right away.
287					let syncing_status = pubsub.syncing_status().await;
288					let subscription = Subscription::from(sink);
289					let _ = subscription
290						.send(&PubSubResult::SyncingStatus(syncing_status))
291						.await;
292
293					// When the node is not under a major syncing (i.e. from genesis), react
294					// normally to import notifications.
295					//
296					// Only send new notifications down the pipe when the syncing status changed.
297					let mut stream = pubsub.client.import_notification_stream();
298					let mut last_syncing_status = pubsub.sync.is_major_syncing();
299					while (stream.next().await).is_some() {
300						let syncing_status = pubsub.sync.is_major_syncing();
301						if syncing_status != last_syncing_status {
302							let syncing_status = pubsub.syncing_status().await;
303							let _ = subscription
304								.send(&PubSubResult::SyncingStatus(syncing_status))
305								.await;
306						}
307						last_syncing_status = syncing_status;
308					}
309				}
310			}
311		}
312		.boxed();
313
314		self.executor
315			.spawn("frontier-rpc-subscription", Some("rpc"), fut);
316	}
317}