fc_rpc/eth/
filter.rs

1// This file is part of Frontier.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use std::{
20	collections::{BTreeMap, HashSet},
21	marker::PhantomData,
22	sync::Arc,
23	time::{Duration, Instant},
24};
25
26use ethereum::BlockV3 as EthereumBlock;
27use ethereum_types::{H256, U256};
28use jsonrpsee::core::{async_trait, RpcResult};
29// Substrate
30use sc_client_api::backend::{Backend, StorageProvider};
31use sc_transaction_pool_api::{InPoolTransaction, TransactionPool};
32use sp_api::ProvideRuntimeApi;
33use sp_blockchain::HeaderBackend;
34use sp_core::hashing::keccak_256;
35use sp_runtime::{
36	generic::BlockId,
37	traits::{Block as BlockT, NumberFor, One, Saturating, UniqueSaturatedInto},
38};
39// Frontier
40use fc_rpc_core::{types::*, EthFilterApiServer};
41use fp_rpc::{EthereumRuntimeRPCApi, TransactionStatus};
42
43use crate::{cache::EthBlockDataCacheTask, frontier_backend_client, internal_err};
44
45pub struct EthFilter<B: BlockT, C, BE, P> {
46	client: Arc<C>,
47	backend: Arc<dyn fc_api::Backend<B>>,
48	graph: Arc<P>,
49	filter_pool: FilterPool,
50	max_stored_filters: usize,
51	max_past_logs: u32,
52	block_data_cache: Arc<EthBlockDataCacheTask<B>>,
53	_marker: PhantomData<BE>,
54}
55
56impl<B: BlockT, C, BE, P: TransactionPool> EthFilter<B, C, BE, P> {
57	pub fn new(
58		client: Arc<C>,
59		backend: Arc<dyn fc_api::Backend<B>>,
60		graph: Arc<P>,
61		filter_pool: FilterPool,
62		max_stored_filters: usize,
63		max_past_logs: u32,
64		block_data_cache: Arc<EthBlockDataCacheTask<B>>,
65	) -> Self {
66		Self {
67			client,
68			backend,
69			graph,
70			filter_pool,
71			max_stored_filters,
72			max_past_logs,
73			block_data_cache,
74			_marker: PhantomData,
75		}
76	}
77}
78
79impl<B, C, BE, P> EthFilter<B, C, BE, P>
80where
81	B: BlockT,
82	C: ProvideRuntimeApi<B>,
83	C::Api: EthereumRuntimeRPCApi<B>,
84	C: HeaderBackend<B> + 'static,
85	P: TransactionPool<Block = B, Hash = B::Hash> + 'static,
86{
87	fn create_filter(&self, filter_type: FilterType) -> RpcResult<U256> {
88		let info = self.client.info();
89		let best_hash = info.best_hash;
90		let best_number = UniqueSaturatedInto::<u64>::unique_saturated_into(info.best_number);
91		let pool = self.filter_pool.clone();
92		let response = if let Ok(locked) = &mut pool.lock() {
93			if locked.len() >= self.max_stored_filters {
94				return Err(internal_err(format!(
95					"Filter pool is full (limit {:?}).",
96					self.max_stored_filters
97				)));
98			}
99			let next_back = {
100				let mut iter = locked.iter();
101				iter.next_back()
102			};
103			let last_key = match next_back {
104				Some((k, _)) => *k,
105				None => U256::zero(),
106			};
107
108			let pending_transaction_hashes = if let FilterType::PendingTransaction = filter_type {
109				let txs_ready = self
110					.graph
111					.ready()
112					.map(|in_pool_tx| in_pool_tx.data().as_ref().clone())
113					.collect();
114				// Use the runtime to match the (here) opaque extrinsics against ethereum transactions.
115				let api = self.client.runtime_api();
116				api.extrinsic_filter(best_hash, txs_ready)
117					.map_err(|err| {
118						internal_err(format!("fetch ready transactions failed: {err:?}"))
119					})?
120					.into_iter()
121					.map(|tx| tx.hash())
122					.collect::<HashSet<_>>()
123			} else {
124				HashSet::new()
125			};
126
127			// Assume `max_stored_filters` is always < U256::max.
128			let key = last_key.checked_add(U256::one()).unwrap();
129			locked.insert(
130				key,
131				FilterPoolItem {
132					last_poll: BlockNumberOrHash::Num(best_number),
133					filter_type,
134					at_block: best_number,
135					pending_transaction_hashes,
136				},
137			);
138			Ok(key)
139		} else {
140			Err(internal_err("Filter pool is not available."))
141		};
142		response
143	}
144}
145
146#[async_trait]
147impl<B, C, BE, P> EthFilterApiServer for EthFilter<B, C, BE, P>
148where
149	B: BlockT,
150	C: ProvideRuntimeApi<B>,
151	C::Api: EthereumRuntimeRPCApi<B>,
152	C: HeaderBackend<B> + StorageProvider<B, BE> + 'static,
153	BE: Backend<B> + 'static,
154	P: TransactionPool<Block = B, Hash = B::Hash> + 'static,
155{
156	fn new_filter(&self, filter: Filter) -> RpcResult<U256> {
157		self.create_filter(FilterType::Log(filter))
158	}
159
160	fn new_block_filter(&self) -> RpcResult<U256> {
161		self.create_filter(FilterType::Block)
162	}
163
164	fn new_pending_transaction_filter(&self) -> RpcResult<U256> {
165		self.create_filter(FilterType::PendingTransaction)
166	}
167
168	async fn filter_changes(&self, index: Index) -> RpcResult<FilterChanges> {
169		// There are multiple branches that needs to return async blocks.
170		// Also, each branch need to (synchronously) do stuff with the pool
171		// (behind a lock), and the lock should be released before entering
172		// an async block.
173		//
174		// To avoid issues with multiple async blocks (having different
175		// anonymous types) we collect all necessary data in this enum then have
176		// a single async block.
177		enum FuturePath<B: BlockT> {
178			Block {
179				last: u64,
180				next: u64,
181			},
182			PendingTransaction {
183				new_hashes: Vec<H256>,
184			},
185			Log {
186				filter: Filter,
187				from_number: NumberFor<B>,
188				current_number: NumberFor<B>,
189			},
190			Error(jsonrpsee::types::ErrorObjectOwned),
191		}
192
193		let key = U256::from(index.value());
194		let info = self.client.info();
195		let best_hash = info.best_hash;
196		let best_number = UniqueSaturatedInto::<u64>::unique_saturated_into(info.best_number);
197		let pool = self.filter_pool.clone();
198		// Try to lock.
199		let path = if let Ok(locked) = &mut pool.lock() {
200			// Try to get key.
201			if let Some(pool_item) = locked.get(&key).cloned() {
202				match &pool_item.filter_type {
203					// For each block created since last poll, get a vector of ethereum hashes.
204					FilterType::Block => {
205						let last = pool_item.last_poll.to_min_block_num().unwrap();
206						let next = best_number + 1;
207						// Update filter `last_poll`.
208						locked.insert(
209							key,
210							FilterPoolItem {
211								last_poll: BlockNumberOrHash::Num(next),
212								filter_type: pool_item.filter_type.clone(),
213								at_block: pool_item.at_block,
214								pending_transaction_hashes: HashSet::new(),
215							},
216						);
217
218						FuturePath::<B>::Block { last, next }
219					}
220					FilterType::PendingTransaction => {
221						let previous_hashes = pool_item.pending_transaction_hashes;
222						let txs_ready = self
223							.graph
224							.ready()
225							.map(|in_pool_tx| in_pool_tx.data().as_ref().clone())
226							.collect();
227						// Use the runtime to match the (here) opaque extrinsics against ethereum transactions.
228						let api = self.client.runtime_api();
229						let current_hashes = api
230							.extrinsic_filter(best_hash, txs_ready)
231							.map_err(|err| {
232								internal_err(format!("fetch ready transactions failed: {err:?}"))
233							})?
234							.into_iter()
235							.map(|tx| tx.hash())
236							.collect::<HashSet<_>>();
237
238						// Update filter `last_poll`.
239						locked.insert(
240							key,
241							FilterPoolItem {
242								last_poll: BlockNumberOrHash::Num(best_number + 1),
243								filter_type: pool_item.filter_type.clone(),
244								at_block: pool_item.at_block,
245								pending_transaction_hashes: current_hashes.clone(),
246							},
247						);
248
249						let mew_hashes = current_hashes
250							.difference(&previous_hashes)
251							.collect::<HashSet<&H256>>();
252						FuturePath::PendingTransaction {
253							new_hashes: mew_hashes.into_iter().copied().collect(),
254						}
255					}
256					// For each event since last poll, get a vector of ethereum logs.
257					FilterType::Log(filter) => {
258						// Update filter `last_poll`.
259						locked.insert(
260							key,
261							FilterPoolItem {
262								last_poll: BlockNumberOrHash::Num(best_number + 1),
263								filter_type: pool_item.filter_type.clone(),
264								at_block: pool_item.at_block,
265								pending_transaction_hashes: HashSet::new(),
266							},
267						);
268
269						// Either the filter-specific `to` block or best block.
270						let best_number = self.client.info().best_number;
271						let mut current_number = filter
272							.to_block
273							.and_then(|v| v.to_min_block_num())
274							.map(|s| s.unique_saturated_into())
275							.unwrap_or(best_number);
276
277						if current_number > best_number {
278							current_number = best_number;
279						}
280
281						// The from clause is the max(last_poll, filter_from).
282						let last_poll = pool_item
283							.last_poll
284							.to_min_block_num()
285							.unwrap()
286							.unique_saturated_into();
287
288						let filter_from = filter
289							.from_block
290							.and_then(|v| v.to_min_block_num())
291							.map(|s| s.unique_saturated_into())
292							.unwrap_or(last_poll);
293
294						let from_number = std::cmp::max(last_poll, filter_from);
295
296						// Build the response.
297						FuturePath::Log {
298							filter: filter.clone(),
299							from_number,
300							current_number,
301						}
302					}
303				}
304			} else {
305				FuturePath::Error(internal_err(format!("Filter id {key:?} does not exist.")))
306			}
307		} else {
308			FuturePath::Error(internal_err("Filter pool is not available."))
309		};
310
311		let client = Arc::clone(&self.client);
312		let backend = Arc::clone(&self.backend);
313		let block_data_cache = Arc::clone(&self.block_data_cache);
314		let max_past_logs = self.max_past_logs;
315
316		match path {
317			FuturePath::Error(err) => Err(err),
318			FuturePath::Block { last, next } => {
319				let mut ethereum_hashes: Vec<H256> = Vec::new();
320				for n in last..next {
321					let id = BlockId::Number(n.unique_saturated_into());
322					let substrate_hash = client
323						.expect_block_hash_from_id(&id)
324						.map_err(|_| internal_err(format!("Expect block number from id: {id}")))?;
325
326					let block = block_data_cache.current_block(substrate_hash).await;
327					if let Some(block) = block {
328						ethereum_hashes.push(block.header.hash())
329					}
330				}
331				Ok(FilterChanges::Hashes(ethereum_hashes))
332			}
333			FuturePath::PendingTransaction { new_hashes } => Ok(FilterChanges::Hashes(new_hashes)),
334			FuturePath::Log {
335				filter,
336				from_number,
337				current_number,
338			} => {
339				let mut ret: Vec<Log> = Vec::new();
340				if backend.is_indexed() {
341					let _ = filter_range_logs_indexed(
342						client.as_ref(),
343						backend.log_indexer(),
344						&block_data_cache,
345						&mut ret,
346						max_past_logs,
347						&filter,
348						from_number,
349						current_number,
350					)
351					.await?;
352				} else {
353					let _ = filter_range_logs(
354						client.as_ref(),
355						&block_data_cache,
356						&mut ret,
357						max_past_logs,
358						&filter,
359						from_number,
360						current_number,
361					)
362					.await?;
363				}
364
365				Ok(FilterChanges::Logs(ret))
366			}
367		}
368	}
369
370	async fn filter_logs(&self, index: Index) -> RpcResult<Vec<Log>> {
371		let key = U256::from(index.value());
372		let pool = self.filter_pool.clone();
373
374		// We want to get the filter, while releasing the pool lock outside
375		// of the async block.
376		let filter_result: RpcResult<Filter> = (|| {
377			let pool = pool
378				.lock()
379				.map_err(|_| internal_err("Filter pool is not available."))?;
380
381			let pool_item = pool
382				.get(&key)
383				.ok_or_else(|| internal_err(format!("Filter id {key:?} does not exist.")))?;
384
385			match &pool_item.filter_type {
386				FilterType::Log(filter) => Ok(filter.clone()),
387				_ => Err(internal_err(format!(
388					"Filter id {key:?} is not a Log filter."
389				))),
390			}
391		})();
392
393		let client = Arc::clone(&self.client);
394		let backend = Arc::clone(&self.backend);
395		let block_data_cache = Arc::clone(&self.block_data_cache);
396		let max_past_logs = self.max_past_logs;
397
398		let filter = filter_result?;
399
400		let best_number = client.info().best_number;
401		let mut current_number = filter
402			.to_block
403			.and_then(|v| v.to_min_block_num())
404			.map(|s| s.unique_saturated_into())
405			.unwrap_or(best_number);
406
407		if current_number > best_number {
408			current_number = best_number;
409		}
410
411		let from_number = filter
412			.from_block
413			.and_then(|v| v.to_min_block_num())
414			.map(|s| s.unique_saturated_into())
415			.unwrap_or(best_number);
416
417		let mut ret: Vec<Log> = Vec::new();
418		if backend.is_indexed() {
419			let _ = filter_range_logs_indexed(
420				client.as_ref(),
421				backend.log_indexer(),
422				&block_data_cache,
423				&mut ret,
424				max_past_logs,
425				&filter,
426				from_number,
427				current_number,
428			)
429			.await?;
430		} else {
431			let _ = filter_range_logs(
432				client.as_ref(),
433				&block_data_cache,
434				&mut ret,
435				max_past_logs,
436				&filter,
437				from_number,
438				current_number,
439			)
440			.await?;
441		}
442		Ok(ret)
443	}
444
445	fn uninstall_filter(&self, index: Index) -> RpcResult<bool> {
446		let key = U256::from(index.value());
447		let pool = self.filter_pool.clone();
448		// Try to lock.
449		let response = if let Ok(locked) = &mut pool.lock() {
450			if locked.remove(&key).is_some() {
451				Ok(true)
452			} else {
453				Err(internal_err(format!("Filter id {key:?} does not exist.")))
454			}
455		} else {
456			Err(internal_err("Filter pool is not available."))
457		};
458		response
459	}
460
461	async fn logs(&self, filter: Filter) -> RpcResult<Vec<Log>> {
462		let client = Arc::clone(&self.client);
463		let block_data_cache = Arc::clone(&self.block_data_cache);
464		let backend = Arc::clone(&self.backend);
465		let max_past_logs = self.max_past_logs;
466
467		let mut ret: Vec<Log> = Vec::new();
468		if let Some(hash) = filter.block_hash {
469			let substrate_hash = match frontier_backend_client::load_hash::<B, C>(
470				client.as_ref(),
471				backend.as_ref(),
472				hash,
473			)
474			.await
475			.map_err(|err| internal_err(format!("{err:?}")))?
476			{
477				Some(hash) => hash,
478				_ => return Err(crate::err(-32000, "unknown block", None)),
479			};
480
481			let block = block_data_cache.current_block(substrate_hash).await;
482			let statuses = block_data_cache
483				.current_transaction_statuses(substrate_hash)
484				.await;
485			if let (Some(block), Some(statuses)) = (block, statuses) {
486				filter_block_logs(&mut ret, &filter, block, statuses);
487			}
488		} else {
489			let best_number = client.info().best_number;
490			let mut current_number = filter
491				.to_block
492				.and_then(|v| v.to_min_block_num())
493				.map(|s| s.unique_saturated_into())
494				.unwrap_or(best_number);
495
496			if current_number > best_number {
497				current_number = best_number;
498			}
499
500			let from_number = filter
501				.from_block
502				.and_then(|v| v.to_min_block_num())
503				.map(|s| s.unique_saturated_into())
504				.unwrap_or(best_number);
505
506			if backend.is_indexed() {
507				let _ = filter_range_logs_indexed(
508					client.as_ref(),
509					backend.log_indexer(),
510					&block_data_cache,
511					&mut ret,
512					max_past_logs,
513					&filter,
514					from_number,
515					current_number,
516				)
517				.await?;
518			} else {
519				let _ = filter_range_logs(
520					client.as_ref(),
521					&block_data_cache,
522					&mut ret,
523					max_past_logs,
524					&filter,
525					from_number,
526					current_number,
527				)
528				.await?;
529			}
530		}
531		Ok(ret)
532	}
533}
534
535async fn filter_range_logs_indexed<B, C, BE>(
536	_client: &C,
537	backend: &dyn fc_api::LogIndexerBackend<B>,
538	block_data_cache: &EthBlockDataCacheTask<B>,
539	ret: &mut Vec<Log>,
540	max_past_logs: u32,
541	filter: &Filter,
542	from: NumberFor<B>,
543	to: NumberFor<B>,
544) -> RpcResult<()>
545where
546	B: BlockT,
547	C: ProvideRuntimeApi<B>,
548	C::Api: EthereumRuntimeRPCApi<B>,
549	C: HeaderBackend<B> + StorageProvider<B, BE> + 'static,
550	BE: Backend<B> + 'static,
551{
552	let timer_start = Instant::now();
553	let timer_prepare = Instant::now();
554
555	// Max request duration of 10 seconds.
556	let max_duration = Duration::from_secs(10);
557	let begin_request = Instant::now();
558
559	let topics_input = if filter.topics.is_some() {
560		let filtered_params = FilteredParams::new(Some(filter.clone()));
561		Some(filtered_params.flat_topics)
562	} else {
563		None
564	};
565
566	// Normalize filter data
567	let addresses = match &filter.address {
568		Some(VariadicValue::Single(item)) => vec![*item],
569		Some(VariadicValue::Multiple(items)) => items.clone(),
570		_ => vec![],
571	};
572	let topics = topics_input
573		.unwrap_or_default()
574		.iter()
575		.map(|flat| match flat {
576			VariadicValue::Single(item) => vec![*item],
577			VariadicValue::Multiple(items) => items.clone(),
578			_ => vec![],
579		})
580		.collect::<Vec<Vec<Option<H256>>>>();
581
582	let time_prepare = timer_prepare.elapsed().as_millis();
583	let timer_fetch = Instant::now();
584	if let Ok(logs) = backend
585		.filter_logs(
586			UniqueSaturatedInto::<u64>::unique_saturated_into(from),
587			UniqueSaturatedInto::<u64>::unique_saturated_into(to),
588			addresses,
589			topics,
590		)
591		.await
592	{
593		let time_fetch = timer_fetch.elapsed().as_millis();
594		let timer_post = Instant::now();
595
596		let mut statuses_cache: BTreeMap<B::Hash, Option<Vec<TransactionStatus>>> = BTreeMap::new();
597
598		for log in logs.iter() {
599			let substrate_hash = log.substrate_block_hash;
600
601			let ethereum_block_hash = log.ethereum_block_hash;
602			let block_number = log.block_number;
603			let db_transaction_index = log.transaction_index;
604			let db_log_index = log.log_index;
605
606			let statuses = if let Some(statuses) = statuses_cache.get(&log.substrate_block_hash) {
607				statuses.clone()
608			} else {
609				let statuses = block_data_cache
610					.current_transaction_statuses(substrate_hash)
611					.await;
612				statuses_cache.insert(log.substrate_block_hash, statuses.clone());
613				statuses
614			};
615			if let Some(statuses) = statuses {
616				let mut block_log_index: u32 = 0;
617				for status in statuses.iter() {
618					let mut transaction_log_index: u32 = 0;
619					let transaction_hash = status.transaction_hash;
620					let transaction_index = status.transaction_index;
621					for ethereum_log in &status.logs {
622						if transaction_index == db_transaction_index
623							&& transaction_log_index == db_log_index
624						{
625							ret.push(Log {
626								address: ethereum_log.address,
627								topics: ethereum_log.topics.clone(),
628								data: Bytes(ethereum_log.data.clone()),
629								block_hash: Some(ethereum_block_hash),
630								block_number: Some(U256::from(block_number)),
631								transaction_hash: Some(transaction_hash),
632								transaction_index: Some(U256::from(transaction_index)),
633								log_index: Some(U256::from(block_log_index)),
634								transaction_log_index: Some(U256::from(transaction_log_index)),
635								removed: false,
636							});
637						}
638						transaction_log_index += 1;
639						block_log_index += 1;
640					}
641				}
642			}
643			// Check for restrictions
644			if ret.len() as u32 > max_past_logs {
645				return Err(internal_err(format!(
646					"query returned more than {max_past_logs} results",
647				)));
648			}
649			if begin_request.elapsed() > max_duration {
650				return Err(internal_err(format!(
651					"query timeout of {} seconds exceeded",
652					max_duration.as_secs()
653				)));
654			}
655		}
656
657		let time_post = timer_post.elapsed().as_millis();
658
659		log::info!(
660			target: "frontier-sql",
661			"OUTER-TIMER fetch={time_fetch}, post={time_post}"
662		);
663	}
664
665	log::info!(
666		target: "frontier-sql",
667		"OUTER-TIMER start={}, prepare={}, all_fetch = {}",
668		timer_start.elapsed().as_millis(),
669		time_prepare,
670		timer_fetch.elapsed().as_millis(),
671	);
672	Ok(())
673}
674
675async fn filter_range_logs<B, C, BE>(
676	client: &C,
677	block_data_cache: &EthBlockDataCacheTask<B>,
678	ret: &mut Vec<Log>,
679	max_past_logs: u32,
680	filter: &Filter,
681	from: NumberFor<B>,
682	to: NumberFor<B>,
683) -> RpcResult<()>
684where
685	B: BlockT,
686	C: ProvideRuntimeApi<B>,
687	C::Api: EthereumRuntimeRPCApi<B>,
688	C: HeaderBackend<B> + StorageProvider<B, BE> + 'static,
689	BE: Backend<B> + 'static,
690{
691	// Max request duration of 10 seconds.
692	let max_duration = Duration::from_secs(10);
693	let begin_request = Instant::now();
694
695	let mut current_number = from;
696
697	// Pre-calculate BloomInput for reuse.
698	let topics_input = if filter.topics.is_some() {
699		let filtered_params = FilteredParams::new(Some(filter.clone()));
700		Some(filtered_params.flat_topics)
701	} else {
702		None
703	};
704	let address_bloom_filter = FilteredParams::address_bloom_filter(&filter.address);
705	let topics_bloom_filter = FilteredParams::topics_bloom_filter(&topics_input);
706
707	while current_number <= to {
708		let id = BlockId::Number(current_number);
709		let substrate_hash = client
710			.expect_block_hash_from_id(&id)
711			.map_err(|_| internal_err(format!("Expect block number from id: {id}")))?;
712
713		let block = block_data_cache.current_block(substrate_hash).await;
714
715		if let Some(block) = block {
716			if FilteredParams::address_in_bloom(block.header.logs_bloom, &address_bloom_filter)
717				&& FilteredParams::topics_in_bloom(block.header.logs_bloom, &topics_bloom_filter)
718			{
719				let statuses = block_data_cache
720					.current_transaction_statuses(substrate_hash)
721					.await;
722				if let Some(statuses) = statuses {
723					filter_block_logs(ret, filter, block, statuses);
724				}
725			}
726		}
727		// Check for restrictions
728		if ret.len() as u32 > max_past_logs {
729			return Err(internal_err(format!(
730				"query returned more than {max_past_logs} results"
731			)));
732		}
733		if begin_request.elapsed() > max_duration {
734			return Err(internal_err(format!(
735				"query timeout of {} seconds exceeded",
736				max_duration.as_secs()
737			)));
738		}
739		if current_number == to {
740			break;
741		} else {
742			current_number = current_number.saturating_add(One::one());
743		}
744	}
745	Ok(())
746}
747
748fn filter_block_logs<'a>(
749	ret: &'a mut Vec<Log>,
750	filter: &'a Filter,
751	block: EthereumBlock,
752	transaction_statuses: Vec<TransactionStatus>,
753) -> &'a Vec<Log> {
754	let params = FilteredParams::new(Some(filter.clone()));
755	let mut block_log_index: u32 = 0;
756	let block_hash = H256::from(keccak_256(&rlp::encode(&block.header)));
757	for status in transaction_statuses.iter() {
758		let mut transaction_log_index: u32 = 0;
759		let transaction_hash = status.transaction_hash;
760		for ethereum_log in &status.logs {
761			let mut log = Log {
762				address: ethereum_log.address,
763				topics: ethereum_log.topics.clone(),
764				data: Bytes(ethereum_log.data.clone()),
765				block_hash: None,
766				block_number: None,
767				transaction_hash: None,
768				transaction_index: None,
769				log_index: None,
770				transaction_log_index: None,
771				removed: false,
772			};
773			let mut add: bool = true;
774			match (filter.address.clone(), filter.topics.clone()) {
775				(Some(_), Some(_)) => {
776					if !params.filter_address(&log.address) || !params.filter_topics(&log.topics) {
777						add = false;
778					}
779				}
780				(Some(_), _) => {
781					if !params.filter_address(&log.address) {
782						add = false;
783					}
784				}
785				(_, Some(_)) => {
786					if !params.filter_topics(&log.topics) {
787						add = false;
788					}
789				}
790				_ => {}
791			}
792			if add {
793				log.block_hash = Some(block_hash);
794				log.block_number = Some(block.header.number);
795				log.transaction_hash = Some(transaction_hash);
796				log.transaction_index = Some(U256::from(status.transaction_index));
797				log.log_index = Some(U256::from(block_log_index));
798				log.transaction_log_index = Some(U256::from(transaction_log_index));
799				ret.push(log);
800			}
801			transaction_log_index += 1;
802			block_log_index += 1;
803		}
804	}
805	ret
806}