use std::{marker::PhantomData, sync::Arc};
use ethereum::TransactionV2 as EthereumTransaction;
use futures::{future, FutureExt as _, StreamExt as _};
use jsonrpsee::{core::traits::IdProvider, server::PendingSubscriptionSink};
use sc_client_api::{
backend::{Backend, StorageProvider},
client::BlockchainEvents,
};
use sc_network_sync::SyncingService;
use sc_rpc::{
utils::{pipe_from_stream, to_sub_message},
SubscriptionTaskExecutor,
};
use sc_transaction_pool_api::{InPoolTransaction, TransactionPool, TxHash};
use sp_api::{ApiExt, ProvideRuntimeApi};
use sp_blockchain::HeaderBackend;
use sp_consensus::SyncOracle;
use sp_runtime::traits::{Block as BlockT, UniqueSaturatedInto};
use fc_mapping_sync::{EthereumBlockNotification, EthereumBlockNotificationSinks};
use fc_rpc_core::{
types::{
pubsub::{Kind, Params, PubSubResult, PubSubSyncing, SyncingStatus},
FilteredParams,
},
EthPubSubApiServer,
};
use fc_storage::OverrideHandle;
use fp_rpc::EthereumRuntimeRPCApi;
/// Subscription id generator handed to the JSON-RPC server.
///
/// Ids are rendered as a `0x`-prefixed hex string built from a random `u128`.
#[derive(Debug)]
pub struct EthereumSubIdProvider;

