1use std::{
20 collections::{BTreeMap, HashSet},
21 marker::PhantomData,
22 sync::Arc,
23 time::{Duration, Instant},
24};
25
26use ethereum::BlockV3 as EthereumBlock;
27use ethereum_types::{H256, U256};
28use jsonrpsee::core::{async_trait, RpcResult};
29use sc_client_api::backend::{Backend, StorageProvider};
31use sc_transaction_pool_api::{InPoolTransaction, TransactionPool};
32use sp_api::ProvideRuntimeApi;
33use sp_blockchain::HeaderBackend;
34use sp_core::hashing::keccak_256;
35use sp_runtime::{
36 generic::BlockId,
37 traits::{Block as BlockT, NumberFor, One, Saturating, UniqueSaturatedInto},
38};
39use fc_rpc_core::{types::*, EthFilterApiServer};
41use fp_rpc::{EthereumRuntimeRPCApi, TransactionStatus};
42
43use crate::{cache::EthBlockDataCacheTask, frontier_backend_client, internal_err};
44
45pub struct EthFilter<B: BlockT, C, BE, P> {
46 client: Arc<C>,
47 backend: Arc<dyn fc_api::Backend<B>>,
48 graph: Arc<P>,
49 filter_pool: FilterPool,
50 max_stored_filters: usize,
51 max_past_logs: u32,
52 block_data_cache: Arc<EthBlockDataCacheTask<B>>,
53 _marker: PhantomData<BE>,
54}
55
56impl<B: BlockT, C, BE, P: TransactionPool> EthFilter<B, C, BE, P> {
57 pub fn new(
58 client: Arc<C>,
59 backend: Arc<dyn fc_api::Backend<B>>,
60 graph: Arc<P>,
61 filter_pool: FilterPool,
62 max_stored_filters: usize,
63 max_past_logs: u32,
64 block_data_cache: Arc<EthBlockDataCacheTask<B>>,
65 ) -> Self {
66 Self {
67 client,
68 backend,
69 graph,
70 filter_pool,
71 max_stored_filters,
72 max_past_logs,
73 block_data_cache,
74 _marker: PhantomData,
75 }
76 }
77}
78
79impl<B, C, BE, P> EthFilter<B, C, BE, P>
80where
81 B: BlockT,
82 C: ProvideRuntimeApi<B>,
83 C::Api: EthereumRuntimeRPCApi<B>,
84 C: HeaderBackend<B> + 'static,
85 P: TransactionPool<Block = B, Hash = B::Hash> + 'static,
86{
87 fn create_filter(&self, filter_type: FilterType) -> RpcResult<U256> {
88 let info = self.client.info();
89 let best_hash = info.best_hash;
90 let best_number = UniqueSaturatedInto::<u64>::unique_saturated_into(info.best_number);
91 let pool = self.filter_pool.clone();
92 let response = if let Ok(locked) = &mut pool.lock() {
93 if locked.len() >= self.max_stored_filters {
94 return Err(internal_err(format!(
95 "Filter pool is full (limit {:?}).",
96 self.max_stored_filters
97 )));
98 }
99 let next_back = {
100 let mut iter = locked.iter();
101 iter.next_back()
102 };
103 let last_key = match next_back {
104 Some((k, _)) => *k,
105 None => U256::zero(),
106 };
107
108 let pending_transaction_hashes = if let FilterType::PendingTransaction = filter_type {
109 let txs_ready = self
110 .graph
111 .ready()
112 .map(|in_pool_tx| in_pool_tx.data().as_ref().clone())
113 .collect();
114 let api = self.client.runtime_api();
116 api.extrinsic_filter(best_hash, txs_ready)
117 .map_err(|err| {
118 internal_err(format!("fetch ready transactions failed: {err:?}"))
119 })?
120 .into_iter()
121 .map(|tx| tx.hash())
122 .collect::<HashSet<_>>()
123 } else {
124 HashSet::new()
125 };
126
127 let key = last_key.checked_add(U256::one()).unwrap();
129 locked.insert(
130 key,
131 FilterPoolItem {
132 last_poll: BlockNumberOrHash::Num(best_number),
133 filter_type,
134 at_block: best_number,
135 pending_transaction_hashes,
136 },
137 );
138 Ok(key)
139 } else {
140 Err(internal_err("Filter pool is not available."))
141 };
142 response
143 }
144}
145
146#[async_trait]
147impl<B, C, BE, P> EthFilterApiServer for EthFilter<B, C, BE, P>
148where
149 B: BlockT,
150 C: ProvideRuntimeApi<B>,
151 C::Api: EthereumRuntimeRPCApi<B>,
152 C: HeaderBackend<B> + StorageProvider<B, BE> + 'static,
153 BE: Backend<B> + 'static,
154 P: TransactionPool<Block = B, Hash = B::Hash> + 'static,
155{
156 fn new_filter(&self, filter: Filter) -> RpcResult<U256> {
157 self.create_filter(FilterType::Log(filter))
158 }
159
160 fn new_block_filter(&self) -> RpcResult<U256> {
161 self.create_filter(FilterType::Block)
162 }
163
164 fn new_pending_transaction_filter(&self) -> RpcResult<U256> {
165 self.create_filter(FilterType::PendingTransaction)
166 }
167
168 async fn filter_changes(&self, index: Index) -> RpcResult<FilterChanges> {
169 enum FuturePath<B: BlockT> {
178 Block {
179 last: u64,
180 next: u64,
181 },
182 PendingTransaction {
183 new_hashes: Vec<H256>,
184 },
185 Log {
186 filter: Filter,
187 from_number: NumberFor<B>,
188 current_number: NumberFor<B>,
189 },
190 Error(jsonrpsee::types::ErrorObjectOwned),
191 }
192
193 let key = U256::from(index.value());
194 let info = self.client.info();
195 let best_hash = info.best_hash;
196 let best_number = UniqueSaturatedInto::<u64>::unique_saturated_into(info.best_number);
197 let pool = self.filter_pool.clone();
198 let path = if let Ok(locked) = &mut pool.lock() {
200 if let Some(pool_item) = locked.get(&key).cloned() {
202 match &pool_item.filter_type {
203 FilterType::Block => {
205 let last = pool_item.last_poll.to_min_block_num().unwrap();
206 let next = best_number + 1;
207 locked.insert(
209 key,
210 FilterPoolItem {
211 last_poll: BlockNumberOrHash::Num(next),
212 filter_type: pool_item.filter_type.clone(),
213 at_block: pool_item.at_block,
214 pending_transaction_hashes: HashSet::new(),
215 },
216 );
217
218 FuturePath::<B>::Block { last, next }
219 }
220 FilterType::PendingTransaction => {
221 let previous_hashes = pool_item.pending_transaction_hashes;
222 let txs_ready = self
223 .graph
224 .ready()
225 .map(|in_pool_tx| in_pool_tx.data().as_ref().clone())
226 .collect();
227 let api = self.client.runtime_api();
229 let current_hashes = api
230 .extrinsic_filter(best_hash, txs_ready)
231 .map_err(|err| {
232 internal_err(format!("fetch ready transactions failed: {err:?}"))
233 })?
234 .into_iter()
235 .map(|tx| tx.hash())
236 .collect::<HashSet<_>>();
237
238 locked.insert(
240 key,
241 FilterPoolItem {
242 last_poll: BlockNumberOrHash::Num(best_number + 1),
243 filter_type: pool_item.filter_type.clone(),
244 at_block: pool_item.at_block,
245 pending_transaction_hashes: current_hashes.clone(),
246 },
247 );
248
249 let mew_hashes = current_hashes
250 .difference(&previous_hashes)
251 .collect::<HashSet<&H256>>();
252 FuturePath::PendingTransaction {
253 new_hashes: mew_hashes.into_iter().copied().collect(),
254 }
255 }
256 FilterType::Log(filter) => {
258 locked.insert(
260 key,
261 FilterPoolItem {
262 last_poll: BlockNumberOrHash::Num(best_number + 1),
263 filter_type: pool_item.filter_type.clone(),
264 at_block: pool_item.at_block,
265 pending_transaction_hashes: HashSet::new(),
266 },
267 );
268
269 let best_number = self.client.info().best_number;
271 let mut current_number = filter
272 .to_block
273 .and_then(|v| v.to_min_block_num())
274 .map(|s| s.unique_saturated_into())
275 .unwrap_or(best_number);
276
277 if current_number > best_number {
278 current_number = best_number;
279 }
280
281 let last_poll = pool_item
283 .last_poll
284 .to_min_block_num()
285 .unwrap()
286 .unique_saturated_into();
287
288 let filter_from = filter
289 .from_block
290 .and_then(|v| v.to_min_block_num())
291 .map(|s| s.unique_saturated_into())
292 .unwrap_or(last_poll);
293
294 let from_number = std::cmp::max(last_poll, filter_from);
295
296 FuturePath::Log {
298 filter: filter.clone(),
299 from_number,
300 current_number,
301 }
302 }
303 }
304 } else {
305 FuturePath::Error(internal_err(format!("Filter id {key:?} does not exist.")))
306 }
307 } else {
308 FuturePath::Error(internal_err("Filter pool is not available."))
309 };
310
311 let client = Arc::clone(&self.client);
312 let backend = Arc::clone(&self.backend);
313 let block_data_cache = Arc::clone(&self.block_data_cache);
314 let max_past_logs = self.max_past_logs;
315
316 match path {
317 FuturePath::Error(err) => Err(err),
318 FuturePath::Block { last, next } => {
319 let mut ethereum_hashes: Vec<H256> = Vec::new();
320 for n in last..next {
321 let id = BlockId::Number(n.unique_saturated_into());
322 let substrate_hash = client
323 .expect_block_hash_from_id(&id)
324 .map_err(|_| internal_err(format!("Expect block number from id: {id}")))?;
325
326 let block = block_data_cache.current_block(substrate_hash).await;
327 if let Some(block) = block {
328 ethereum_hashes.push(block.header.hash())
329 }
330 }
331 Ok(FilterChanges::Hashes(ethereum_hashes))
332 }
333 FuturePath::PendingTransaction { new_hashes } => Ok(FilterChanges::Hashes(new_hashes)),
334 FuturePath::Log {
335 filter,
336 from_number,
337 current_number,
338 } => {
339 let logs = if backend.is_indexed() {
340 filter_range_logs_indexed(
341 client.as_ref(),
342 backend.log_indexer(),
343 &block_data_cache,
344 max_past_logs,
345 &filter,
346 from_number,
347 current_number,
348 )
349 .await?
350 } else {
351 filter_range_logs(
352 client.as_ref(),
353 &block_data_cache,
354 max_past_logs,
355 &filter,
356 from_number,
357 current_number,
358 )
359 .await?
360 };
361
362 Ok(FilterChanges::Logs(logs))
363 }
364 }
365 }
366
367 async fn filter_logs(&self, index: Index) -> RpcResult<Vec<Log>> {
368 let key = U256::from(index.value());
369 let pool = self.filter_pool.clone();
370
371 let filter_result: RpcResult<Filter> = (|| {
374 let pool = pool
375 .lock()
376 .map_err(|_| internal_err("Filter pool is not available."))?;
377
378 let pool_item = pool
379 .get(&key)
380 .ok_or_else(|| internal_err(format!("Filter id {key:?} does not exist.")))?;
381
382 match &pool_item.filter_type {
383 FilterType::Log(filter) => Ok(filter.clone()),
384 _ => Err(internal_err(format!(
385 "Filter id {key:?} is not a Log filter."
386 ))),
387 }
388 })();
389
390 let client = Arc::clone(&self.client);
391 let backend = Arc::clone(&self.backend);
392 let block_data_cache = Arc::clone(&self.block_data_cache);
393 let max_past_logs = self.max_past_logs;
394
395 let filter = filter_result?;
396
397 let best_number = client.info().best_number;
398 let mut current_number = filter
399 .to_block
400 .and_then(|v| v.to_min_block_num())
401 .map(|s| s.unique_saturated_into())
402 .unwrap_or(best_number);
403
404 if current_number > best_number {
405 current_number = best_number;
406 }
407
408 let from_number = filter
409 .from_block
410 .and_then(|v| v.to_min_block_num())
411 .map(|s| s.unique_saturated_into())
412 .unwrap_or(best_number);
413
414 let logs = if backend.is_indexed() {
415 filter_range_logs_indexed(
416 client.as_ref(),
417 backend.log_indexer(),
418 &block_data_cache,
419 max_past_logs,
420 &filter,
421 from_number,
422 current_number,
423 )
424 .await?
425 } else {
426 filter_range_logs(
427 client.as_ref(),
428 &block_data_cache,
429 max_past_logs,
430 &filter,
431 from_number,
432 current_number,
433 )
434 .await?
435 };
436 Ok(logs)
437 }
438
439 fn uninstall_filter(&self, index: Index) -> RpcResult<bool> {
440 let key = U256::from(index.value());
441 let pool = self.filter_pool.clone();
442 let response = if let Ok(locked) = &mut pool.lock() {
444 if locked.remove(&key).is_some() {
445 Ok(true)
446 } else {
447 Err(internal_err(format!("Filter id {key:?} does not exist.")))
448 }
449 } else {
450 Err(internal_err("Filter pool is not available."))
451 };
452 response
453 }
454
455 async fn logs(&self, filter: Filter) -> RpcResult<Vec<Log>> {
456 let client = Arc::clone(&self.client);
457 let block_data_cache = Arc::clone(&self.block_data_cache);
458 let backend = Arc::clone(&self.backend);
459 let max_past_logs = self.max_past_logs;
460
461 let mut logs = Vec::new();
462 if let Some(hash) = filter.block_hash {
463 let substrate_hash = match frontier_backend_client::load_hash::<B, C>(
464 client.as_ref(),
465 backend.as_ref(),
466 hash,
467 )
468 .await
469 .map_err(|err| internal_err(format!("{err:?}")))?
470 {
471 Some(hash) => hash,
472 _ => return Err(crate::err(-32000, "unknown block", None)),
473 };
474
475 let block = block_data_cache.current_block(substrate_hash).await;
476 let statuses = block_data_cache
477 .current_transaction_statuses(substrate_hash)
478 .await;
479 if let (Some(block), Some(statuses)) = (block, statuses) {
480 logs = filter_block_logs(&filter, block, statuses);
481 }
482 } else {
483 let best_number = client.info().best_number;
484 let mut current_number = filter
485 .to_block
486 .and_then(|v| v.to_min_block_num())
487 .map(|s| s.unique_saturated_into())
488 .unwrap_or(best_number);
489
490 if current_number > best_number {
491 current_number = best_number;
492 }
493
494 let from_number = filter
495 .from_block
496 .and_then(|v| v.to_min_block_num())
497 .map(|s| s.unique_saturated_into())
498 .unwrap_or(best_number);
499
500 logs = if backend.is_indexed() {
501 filter_range_logs_indexed(
502 client.as_ref(),
503 backend.log_indexer(),
504 &block_data_cache,
505 max_past_logs,
506 &filter,
507 from_number,
508 current_number,
509 )
510 .await?
511 } else {
512 filter_range_logs(
513 client.as_ref(),
514 &block_data_cache,
515 max_past_logs,
516 &filter,
517 from_number,
518 current_number,
519 )
520 .await?
521 };
522 }
523 Ok(logs)
524 }
525}
526
527async fn filter_range_logs_indexed<B, C, BE>(
528 _client: &C,
529 backend: &dyn fc_api::LogIndexerBackend<B>,
530 block_data_cache: &EthBlockDataCacheTask<B>,
531 max_past_logs: u32,
532 filter: &Filter,
533 from: NumberFor<B>,
534 to: NumberFor<B>,
535) -> RpcResult<Vec<Log>>
536where
537 B: BlockT,
538 C: ProvideRuntimeApi<B>,
539 C::Api: EthereumRuntimeRPCApi<B>,
540 C: HeaderBackend<B> + StorageProvider<B, BE> + 'static,
541 BE: Backend<B> + 'static,
542{
543 let timer_start = Instant::now();
544 let timer_prepare = Instant::now();
545
546 let max_duration = Duration::from_secs(10);
548 let begin_request = Instant::now();
549
550 let addresses = match &filter.address {
552 Some(VariadicValue::Single(item)) => vec![*item],
553 Some(VariadicValue::Multiple(items)) => items.clone(),
554 _ => vec![],
555 };
556 let topics = filter
557 .topics
558 .iter()
559 .map(|flat| match flat {
560 VariadicValue::Single(item) => vec![*item],
561 VariadicValue::Multiple(items) => items.clone(),
562 _ => vec![],
563 })
564 .collect::<Vec<Vec<H256>>>();
565
566 let time_prepare = timer_prepare.elapsed().as_millis();
567 let timer_fetch = Instant::now();
568
569 let mut logs_to_return = Vec::new();
570 if let Ok(logs) = backend
571 .filter_logs(
572 UniqueSaturatedInto::<u64>::unique_saturated_into(from),
573 UniqueSaturatedInto::<u64>::unique_saturated_into(to),
574 addresses,
575 topics,
576 )
577 .await
578 {
579 let time_fetch = timer_fetch.elapsed().as_millis();
580 let timer_post = Instant::now();
581
582 let mut statuses_cache: BTreeMap<B::Hash, Option<Vec<TransactionStatus>>> = BTreeMap::new();
583
584 for log in logs.iter() {
585 let substrate_hash = log.substrate_block_hash;
586
587 let ethereum_block_hash = log.ethereum_block_hash;
588 let block_number = log.block_number;
589 let db_transaction_index = log.transaction_index;
590 let db_log_index = log.log_index;
591
592 let statuses = if let Some(statuses) = statuses_cache.get(&log.substrate_block_hash) {
593 statuses.clone()
594 } else {
595 let statuses = block_data_cache
596 .current_transaction_statuses(substrate_hash)
597 .await;
598 statuses_cache.insert(log.substrate_block_hash, statuses.clone());
599 statuses
600 };
601 if let Some(statuses) = statuses {
602 let mut block_log_index: u32 = 0;
603 for status in statuses.iter() {
604 let mut transaction_log_index: u32 = 0;
605 let transaction_hash = status.transaction_hash;
606 let transaction_index = status.transaction_index;
607 for ethereum_log in &status.logs {
608 if transaction_index == db_transaction_index
609 && transaction_log_index == db_log_index
610 {
611 logs_to_return.push(Log {
612 address: ethereum_log.address,
613 topics: ethereum_log.topics.clone(),
614 data: Bytes(ethereum_log.data.clone()),
615 block_hash: Some(ethereum_block_hash),
616 block_number: Some(U256::from(block_number)),
617 transaction_hash: Some(transaction_hash),
618 transaction_index: Some(U256::from(transaction_index)),
619 log_index: Some(U256::from(block_log_index)),
620 transaction_log_index: Some(U256::from(transaction_log_index)),
621 removed: false,
622 });
623 }
624 transaction_log_index += 1;
625 block_log_index += 1;
626 }
627 }
628 }
629 if logs_to_return.len() as u32 > max_past_logs {
631 return Err(internal_err(format!(
632 "query returned more than {max_past_logs} results",
633 )));
634 }
635 if begin_request.elapsed() > max_duration {
636 return Err(internal_err(format!(
637 "query timeout of {} seconds exceeded",
638 max_duration.as_secs()
639 )));
640 }
641 }
642
643 let time_post = timer_post.elapsed().as_millis();
644
645 log::info!(
646 target: "frontier-sql",
647 "OUTER-TIMER fetch={time_fetch}, post={time_post}"
648 );
649 }
650
651 log::info!(
652 target: "frontier-sql",
653 "OUTER-TIMER start={}, prepare={}, all_fetch = {}",
654 timer_start.elapsed().as_millis(),
655 time_prepare,
656 timer_fetch.elapsed().as_millis(),
657 );
658 Ok(logs_to_return)
659}
660
661async fn filter_range_logs<B, C, BE>(
662 client: &C,
663 block_data_cache: &EthBlockDataCacheTask<B>,
664 max_past_logs: u32,
665 filter: &Filter,
666 from: NumberFor<B>,
667 to: NumberFor<B>,
668) -> RpcResult<Vec<Log>>
669where
670 B: BlockT,
671 C: ProvideRuntimeApi<B>,
672 C::Api: EthereumRuntimeRPCApi<B>,
673 C: HeaderBackend<B> + StorageProvider<B, BE> + 'static,
674 BE: Backend<B> + 'static,
675{
676 let max_duration = Duration::from_secs(10);
678 let begin_request = Instant::now();
679
680 let mut current_number = from;
681
682 let address_bloom_filter = FilteredParams::address_bloom_filter(&filter.address);
684 let topics_bloom_filter = FilteredParams::topics_bloom_filter(&filter.topics);
685
686 let mut logs = Vec::new();
687 while current_number <= to {
688 let id = BlockId::Number(current_number);
689 let substrate_hash = client
690 .expect_block_hash_from_id(&id)
691 .map_err(|_| internal_err(format!("Expect block number from id: {id}")))?;
692
693 let block = block_data_cache.current_block(substrate_hash).await;
694
695 if let Some(block) = block {
696 if FilteredParams::address_in_bloom(block.header.logs_bloom, &address_bloom_filter)
697 && FilteredParams::topics_in_bloom(block.header.logs_bloom, &topics_bloom_filter)
698 {
699 let statuses = block_data_cache
700 .current_transaction_statuses(substrate_hash)
701 .await;
702 if let Some(statuses) = statuses {
703 logs.extend(filter_block_logs(filter, block, statuses));
704 }
705 }
706 }
707 if logs.len() as u32 > max_past_logs {
709 return Err(internal_err(format!(
710 "query returned more than {max_past_logs} results"
711 )));
712 }
713 if begin_request.elapsed() > max_duration {
714 return Err(internal_err(format!(
715 "query timeout of {} seconds exceeded",
716 max_duration.as_secs()
717 )));
718 }
719 if current_number == to {
720 break;
721 } else {
722 current_number = current_number.saturating_add(One::one());
723 }
724 }
725 Ok(logs)
726}
727
728pub(crate) fn filter_block_logs(
729 filter: &Filter,
730 block: EthereumBlock,
731 transaction_statuses: Vec<TransactionStatus>,
732) -> Vec<Log> {
733 let params = FilteredParams::new(filter.clone());
734 let mut block_log_index: u32 = 0;
735 let block_hash = H256::from(keccak_256(&rlp::encode(&block.header)));
736
737 let mut logs = Vec::new();
738 for status in transaction_statuses.iter() {
739 let mut transaction_log_index: u32 = 0;
740 let transaction_hash = status.transaction_hash;
741 for ethereum_log in &status.logs {
742 let mut log = Log {
743 address: ethereum_log.address,
744 topics: ethereum_log.topics.clone(),
745 data: Bytes(ethereum_log.data.clone()),
746 block_hash: None,
747 block_number: None,
748 transaction_hash: None,
749 transaction_index: None,
750 log_index: None,
751 transaction_log_index: None,
752 removed: false,
753 };
754
755 let topics_match = filter.topics.is_empty() || params.filter_topics(&log.topics);
756 let address_match = filter
757 .address
758 .as_ref()
759 .is_none_or(|_| params.filter_address(&log.address));
760 if topics_match && address_match {
761 log.block_hash = Some(block_hash);
762 log.block_number = Some(block.header.number);
763 log.transaction_hash = Some(transaction_hash);
764 log.transaction_index = Some(U256::from(status.transaction_index));
765 log.log_index = Some(U256::from(block_log_index));
766 log.transaction_log_index = Some(U256::from(transaction_log_index));
767 logs.push(log);
768 }
769 transaction_log_index += 1;
770 block_log_index += 1;
771 }
772 }
773 logs
774}