1use std::{
20 collections::{BTreeMap, HashSet},
21 marker::PhantomData,
22 sync::Arc,
23 time::{Duration, Instant},
24};
25
26use ethereum::BlockV3 as EthereumBlock;
27use ethereum_types::{H256, U256};
28use jsonrpsee::core::{async_trait, RpcResult};
29use sc_client_api::backend::{Backend, StorageProvider};
31use sc_transaction_pool_api::{InPoolTransaction, TransactionPool};
32use sp_api::ProvideRuntimeApi;
33use sp_blockchain::HeaderBackend;
34use sp_core::hashing::keccak_256;
35use sp_runtime::{
36 generic::BlockId,
37 traits::{Block as BlockT, NumberFor, One, Saturating, UniqueSaturatedInto},
38};
39use fc_rpc_core::{types::*, EthFilterApiServer};
41use fp_rpc::{EthereumRuntimeRPCApi, TransactionStatus};
42
43use crate::{cache::EthBlockDataCacheTask, frontier_backend_client, internal_err};
44
/// Ethereum filter RPC handler.
///
/// Holds the handles needed to install, poll and remove filters and to run
/// log queries; the RPC methods themselves live in the `EthFilterApiServer`
/// implementation for this type.
pub struct EthFilter<B: BlockT, C, BE, P> {
    /// Client handle, used for chain info, runtime API access and block-hash lookups.
    client: Arc<C>,
    /// Frontier backend, queried for `is_indexed()` / `log_indexer()` and hash mapping.
    backend: Arc<dyn fc_api::Backend<B>>,
    /// Transaction pool; its `ready()` transactions feed pending-transaction filters.
    graph: Arc<P>,
    /// Shared, lock-protected map of installed filters keyed by filter id.
    filter_pool: FilterPool,
    /// Maximum number of filters that may be installed at the same time.
    max_stored_filters: usize,
    /// Maximum number of logs a single query may collect before it is rejected.
    max_past_logs: u32,
    /// Cache used to fetch Ethereum blocks and transaction statuses by Substrate hash.
    block_data_cache: Arc<EthBlockDataCacheTask<B>>,
    /// Ties the otherwise unused backend type parameter `BE` to the struct.
    _marker: PhantomData<BE>,
}
55
56impl<B: BlockT, C, BE, P: TransactionPool> EthFilter<B, C, BE, P> {
57 pub fn new(
58 client: Arc<C>,
59 backend: Arc<dyn fc_api::Backend<B>>,
60 graph: Arc<P>,
61 filter_pool: FilterPool,
62 max_stored_filters: usize,
63 max_past_logs: u32,
64 block_data_cache: Arc<EthBlockDataCacheTask<B>>,
65 ) -> Self {
66 Self {
67 client,
68 backend,
69 graph,
70 filter_pool,
71 max_stored_filters,
72 max_past_logs,
73 block_data_cache,
74 _marker: PhantomData,
75 }
76 }
77}
78
79impl<B, C, BE, P> EthFilter<B, C, BE, P>
80where
81 B: BlockT,
82 C: ProvideRuntimeApi<B>,
83 C::Api: EthereumRuntimeRPCApi<B>,
84 C: HeaderBackend<B> + 'static,
85 P: TransactionPool<Block = B, Hash = B::Hash> + 'static,
86{
87 fn create_filter(&self, filter_type: FilterType) -> RpcResult<U256> {
88 let info = self.client.info();
89 let best_hash = info.best_hash;
90 let best_number = UniqueSaturatedInto::<u64>::unique_saturated_into(info.best_number);
91 let pool = self.filter_pool.clone();
92 let response = if let Ok(locked) = &mut pool.lock() {
93 if locked.len() >= self.max_stored_filters {
94 return Err(internal_err(format!(
95 "Filter pool is full (limit {:?}).",
96 self.max_stored_filters
97 )));
98 }
99 let next_back = {
100 let mut iter = locked.iter();
101 iter.next_back()
102 };
103 let last_key = match next_back {
104 Some((k, _)) => *k,
105 None => U256::zero(),
106 };
107
108 let pending_transaction_hashes = if let FilterType::PendingTransaction = filter_type {
109 let txs_ready = self
110 .graph
111 .ready()
112 .map(|in_pool_tx| in_pool_tx.data().as_ref().clone())
113 .collect();
114 let api = self.client.runtime_api();
116 api.extrinsic_filter(best_hash, txs_ready)
117 .map_err(|err| {
118 internal_err(format!("fetch ready transactions failed: {err:?}"))
119 })?
120 .into_iter()
121 .map(|tx| tx.hash())
122 .collect::<HashSet<_>>()
123 } else {
124 HashSet::new()
125 };
126
127 let key = last_key.checked_add(U256::one()).unwrap();
129 locked.insert(
130 key,
131 FilterPoolItem {
132 last_poll: BlockNumberOrHash::Num(best_number),
133 filter_type,
134 at_block: best_number,
135 pending_transaction_hashes,
136 },
137 );
138 Ok(key)
139 } else {
140 Err(internal_err("Filter pool is not available."))
141 };
142 response
143 }
144}
145
146#[async_trait]
147impl<B, C, BE, P> EthFilterApiServer for EthFilter<B, C, BE, P>
148where
149 B: BlockT,
150 C: ProvideRuntimeApi<B>,
151 C::Api: EthereumRuntimeRPCApi<B>,
152 C: HeaderBackend<B> + StorageProvider<B, BE> + 'static,
153 BE: Backend<B> + 'static,
154 P: TransactionPool<Block = B, Hash = B::Hash> + 'static,
155{
156 fn new_filter(&self, filter: Filter) -> RpcResult<U256> {
157 self.create_filter(FilterType::Log(filter))
158 }
159
160 fn new_block_filter(&self) -> RpcResult<U256> {
161 self.create_filter(FilterType::Block)
162 }
163
164 fn new_pending_transaction_filter(&self) -> RpcResult<U256> {
165 self.create_filter(FilterType::PendingTransaction)
166 }
167
168 async fn filter_changes(&self, index: Index) -> RpcResult<FilterChanges> {
169 enum FuturePath<B: BlockT> {
178 Block {
179 last: u64,
180 next: u64,
181 },
182 PendingTransaction {
183 new_hashes: Vec<H256>,
184 },
185 Log {
186 filter: Filter,
187 from_number: NumberFor<B>,
188 current_number: NumberFor<B>,
189 },
190 Error(jsonrpsee::types::ErrorObjectOwned),
191 }
192
193 let key = U256::from(index.value());
194 let info = self.client.info();
195 let best_hash = info.best_hash;
196 let best_number = UniqueSaturatedInto::<u64>::unique_saturated_into(info.best_number);
197 let pool = self.filter_pool.clone();
198 let path = if let Ok(locked) = &mut pool.lock() {
200 if let Some(pool_item) = locked.get(&key).cloned() {
202 match &pool_item.filter_type {
203 FilterType::Block => {
205 let last = pool_item.last_poll.to_min_block_num().unwrap();
206 let next = best_number + 1;
207 locked.insert(
209 key,
210 FilterPoolItem {
211 last_poll: BlockNumberOrHash::Num(next),
212 filter_type: pool_item.filter_type.clone(),
213 at_block: pool_item.at_block,
214 pending_transaction_hashes: HashSet::new(),
215 },
216 );
217
218 FuturePath::<B>::Block { last, next }
219 }
220 FilterType::PendingTransaction => {
221 let previous_hashes = pool_item.pending_transaction_hashes;
222 let txs_ready = self
223 .graph
224 .ready()
225 .map(|in_pool_tx| in_pool_tx.data().as_ref().clone())
226 .collect();
227 let api = self.client.runtime_api();
229 let current_hashes = api
230 .extrinsic_filter(best_hash, txs_ready)
231 .map_err(|err| {
232 internal_err(format!("fetch ready transactions failed: {err:?}"))
233 })?
234 .into_iter()
235 .map(|tx| tx.hash())
236 .collect::<HashSet<_>>();
237
238 locked.insert(
240 key,
241 FilterPoolItem {
242 last_poll: BlockNumberOrHash::Num(best_number + 1),
243 filter_type: pool_item.filter_type.clone(),
244 at_block: pool_item.at_block,
245 pending_transaction_hashes: current_hashes.clone(),
246 },
247 );
248
249 let mew_hashes = current_hashes
250 .difference(&previous_hashes)
251 .collect::<HashSet<&H256>>();
252 FuturePath::PendingTransaction {
253 new_hashes: mew_hashes.into_iter().copied().collect(),
254 }
255 }
256 FilterType::Log(filter) => {
258 locked.insert(
260 key,
261 FilterPoolItem {
262 last_poll: BlockNumberOrHash::Num(best_number + 1),
263 filter_type: pool_item.filter_type.clone(),
264 at_block: pool_item.at_block,
265 pending_transaction_hashes: HashSet::new(),
266 },
267 );
268
269 let best_number = self.client.info().best_number;
271 let mut current_number = filter
272 .to_block
273 .and_then(|v| v.to_min_block_num())
274 .map(|s| s.unique_saturated_into())
275 .unwrap_or(best_number);
276
277 if current_number > best_number {
278 current_number = best_number;
279 }
280
281 let last_poll = pool_item
283 .last_poll
284 .to_min_block_num()
285 .unwrap()
286 .unique_saturated_into();
287
288 let filter_from = filter
289 .from_block
290 .and_then(|v| v.to_min_block_num())
291 .map(|s| s.unique_saturated_into())
292 .unwrap_or(last_poll);
293
294 let from_number = std::cmp::max(last_poll, filter_from);
295
296 FuturePath::Log {
298 filter: filter.clone(),
299 from_number,
300 current_number,
301 }
302 }
303 }
304 } else {
305 FuturePath::Error(internal_err(format!("Filter id {key:?} does not exist.")))
306 }
307 } else {
308 FuturePath::Error(internal_err("Filter pool is not available."))
309 };
310
311 let client = Arc::clone(&self.client);
312 let backend = Arc::clone(&self.backend);
313 let block_data_cache = Arc::clone(&self.block_data_cache);
314 let max_past_logs = self.max_past_logs;
315
316 match path {
317 FuturePath::Error(err) => Err(err),
318 FuturePath::Block { last, next } => {
319 let mut ethereum_hashes: Vec<H256> = Vec::new();
320 for n in last..next {
321 let id = BlockId::Number(n.unique_saturated_into());
322 let substrate_hash = client
323 .expect_block_hash_from_id(&id)
324 .map_err(|_| internal_err(format!("Expect block number from id: {id}")))?;
325
326 let block = block_data_cache.current_block(substrate_hash).await;
327 if let Some(block) = block {
328 ethereum_hashes.push(block.header.hash())
329 }
330 }
331 Ok(FilterChanges::Hashes(ethereum_hashes))
332 }
333 FuturePath::PendingTransaction { new_hashes } => Ok(FilterChanges::Hashes(new_hashes)),
334 FuturePath::Log {
335 filter,
336 from_number,
337 current_number,
338 } => {
339 let mut ret: Vec<Log> = Vec::new();
340 if backend.is_indexed() {
341 let _ = filter_range_logs_indexed(
342 client.as_ref(),
343 backend.log_indexer(),
344 &block_data_cache,
345 &mut ret,
346 max_past_logs,
347 &filter,
348 from_number,
349 current_number,
350 )
351 .await?;
352 } else {
353 let _ = filter_range_logs(
354 client.as_ref(),
355 &block_data_cache,
356 &mut ret,
357 max_past_logs,
358 &filter,
359 from_number,
360 current_number,
361 )
362 .await?;
363 }
364
365 Ok(FilterChanges::Logs(ret))
366 }
367 }
368 }
369
370 async fn filter_logs(&self, index: Index) -> RpcResult<Vec<Log>> {
371 let key = U256::from(index.value());
372 let pool = self.filter_pool.clone();
373
374 let filter_result: RpcResult<Filter> = (|| {
377 let pool = pool
378 .lock()
379 .map_err(|_| internal_err("Filter pool is not available."))?;
380
381 let pool_item = pool
382 .get(&key)
383 .ok_or_else(|| internal_err(format!("Filter id {key:?} does not exist.")))?;
384
385 match &pool_item.filter_type {
386 FilterType::Log(filter) => Ok(filter.clone()),
387 _ => Err(internal_err(format!(
388 "Filter id {key:?} is not a Log filter."
389 ))),
390 }
391 })();
392
393 let client = Arc::clone(&self.client);
394 let backend = Arc::clone(&self.backend);
395 let block_data_cache = Arc::clone(&self.block_data_cache);
396 let max_past_logs = self.max_past_logs;
397
398 let filter = filter_result?;
399
400 let best_number = client.info().best_number;
401 let mut current_number = filter
402 .to_block
403 .and_then(|v| v.to_min_block_num())
404 .map(|s| s.unique_saturated_into())
405 .unwrap_or(best_number);
406
407 if current_number > best_number {
408 current_number = best_number;
409 }
410
411 let from_number = filter
412 .from_block
413 .and_then(|v| v.to_min_block_num())
414 .map(|s| s.unique_saturated_into())
415 .unwrap_or(best_number);
416
417 let mut ret: Vec<Log> = Vec::new();
418 if backend.is_indexed() {
419 let _ = filter_range_logs_indexed(
420 client.as_ref(),
421 backend.log_indexer(),
422 &block_data_cache,
423 &mut ret,
424 max_past_logs,
425 &filter,
426 from_number,
427 current_number,
428 )
429 .await?;
430 } else {
431 let _ = filter_range_logs(
432 client.as_ref(),
433 &block_data_cache,
434 &mut ret,
435 max_past_logs,
436 &filter,
437 from_number,
438 current_number,
439 )
440 .await?;
441 }
442 Ok(ret)
443 }
444
445 fn uninstall_filter(&self, index: Index) -> RpcResult<bool> {
446 let key = U256::from(index.value());
447 let pool = self.filter_pool.clone();
448 let response = if let Ok(locked) = &mut pool.lock() {
450 if locked.remove(&key).is_some() {
451 Ok(true)
452 } else {
453 Err(internal_err(format!("Filter id {key:?} does not exist.")))
454 }
455 } else {
456 Err(internal_err("Filter pool is not available."))
457 };
458 response
459 }
460
461 async fn logs(&self, filter: Filter) -> RpcResult<Vec<Log>> {
462 let client = Arc::clone(&self.client);
463 let block_data_cache = Arc::clone(&self.block_data_cache);
464 let backend = Arc::clone(&self.backend);
465 let max_past_logs = self.max_past_logs;
466
467 let mut ret: Vec<Log> = Vec::new();
468 if let Some(hash) = filter.block_hash {
469 let substrate_hash = match frontier_backend_client::load_hash::<B, C>(
470 client.as_ref(),
471 backend.as_ref(),
472 hash,
473 )
474 .await
475 .map_err(|err| internal_err(format!("{err:?}")))?
476 {
477 Some(hash) => hash,
478 _ => return Err(crate::err(-32000, "unknown block", None)),
479 };
480
481 let block = block_data_cache.current_block(substrate_hash).await;
482 let statuses = block_data_cache
483 .current_transaction_statuses(substrate_hash)
484 .await;
485 if let (Some(block), Some(statuses)) = (block, statuses) {
486 filter_block_logs(&mut ret, &filter, block, statuses);
487 }
488 } else {
489 let best_number = client.info().best_number;
490 let mut current_number = filter
491 .to_block
492 .and_then(|v| v.to_min_block_num())
493 .map(|s| s.unique_saturated_into())
494 .unwrap_or(best_number);
495
496 if current_number > best_number {
497 current_number = best_number;
498 }
499
500 let from_number = filter
501 .from_block
502 .and_then(|v| v.to_min_block_num())
503 .map(|s| s.unique_saturated_into())
504 .unwrap_or(best_number);
505
506 if backend.is_indexed() {
507 let _ = filter_range_logs_indexed(
508 client.as_ref(),
509 backend.log_indexer(),
510 &block_data_cache,
511 &mut ret,
512 max_past_logs,
513 &filter,
514 from_number,
515 current_number,
516 )
517 .await?;
518 } else {
519 let _ = filter_range_logs(
520 client.as_ref(),
521 &block_data_cache,
522 &mut ret,
523 max_past_logs,
524 &filter,
525 from_number,
526 current_number,
527 )
528 .await?;
529 }
530 }
531 Ok(ret)
532 }
533}
534
535async fn filter_range_logs_indexed<B, C, BE>(
536 _client: &C,
537 backend: &dyn fc_api::LogIndexerBackend<B>,
538 block_data_cache: &EthBlockDataCacheTask<B>,
539 ret: &mut Vec<Log>,
540 max_past_logs: u32,
541 filter: &Filter,
542 from: NumberFor<B>,
543 to: NumberFor<B>,
544) -> RpcResult<()>
545where
546 B: BlockT,
547 C: ProvideRuntimeApi<B>,
548 C::Api: EthereumRuntimeRPCApi<B>,
549 C: HeaderBackend<B> + StorageProvider<B, BE> + 'static,
550 BE: Backend<B> + 'static,
551{
552 let timer_start = Instant::now();
553 let timer_prepare = Instant::now();
554
555 let max_duration = Duration::from_secs(10);
557 let begin_request = Instant::now();
558
559 let topics_input = if filter.topics.is_some() {
560 let filtered_params = FilteredParams::new(Some(filter.clone()));
561 Some(filtered_params.flat_topics)
562 } else {
563 None
564 };
565
566 let addresses = match &filter.address {
568 Some(VariadicValue::Single(item)) => vec![*item],
569 Some(VariadicValue::Multiple(items)) => items.clone(),
570 _ => vec![],
571 };
572 let topics = topics_input
573 .unwrap_or_default()
574 .iter()
575 .map(|flat| match flat {
576 VariadicValue::Single(item) => vec![*item],
577 VariadicValue::Multiple(items) => items.clone(),
578 _ => vec![],
579 })
580 .collect::<Vec<Vec<Option<H256>>>>();
581
582 let time_prepare = timer_prepare.elapsed().as_millis();
583 let timer_fetch = Instant::now();
584 if let Ok(logs) = backend
585 .filter_logs(
586 UniqueSaturatedInto::<u64>::unique_saturated_into(from),
587 UniqueSaturatedInto::<u64>::unique_saturated_into(to),
588 addresses,
589 topics,
590 )
591 .await
592 {
593 let time_fetch = timer_fetch.elapsed().as_millis();
594 let timer_post = Instant::now();
595
596 let mut statuses_cache: BTreeMap<B::Hash, Option<Vec<TransactionStatus>>> = BTreeMap::new();
597
598 for log in logs.iter() {
599 let substrate_hash = log.substrate_block_hash;
600
601 let ethereum_block_hash = log.ethereum_block_hash;
602 let block_number = log.block_number;
603 let db_transaction_index = log.transaction_index;
604 let db_log_index = log.log_index;
605
606 let statuses = if let Some(statuses) = statuses_cache.get(&log.substrate_block_hash) {
607 statuses.clone()
608 } else {
609 let statuses = block_data_cache
610 .current_transaction_statuses(substrate_hash)
611 .await;
612 statuses_cache.insert(log.substrate_block_hash, statuses.clone());
613 statuses
614 };
615 if let Some(statuses) = statuses {
616 let mut block_log_index: u32 = 0;
617 for status in statuses.iter() {
618 let mut transaction_log_index: u32 = 0;
619 let transaction_hash = status.transaction_hash;
620 let transaction_index = status.transaction_index;
621 for ethereum_log in &status.logs {
622 if transaction_index == db_transaction_index
623 && transaction_log_index == db_log_index
624 {
625 ret.push(Log {
626 address: ethereum_log.address,
627 topics: ethereum_log.topics.clone(),
628 data: Bytes(ethereum_log.data.clone()),
629 block_hash: Some(ethereum_block_hash),
630 block_number: Some(U256::from(block_number)),
631 transaction_hash: Some(transaction_hash),
632 transaction_index: Some(U256::from(transaction_index)),
633 log_index: Some(U256::from(block_log_index)),
634 transaction_log_index: Some(U256::from(transaction_log_index)),
635 removed: false,
636 });
637 }
638 transaction_log_index += 1;
639 block_log_index += 1;
640 }
641 }
642 }
643 if ret.len() as u32 > max_past_logs {
645 return Err(internal_err(format!(
646 "query returned more than {max_past_logs} results",
647 )));
648 }
649 if begin_request.elapsed() > max_duration {
650 return Err(internal_err(format!(
651 "query timeout of {} seconds exceeded",
652 max_duration.as_secs()
653 )));
654 }
655 }
656
657 let time_post = timer_post.elapsed().as_millis();
658
659 log::info!(
660 target: "frontier-sql",
661 "OUTER-TIMER fetch={time_fetch}, post={time_post}"
662 );
663 }
664
665 log::info!(
666 target: "frontier-sql",
667 "OUTER-TIMER start={}, prepare={}, all_fetch = {}",
668 timer_start.elapsed().as_millis(),
669 time_prepare,
670 timer_fetch.elapsed().as_millis(),
671 );
672 Ok(())
673}
674
675async fn filter_range_logs<B, C, BE>(
676 client: &C,
677 block_data_cache: &EthBlockDataCacheTask<B>,
678 ret: &mut Vec<Log>,
679 max_past_logs: u32,
680 filter: &Filter,
681 from: NumberFor<B>,
682 to: NumberFor<B>,
683) -> RpcResult<()>
684where
685 B: BlockT,
686 C: ProvideRuntimeApi<B>,
687 C::Api: EthereumRuntimeRPCApi<B>,
688 C: HeaderBackend<B> + StorageProvider<B, BE> + 'static,
689 BE: Backend<B> + 'static,
690{
691 let max_duration = Duration::from_secs(10);
693 let begin_request = Instant::now();
694
695 let mut current_number = from;
696
697 let topics_input = if filter.topics.is_some() {
699 let filtered_params = FilteredParams::new(Some(filter.clone()));
700 Some(filtered_params.flat_topics)
701 } else {
702 None
703 };
704 let address_bloom_filter = FilteredParams::address_bloom_filter(&filter.address);
705 let topics_bloom_filter = FilteredParams::topics_bloom_filter(&topics_input);
706
707 while current_number <= to {
708 let id = BlockId::Number(current_number);
709 let substrate_hash = client
710 .expect_block_hash_from_id(&id)
711 .map_err(|_| internal_err(format!("Expect block number from id: {id}")))?;
712
713 let block = block_data_cache.current_block(substrate_hash).await;
714
715 if let Some(block) = block {
716 if FilteredParams::address_in_bloom(block.header.logs_bloom, &address_bloom_filter)
717 && FilteredParams::topics_in_bloom(block.header.logs_bloom, &topics_bloom_filter)
718 {
719 let statuses = block_data_cache
720 .current_transaction_statuses(substrate_hash)
721 .await;
722 if let Some(statuses) = statuses {
723 filter_block_logs(ret, filter, block, statuses);
724 }
725 }
726 }
727 if ret.len() as u32 > max_past_logs {
729 return Err(internal_err(format!(
730 "query returned more than {max_past_logs} results"
731 )));
732 }
733 if begin_request.elapsed() > max_duration {
734 return Err(internal_err(format!(
735 "query timeout of {} seconds exceeded",
736 max_duration.as_secs()
737 )));
738 }
739 if current_number == to {
740 break;
741 } else {
742 current_number = current_number.saturating_add(One::one());
743 }
744 }
745 Ok(())
746}
747
748fn filter_block_logs<'a>(
749 ret: &'a mut Vec<Log>,
750 filter: &'a Filter,
751 block: EthereumBlock,
752 transaction_statuses: Vec<TransactionStatus>,
753) -> &'a Vec<Log> {
754 let params = FilteredParams::new(Some(filter.clone()));
755 let mut block_log_index: u32 = 0;
756 let block_hash = H256::from(keccak_256(&rlp::encode(&block.header)));
757 for status in transaction_statuses.iter() {
758 let mut transaction_log_index: u32 = 0;
759 let transaction_hash = status.transaction_hash;
760 for ethereum_log in &status.logs {
761 let mut log = Log {
762 address: ethereum_log.address,
763 topics: ethereum_log.topics.clone(),
764 data: Bytes(ethereum_log.data.clone()),
765 block_hash: None,
766 block_number: None,
767 transaction_hash: None,
768 transaction_index: None,
769 log_index: None,
770 transaction_log_index: None,
771 removed: false,
772 };
773 let mut add: bool = true;
774 match (filter.address.clone(), filter.topics.clone()) {
775 (Some(_), Some(_)) => {
776 if !params.filter_address(&log.address) || !params.filter_topics(&log.topics) {
777 add = false;
778 }
779 }
780 (Some(_), _) => {
781 if !params.filter_address(&log.address) {
782 add = false;
783 }
784 }
785 (_, Some(_)) => {
786 if !params.filter_topics(&log.topics) {
787 add = false;
788 }
789 }
790 _ => {}
791 }
792 if add {
793 log.block_hash = Some(block_hash);
794 log.block_number = Some(block.header.number);
795 log.transaction_hash = Some(transaction_hash);
796 log.transaction_index = Some(U256::from(status.transaction_index));
797 log.log_index = Some(U256::from(block_log_index));
798 log.transaction_log_index = Some(U256::from(transaction_log_index));
799 ret.push(log);
800 }
801 transaction_log_index += 1;
802 block_log_index += 1;
803 }
804 }
805 ret
806}