impl IdProvider for EthereumSubIdProvider {
    /// Returns a new id: `0x` followed by the hex encoding of the
    /// little-endian bytes of a freshly drawn random `u128`.
    fn next_id(&self) -> jsonrpsee::types::SubscriptionId<'static> {
        let entropy: u128 = rand::random();
        let encoded = hex::encode(entropy.to_le_bytes());
        format!("0x{encoded}").into()
    }
}
/// State shared by every `eth_subscribe` subscription served by this node.
///
/// Generic over the block type `B`, the transaction pool `P`, the client `C`
/// and the storage backend `BE` (only used at the type level).
pub struct EthPubSub<B: BlockT, P, C, BE> {
/// Transaction pool; queried for ready transactions and for its import
/// notification stream when serving pending-transaction subscriptions.
pool: Arc<P>,
/// Client handle; used for chain info, runtime API calls, storage schema
/// lookup and block import notifications.
client: Arc<C>,
/// Syncing service; consulted for the major-sync flag and the best seen block.
sync: Arc<SyncingService<B>>,
/// Executor on which each subscription's background task is spawned.
executor: SubscriptionTaskExecutor,
/// Schema-specific storage readers plus the fallback used when the on-chain
/// schema has no dedicated entry.
overrides: Arc<OverrideHandle<B>>,
/// Best block number observed when this value was constructed; reported as
/// `starting_block` in syncing status messages.
starting_block: u64,
/// Registry of per-subscription senders for Ethereum block notifications.
/// NOTE(review): presumably written to by the mapping-sync worker — confirm.
pubsub_notification_sinks: Arc<EthereumBlockNotificationSinks<EthereumBlockNotification<B>>>,
/// Ties the otherwise unused backend type parameter `BE` to the struct.
_marker: PhantomData<BE>,
}
impl<B: BlockT, P, C, BE> Clone for EthPubSub<B, P, C, BE> {
fn clone(&self) -> Self {
Self {
pool: self.pool.clone(),
client: self.client.clone(),
sync: self.sync.clone(),
executor: self.executor.clone(),
overrides: self.overrides.clone(),
starting_block: self.starting_block,
pubsub_notification_sinks: self.pubsub_notification_sinks.clone(),
_marker: PhantomData::<BE>,
}
}
}
impl<B: BlockT, P, C, BE> EthPubSub<B, P, C, BE>
where
    P: TransactionPool<Block = B> + 'static,
    C: ProvideRuntimeApi<B>,
    C::Api: EthereumRuntimeRPCApi<B>,
    C: HeaderBackend<B> + StorageProvider<B, BE>,
    BE: Backend<B> + 'static,
{
    /// Builds a new pub/sub handler.
    ///
    /// The client's current best block number is captured once here and kept
    /// as `starting_block` for later syncing status reports.
    pub fn new(
        pool: Arc<P>,
        client: Arc<C>,
        sync: Arc<SyncingService<B>>,
        executor: SubscriptionTaskExecutor,
        overrides: Arc<OverrideHandle<B>>,
        pubsub_notification_sinks: Arc<
            EthereumBlockNotificationSinks<EthereumBlockNotification<B>>,
        >,
    ) -> Self {
        let starting_block =
            UniqueSaturatedInto::<u64>::unique_saturated_into(client.info().best_number);

        Self {
            pool,
            client,
            sync,
            executor,
            overrides,
            starting_block,
            pubsub_notification_sinks,
            _marker: PhantomData,
        }
    }

    /// Turns a block notification into a header message.
    ///
    /// Resolves to `None` when the notified block is not the new best block
    /// or when the storage handler has no current block for its hash.
    fn notify_header(
        &self,
        notification: EthereumBlockNotification<B>,
    ) -> future::Ready<Option<PubSubResult>> {
        if !notification.is_new_best {
            return future::ready(None);
        }

        // Pick the reader matching the on-chain schema, or the fallback.
        let schema = fc_storage::onchain_storage_schema(&*self.client, notification.hash);
        let handler = self
            .overrides
            .schemas
            .get(&schema)
            .unwrap_or(&self.overrides.fallback);

        let header = handler
            .current_block(notification.hash)
            .map(PubSubResult::header);
        future::ready(header)
    }

    /// Turns a block notification into the log messages selected by `params`.
    ///
    /// Resolves to `None` when the notified block is not the new best block
    /// or when either the block or its receipts cannot be read.
    fn notify_logs(
        &self,
        notification: EthereumBlockNotification<B>,
        params: &FilteredParams,
    ) -> future::Ready<Option<impl Iterator<Item = PubSubResult>>> {
        let block_and_receipts = notification
            .is_new_best
            .then(|| {
                let substrate_hash = notification.hash;
                let schema = fc_storage::onchain_storage_schema(&*self.client, substrate_hash);
                let handler = self
                    .overrides
                    .schemas
                    .get(&schema)
                    .unwrap_or(&self.overrides.fallback);

                // Both lookups are performed; the pair is only kept when both succeed.
                let block = handler.current_block(substrate_hash);
                let receipts = handler.current_receipts(substrate_hash);
                block.zip(receipts)
            })
            .flatten();

        future::ready(
            block_and_receipts
                .map(|(block, receipts)| PubSubResult::logs(block, receipts, params)),
        )
    }

    /// Maps a pool transaction hash to a transaction-hash message.
    ///
    /// Resolves to `None` unless the transaction is ready in the pool, the
    /// Ethereum runtime API version can be determined at the best block, and
    /// the runtime's extrinsic filter yields exactly one Ethereum transaction.
    fn pending_transaction(&self, hash: &TxHash<P>) -> future::Ready<Option<PubSubResult>> {
        let Some(xt) = self.pool.ready_transaction(hash) else {
            return future::ready(None);
        };

        let best_block = self.client.info().best_hash;
        let api = self.client.runtime_api();
        let Ok(Some(api_version)) = api.api_version::<dyn EthereumRuntimeRPCApi<B>>(best_block)
        else {
            return future::ready(None);
        };

        let xts = vec![xt.data().clone()];
        let txs: Option<Vec<EthereumTransaction>> = if api_version > 1 {
            api.extrinsic_filter(best_block, xts).ok()
        } else {
            // Runtimes at API version 1 or below only expose the legacy filter,
            // whose results are converted into the current transaction type.
            #[allow(deprecated)]
            let legacy = api.extrinsic_filter_before_version_2(best_block, xts);
            legacy
                .ok()
                .map(|found| found.into_iter().map(Into::into).collect())
        };

        // Exactly one extrinsic went in, so exactly one transaction must come out.
        let message = match txs.as_deref() {
            Some([only]) => Some(PubSubResult::transaction_hash(only)),
            _ => None,
        };
        future::ready(message)
    }

    /// Reports the current syncing state.
    ///
    /// While the sync service reports a major sync, returns the starting,
    /// current and (if available) highest seen block numbers; otherwise
    /// returns `Synced(false)`.
    async fn syncing_status(&self) -> PubSubSyncing {
        if !self.sync.is_major_syncing() {
            return PubSubSyncing::Synced(false);
        }

        let current_number = self.client.info().best_number;
        // A failed query and an absent value are both reported as "unknown".
        let highest_number = self.sync.best_seen_block().await.ok().flatten();

        let status = SyncingStatus {
            starting_block: self.starting_block,
            current_block: UniqueSaturatedInto::<u64>::unique_saturated_into(current_number),
            highest_block: highest_number
                .map(UniqueSaturatedInto::<u64>::unique_saturated_into),
        };
        PubSubSyncing::Syncing(status)
    }
}
impl<B: BlockT, P, C, BE> EthPubSubApiServer for EthPubSub<B, P, C, BE>
where
    B: BlockT,
    P: TransactionPool<Block = B> + 'static,
    C: ProvideRuntimeApi<B>,
    C::Api: EthereumRuntimeRPCApi<B>,
    C: BlockchainEvents<B> + 'static,
    C: HeaderBackend<B> + StorageProvider<B, BE>,
    BE: Backend<B> + 'static,
{
    /// Starts a subscription of the requested `kind` on a background task.
    ///
    /// * `pending` - the not-yet-accepted subscription sink.
    /// * `kind` - which event family to stream (`NewHeads`, `Logs`,
    ///   `NewPendingTransactions` or `Syncing`).
    /// * `params` - optional parameters; only `Params::Logs` is interpreted,
    ///   anything else results in the default (match-all) filter.
    ///
    /// The work is spawned on the executor under the name
    /// `frontier-rpc-subscription`; this method itself returns immediately.
    fn subscribe(&self, pending: PendingSubscriptionSink, kind: Kind, params: Option<Params>) {
        let filtered_params = match params {
            Some(Params::Logs(filter)) => FilteredParams::new(Some(filter)),
            _ => FilteredParams::default(),
        };

        let pubsub = self.clone();

        let (inner_sink, block_notification_stream) =
            sc_utils::mpsc::tracing_unbounded("pubsub_notification_stream", 100_000);
        // Only `NewHeads` and `Logs` ever read from this channel. Registering
        // the sender for the other kinds would let block notifications pile up
        // in an unbounded queue nobody drains for the whole lifetime of the
        // subscription, so for those kinds the sender is simply dropped here.
        if matches!(kind, Kind::NewHeads | Kind::Logs) {
            self.pubsub_notification_sinks.lock().push(inner_sink);
        }

        let fut = async move {
            match kind {
                Kind::NewHeads => {
                    let stream = block_notification_stream
                        .filter_map(move |notification| pubsub.notify_header(notification));
                    pipe_from_stream(pending, stream).await
                }
                Kind::Logs => {
                    let stream = block_notification_stream
                        .filter_map(move |notification| {
                            pubsub.notify_logs(notification, &filtered_params)
                        })
                        // Each block yields an iterator of logs; emit them one by one.
                        .flat_map(futures::stream::iter);
                    pipe_from_stream(pending, stream).await
                }
                Kind::NewPendingTransactions => {
                    let pool = pubsub.pool.clone();
                    let stream = pool
                        .import_notification_stream()
                        .filter_map(move |hash| pubsub.pending_transaction(&hash));
                    pipe_from_stream(pending, stream).await
                }
                Kind::Syncing => {
                    let Ok(sink) = pending.accept().await else {
                        return;
                    };

                    // The subscriber gets the current status right away.
                    let syncing_status = pubsub.syncing_status().await;
                    let msg = to_sub_message(&sink, &PubSubResult::SyncingStatus(syncing_status));
                    if sink.send(msg).await.is_err() {
                        // Subscriber is already gone; nothing left to do.
                        return;
                    }

                    // Afterwards a message is sent only when the major-sync
                    // flag flips between two block imports.
                    let mut stream = pubsub.client.import_notification_stream();
                    let mut last_syncing_status = pubsub.sync.is_major_syncing();
                    while stream.next().await.is_some() {
                        // Stop as soon as the subscriber has disconnected so
                        // the task does not outlive its subscription.
                        if sink.is_closed() {
                            break;
                        }

                        let syncing_status = pubsub.sync.is_major_syncing();
                        if syncing_status != last_syncing_status {
                            let status = pubsub.syncing_status().await;
                            let msg = to_sub_message(&sink, &PubSubResult::SyncingStatus(status));
                            if sink.send(msg).await.is_err() {
                                break;
                            }
                        }
                        last_syncing_status = syncing_status;
                    }
                }
            }
        }
        .boxed();

        self.executor
            .spawn("frontier-rpc-subscription", Some("rpc"), fut);
    }
}