1use std::{marker::PhantomData, sync::Arc};
20
21use ethereum::TransactionV3 as EthereumTransaction;
22use futures::{future, FutureExt as _, StreamExt as _};
23use jsonrpsee::{core::traits::IdProvider, server::PendingSubscriptionSink};
24use sc_client_api::{
26 backend::{Backend, StorageProvider},
27 client::BlockchainEvents,
28};
29use sc_network_sync::SyncingService;
30use sc_rpc::{
31 utils::{BoundedVecDeque, PendingSubscription, Subscription},
32 SubscriptionTaskExecutor,
33};
34use sc_service::config::RpcSubscriptionIdProvider;
35use sc_transaction_pool_api::{InPoolTransaction, TransactionPool, TxHash};
36use sp_api::{ApiExt, ProvideRuntimeApi};
37use sp_blockchain::HeaderBackend;
38use sp_consensus::SyncOracle;
39use sp_runtime::traits::{Block as BlockT, UniqueSaturatedInto};
40use fc_mapping_sync::{EthereumBlockNotification, EthereumBlockNotificationSinks};
42use fc_rpc_core::{
43 types::{
44 pubsub::{Kind, Params, PubSubResult, PubSubSyncing, SyncingStatus},
45 FilteredParams,
46 },
47 EthPubSubApiServer,
48};
49use fc_storage::StorageOverride;
50use fp_rpc::EthereumRuntimeRPCApi;
51
52#[derive(Clone, Debug)]
53pub struct EthereumSubIdProvider;
54impl IdProvider for EthereumSubIdProvider {
55 fn next_id(&self) -> jsonrpsee::types::SubscriptionId<'static> {
56 format!("0x{}", hex::encode(rand::random::<u128>().to_le_bytes())).into()
57 }
58}
59impl RpcSubscriptionIdProvider for EthereumSubIdProvider {}
60
61pub struct EthPubSub<B: BlockT, P, C, BE> {
63 pool: Arc<P>,
64 client: Arc<C>,
65 sync: Arc<SyncingService<B>>,
66 executor: SubscriptionTaskExecutor,
67 storage_override: Arc<dyn StorageOverride<B>>,
68 starting_block: u64,
69 pubsub_notification_sinks: Arc<EthereumBlockNotificationSinks<EthereumBlockNotification<B>>>,
70 _marker: PhantomData<BE>,
71}
72
73impl<B: BlockT, P, C, BE> Clone for EthPubSub<B, P, C, BE> {
74 fn clone(&self) -> Self {
75 Self {
76 pool: self.pool.clone(),
77 client: self.client.clone(),
78 sync: self.sync.clone(),
79 executor: self.executor.clone(),
80 storage_override: self.storage_override.clone(),
81 starting_block: self.starting_block,
82 pubsub_notification_sinks: self.pubsub_notification_sinks.clone(),
83 _marker: PhantomData::<BE>,
84 }
85 }
86}
87
88impl<B: BlockT, P, C, BE> EthPubSub<B, P, C, BE>
89where
90 P: TransactionPool<Block = B, Hash = B::Hash> + 'static,
91 C: ProvideRuntimeApi<B>,
92 C::Api: EthereumRuntimeRPCApi<B>,
93 C: HeaderBackend<B> + StorageProvider<B, BE>,
94 BE: Backend<B> + 'static,
95{
96 pub fn new(
97 pool: Arc<P>,
98 client: Arc<C>,
99 sync: Arc<SyncingService<B>>,
100 executor: SubscriptionTaskExecutor,
101 storage_override: Arc<dyn StorageOverride<B>>,
102 pubsub_notification_sinks: Arc<
103 EthereumBlockNotificationSinks<EthereumBlockNotification<B>>,
104 >,
105 ) -> Self {
106 let best_number = client.info().best_number;
108 let starting_block = UniqueSaturatedInto::<u64>::unique_saturated_into(best_number);
109 Self {
110 pool,
111 client,
112 sync,
113 executor,
114 storage_override,
115 starting_block,
116 pubsub_notification_sinks,
117 _marker: PhantomData,
118 }
119 }
120
121 fn notify_header(
122 &self,
123 notification: EthereumBlockNotification<B>,
124 ) -> future::Ready<Option<PubSubResult>> {
125 let res = if notification.is_new_best {
126 self.storage_override.current_block(notification.hash)
127 } else {
128 None
129 };
130 future::ready(res.map(PubSubResult::header))
131 }
132
133 fn notify_logs(
134 &self,
135 notification: EthereumBlockNotification<B>,
136 params: &FilteredParams,
137 ) -> future::Ready<Option<impl Iterator<Item = PubSubResult>>> {
138 let res = if notification.is_new_best {
139 let substrate_hash = notification.hash;
140
141 let block = self.storage_override.current_block(substrate_hash);
142 let statuses = self
143 .storage_override
144 .current_transaction_statuses(substrate_hash);
145
146 match (block, statuses) {
147 (Some(block), Some(statuses)) => Some((block, statuses)),
148 _ => None,
149 }
150 } else {
151 None
152 };
153
154 future::ready(res.map(|(block, statuses)| {
155 let logs = crate::eth::filter::filter_block_logs(¶ms.filter, block, statuses);
156
157 logs.clone()
158 .into_iter()
159 .map(|log| PubSubResult::Log(Box::new(log.clone())))
160 }))
161 }
162
163 fn pending_transactions(&self, hash: &TxHash<P>) -> future::Ready<Option<PubSubResult>> {
164 let res = if let Some(xt) = self.pool.ready_transaction(hash) {
165 let best_block = self.client.info().best_hash;
166
167 let api = self.client.runtime_api();
168
169 let api_version = if let Ok(Some(api_version)) =
170 api.api_version::<dyn EthereumRuntimeRPCApi<B>>(best_block)
171 {
172 api_version
173 } else {
174 return future::ready(None);
175 };
176
177 let xts = vec![xt.data().as_ref().clone()];
178
179 let txs: Option<Vec<EthereumTransaction>> = if api_version > 1 {
180 api.extrinsic_filter(best_block, xts).ok()
181 } else {
182 #[allow(deprecated)]
183 if let Ok(legacy) = api.extrinsic_filter_before_version_2(best_block, xts) {
184 Some(legacy.into_iter().map(|tx| tx.into()).collect())
185 } else {
186 None
187 }
188 };
189
190 match txs {
191 Some(txs) => {
192 if txs.len() == 1 {
193 Some(txs[0].clone())
194 } else {
195 None
196 }
197 }
198 _ => None,
199 }
200 } else {
201 None
202 };
203 future::ready(res.map(|tx| PubSubResult::transaction_hash(&tx)))
204 }
205
206 async fn syncing_status(&self) -> PubSubSyncing {
207 if self.sync.is_major_syncing() {
208 let current_number = self.client.info().best_number;
210 let highest_number = self
212 .sync
213 .status()
214 .await
215 .ok()
216 .and_then(|status| status.best_seen_block);
217
218 PubSubSyncing::Syncing(SyncingStatus {
219 starting_block: self.starting_block,
220 current_block: UniqueSaturatedInto::<u64>::unique_saturated_into(current_number),
221 highest_block: highest_number
222 .map(UniqueSaturatedInto::<u64>::unique_saturated_into),
223 })
224 } else {
225 PubSubSyncing::Synced(false)
226 }
227 }
228}
229
230impl<B: BlockT, P, C, BE> EthPubSubApiServer for EthPubSub<B, P, C, BE>
231where
232 B: BlockT,
233 P: TransactionPool<Block = B, Hash = B::Hash> + 'static,
234 C: ProvideRuntimeApi<B>,
235 C::Api: EthereumRuntimeRPCApi<B>,
236 C: BlockchainEvents<B> + 'static,
237 C: HeaderBackend<B> + StorageProvider<B, BE>,
238 BE: Backend<B> + 'static,
239{
240 fn subscribe(&self, pending: PendingSubscriptionSink, kind: Kind, params: Option<Params>) {
241 let filtered_params = match params {
242 Some(Params::Logs(filter)) => FilteredParams::new(filter),
243 _ => FilteredParams::default(),
244 };
245
246 let pubsub = self.clone();
247 let (inner_sink, block_notification_stream) =
249 sc_utils::mpsc::tracing_unbounded("pubsub_notification_stream", 100_000);
250 self.pubsub_notification_sinks.lock().push(inner_sink);
251
252 let fut = async move {
253 match kind {
254 Kind::NewHeads => {
255 let stream = block_notification_stream
256 .filter_map(move |notification| pubsub.notify_header(notification));
257 PendingSubscription::from(pending)
258 .pipe_from_stream(stream, BoundedVecDeque::new(16))
259 .await
260 }
261 Kind::Logs => {
262 let stream = block_notification_stream
263 .filter_map(move |notification| {
264 pubsub.notify_logs(notification, &filtered_params)
265 })
266 .flat_map(futures::stream::iter);
267 PendingSubscription::from(pending)
268 .pipe_from_stream(stream, BoundedVecDeque::new(16))
269 .await
270 }
271 Kind::NewPendingTransactions => {
272 let pool = pubsub.pool.clone();
273 let stream = pool
274 .import_notification_stream()
275 .filter_map(move |hash| pubsub.pending_transactions(&hash));
276 PendingSubscription::from(pending)
277 .pipe_from_stream(stream, BoundedVecDeque::new(16))
278 .await;
279 }
280 Kind::Syncing => {
281 let Ok(sink) = pending.accept().await else {
282 return;
283 };
284 let syncing_status = pubsub.syncing_status().await;
288 let subscription = Subscription::from(sink);
289 let _ = subscription
290 .send(&PubSubResult::SyncingStatus(syncing_status))
291 .await;
292
293 let mut stream = pubsub.client.import_notification_stream();
298 let mut last_syncing_status = pubsub.sync.is_major_syncing();
299 while (stream.next().await).is_some() {
300 let syncing_status = pubsub.sync.is_major_syncing();
301 if syncing_status != last_syncing_status {
302 let syncing_status = pubsub.syncing_status().await;
303 let _ = subscription
304 .send(&PubSubResult::SyncingStatus(syncing_status))
305 .await;
306 }
307 last_syncing_status = syncing_status;
308 }
309 }
310 }
311 }
312 .boxed();
313
314 self.executor
315 .spawn("frontier-rpc-subscription", Some("rpc"), fut);
316 }
317}