1use std::{marker::PhantomData, sync::Arc};
20
21use ethereum::TransactionV3 as EthereumTransaction;
22use futures::{future, FutureExt as _, StreamExt as _};
23use jsonrpsee::{core::traits::IdProvider, server::PendingSubscriptionSink};
24use sc_client_api::{
26 backend::{Backend, StorageProvider},
27 client::BlockchainEvents,
28};
29use sc_network_sync::SyncingService;
30use sc_rpc::{
31 utils::{BoundedVecDeque, PendingSubscription, Subscription},
32 SubscriptionTaskExecutor,
33};
34use sc_service::config::RpcSubscriptionIdProvider;
35use sc_transaction_pool_api::{InPoolTransaction, TransactionPool, TxHash};
36use sp_api::{ApiExt, ProvideRuntimeApi};
37use sp_blockchain::HeaderBackend;
38use sp_consensus::SyncOracle;
39use sp_runtime::traits::{Block as BlockT, UniqueSaturatedInto};
40use fc_mapping_sync::{EthereumBlockNotification, EthereumBlockNotificationSinks};
42use fc_rpc_core::{
43 types::{
44 pubsub::{Kind, Params, PubSubResult, PubSubSyncing, SyncingStatus},
45 FilteredParams,
46 },
47 EthPubSubApiServer,
48};
49use fc_storage::StorageOverride;
50use fp_rpc::EthereumRuntimeRPCApi;
51
52#[derive(Clone, Debug)]
53pub struct EthereumSubIdProvider;
54impl IdProvider for EthereumSubIdProvider {
55 fn next_id(&self) -> jsonrpsee::types::SubscriptionId<'static> {
56 format!("0x{}", hex::encode(rand::random::<u128>().to_le_bytes())).into()
57 }
58}
59impl RpcSubscriptionIdProvider for EthereumSubIdProvider {}
60
61pub struct EthPubSub<B: BlockT, P, C, BE> {
63 pool: Arc<P>,
64 client: Arc<C>,
65 sync: Arc<SyncingService<B>>,
66 executor: SubscriptionTaskExecutor,
67 storage_override: Arc<dyn StorageOverride<B>>,
68 starting_block: u64,
69 pubsub_notification_sinks: Arc<EthereumBlockNotificationSinks<EthereumBlockNotification<B>>>,
70 _marker: PhantomData<BE>,
71}
72
73impl<B: BlockT, P, C, BE> Clone for EthPubSub<B, P, C, BE> {
74 fn clone(&self) -> Self {
75 Self {
76 pool: self.pool.clone(),
77 client: self.client.clone(),
78 sync: self.sync.clone(),
79 executor: self.executor.clone(),
80 storage_override: self.storage_override.clone(),
81 starting_block: self.starting_block,
82 pubsub_notification_sinks: self.pubsub_notification_sinks.clone(),
83 _marker: PhantomData::<BE>,
84 }
85 }
86}
87
88impl<B: BlockT, P, C, BE> EthPubSub<B, P, C, BE>
89where
90 P: TransactionPool<Block = B, Hash = B::Hash> + 'static,
91 C: ProvideRuntimeApi<B>,
92 C::Api: EthereumRuntimeRPCApi<B>,
93 C: HeaderBackend<B> + StorageProvider<B, BE>,
94 BE: Backend<B> + 'static,
95{
96 pub fn new(
97 pool: Arc<P>,
98 client: Arc<C>,
99 sync: Arc<SyncingService<B>>,
100 executor: SubscriptionTaskExecutor,
101 storage_override: Arc<dyn StorageOverride<B>>,
102 pubsub_notification_sinks: Arc<
103 EthereumBlockNotificationSinks<EthereumBlockNotification<B>>,
104 >,
105 ) -> Self {
106 let best_number = client.info().best_number;
108 let starting_block = UniqueSaturatedInto::<u64>::unique_saturated_into(best_number);
109 Self {
110 pool,
111 client,
112 sync,
113 executor,
114 storage_override,
115 starting_block,
116 pubsub_notification_sinks,
117 _marker: PhantomData,
118 }
119 }
120
121 fn notify_header(
122 &self,
123 notification: EthereumBlockNotification<B>,
124 ) -> future::Ready<Option<PubSubResult>> {
125 let res = if notification.is_new_best {
126 self.storage_override.current_block(notification.hash)
127 } else {
128 None
129 };
130 future::ready(res.map(PubSubResult::header))
131 }
132
133 fn notify_logs(
134 &self,
135 notification: EthereumBlockNotification<B>,
136 params: &FilteredParams,
137 ) -> future::Ready<Option<impl Iterator<Item = PubSubResult>>> {
138 let res = if notification.is_new_best {
139 let substrate_hash = notification.hash;
140
141 let block = self.storage_override.current_block(substrate_hash);
142 let receipts = self.storage_override.current_receipts(substrate_hash);
143
144 match (block, receipts) {
145 (Some(block), Some(receipts)) => Some((block, receipts)),
146 _ => None,
147 }
148 } else {
149 None
150 };
151 future::ready(res.map(|(block, receipts)| PubSubResult::logs(block, receipts, params)))
152 }
153
154 fn pending_transactions(&self, hash: &TxHash<P>) -> future::Ready<Option<PubSubResult>> {
155 let res = if let Some(xt) = self.pool.ready_transaction(hash) {
156 let best_block = self.client.info().best_hash;
157
158 let api = self.client.runtime_api();
159
160 let api_version = if let Ok(Some(api_version)) =
161 api.api_version::<dyn EthereumRuntimeRPCApi<B>>(best_block)
162 {
163 api_version
164 } else {
165 return future::ready(None);
166 };
167
168 let xts = vec![xt.data().as_ref().clone()];
169
170 let txs: Option<Vec<EthereumTransaction>> = if api_version > 1 {
171 api.extrinsic_filter(best_block, xts).ok()
172 } else {
173 #[allow(deprecated)]
174 if let Ok(legacy) = api.extrinsic_filter_before_version_2(best_block, xts) {
175 Some(legacy.into_iter().map(|tx| tx.into()).collect())
176 } else {
177 None
178 }
179 };
180
181 match txs {
182 Some(txs) => {
183 if txs.len() == 1 {
184 Some(txs[0].clone())
185 } else {
186 None
187 }
188 }
189 _ => None,
190 }
191 } else {
192 None
193 };
194 future::ready(res.map(|tx| PubSubResult::transaction_hash(&tx)))
195 }
196
197 async fn syncing_status(&self) -> PubSubSyncing {
198 if self.sync.is_major_syncing() {
199 let current_number = self.client.info().best_number;
201 let highest_number = self
203 .sync
204 .status()
205 .await
206 .ok()
207 .and_then(|status| status.best_seen_block);
208
209 PubSubSyncing::Syncing(SyncingStatus {
210 starting_block: self.starting_block,
211 current_block: UniqueSaturatedInto::<u64>::unique_saturated_into(current_number),
212 highest_block: highest_number
213 .map(UniqueSaturatedInto::<u64>::unique_saturated_into),
214 })
215 } else {
216 PubSubSyncing::Synced(false)
217 }
218 }
219}
220
221impl<B: BlockT, P, C, BE> EthPubSubApiServer for EthPubSub<B, P, C, BE>
222where
223 B: BlockT,
224 P: TransactionPool<Block = B, Hash = B::Hash> + 'static,
225 C: ProvideRuntimeApi<B>,
226 C::Api: EthereumRuntimeRPCApi<B>,
227 C: BlockchainEvents<B> + 'static,
228 C: HeaderBackend<B> + StorageProvider<B, BE>,
229 BE: Backend<B> + 'static,
230{
231 fn subscribe(&self, pending: PendingSubscriptionSink, kind: Kind, params: Option<Params>) {
232 let filtered_params = match params {
233 Some(Params::Logs(filter)) => FilteredParams::new(Some(filter)),
234 _ => FilteredParams::default(),
235 };
236
237 let pubsub = self.clone();
238 let (inner_sink, block_notification_stream) =
240 sc_utils::mpsc::tracing_unbounded("pubsub_notification_stream", 100_000);
241 self.pubsub_notification_sinks.lock().push(inner_sink);
242
243 let fut = async move {
244 match kind {
245 Kind::NewHeads => {
246 let stream = block_notification_stream
247 .filter_map(move |notification| pubsub.notify_header(notification));
248 PendingSubscription::from(pending)
249 .pipe_from_stream(stream, BoundedVecDeque::new(16))
250 .await
251 }
252 Kind::Logs => {
253 let stream = block_notification_stream
254 .filter_map(move |notification| {
255 pubsub.notify_logs(notification, &filtered_params)
256 })
257 .flat_map(futures::stream::iter);
258 PendingSubscription::from(pending)
259 .pipe_from_stream(stream, BoundedVecDeque::new(16))
260 .await
261 }
262 Kind::NewPendingTransactions => {
263 let pool = pubsub.pool.clone();
264 let stream = pool
265 .import_notification_stream()
266 .filter_map(move |hash| pubsub.pending_transactions(&hash));
267 PendingSubscription::from(pending)
268 .pipe_from_stream(stream, BoundedVecDeque::new(16))
269 .await;
270 }
271 Kind::Syncing => {
272 let Ok(sink) = pending.accept().await else {
273 return;
274 };
275 let syncing_status = pubsub.syncing_status().await;
279 let subscription = Subscription::from(sink);
280 let _ = subscription
281 .send(&PubSubResult::SyncingStatus(syncing_status))
282 .await;
283
284 let mut stream = pubsub.client.import_notification_stream();
289 let mut last_syncing_status = pubsub.sync.is_major_syncing();
290 while (stream.next().await).is_some() {
291 let syncing_status = pubsub.sync.is_major_syncing();
292 if syncing_status != last_syncing_status {
293 let syncing_status = pubsub.syncing_status().await;
294 let _ = subscription
295 .send(&PubSubResult::SyncingStatus(syncing_status))
296 .await;
297 }
298 last_syncing_status = syncing_status;
299 }
300 }
301 }
302 }
303 .boxed();
304
305 self.executor
306 .spawn("frontier-rpc-subscription", Some("rpc"), fut);
307 }
308}