fc_mapping_sync/sql/
mod.rs

1// This file is part of Frontier.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use std::{ops::DerefMut, sync::Arc, time::Duration};
20
21use futures::prelude::*;
22// Substrate
23use sc_client_api::backend::{Backend as BackendT, StorageProvider};
24use sp_api::ProvideRuntimeApi;
25use sp_blockchain::{Backend, HeaderBackend};
26use sp_consensus::SyncOracle;
27use sp_core::H256;
28use sp_runtime::traits::{Block as BlockT, Header as HeaderT, UniqueSaturatedInto};
29// Frontier
30use fp_rpc::EthereumRuntimeRPCApi;
31
32use crate::{EthereumBlockNotification, EthereumBlockNotificationSinks, SyncStrategy};
33
34/// Defines the commands for the sync worker.
35#[derive(Debug)]
36pub enum WorkerCommand {
37	/// Resume indexing from the last indexed canon block.
38	ResumeSync,
39	/// Index leaves.
40	IndexLeaves(Vec<H256>),
41	/// Index the best block known so far via import notifications.
42	IndexBestBlock(H256),
43	/// Canonicalize the enacted and retracted blocks reported via import notifications.
44	Canonicalize {
45		common: H256,
46		enacted: Vec<H256>,
47		retracted: Vec<H256>,
48	},
49	/// Verify indexed blocks' consistency.
50	/// Check for any canon blocks that haven't had their logs indexed.
51	/// Check for any missing parent blocks from the latest canon block.
52	CheckIndexedBlocks,
53}
54
55/// Config parameters for the SyncWorker.
56pub struct SyncWorkerConfig {
57	pub check_indexed_blocks_interval: Duration,
58	pub read_notification_timeout: Duration,
59}
60
61/// Implements an indexer that imports blocks and their transactions.
62pub struct SyncWorker<Block, Backend, Client> {
63	_phantom: std::marker::PhantomData<(Block, Backend, Client)>,
64}
65
66impl<Block, Backend, Client> SyncWorker<Block, Backend, Client>
67where
68	Block: BlockT<Hash = H256>,
69	Client: ProvideRuntimeApi<Block>,
70	Client::Api: EthereumRuntimeRPCApi<Block>,
71	Client: HeaderBackend<Block> + StorageProvider<Block, Backend> + 'static,
72	Backend: BackendT<Block> + 'static,
73{
74	/// Spawn the indexing worker. The worker can be given commands via the sender channel.
75	/// Once the buffer is full, attempts to send new messages will wait until a message is read from the channel.
76	pub async fn spawn_worker(
77		client: Arc<Client>,
78		substrate_backend: Arc<Backend>,
79		indexer_backend: Arc<fc_db::sql::Backend<Block>>,
80		pubsub_notification_sinks: Arc<
81			EthereumBlockNotificationSinks<EthereumBlockNotification<Block>>,
82		>,
83	) -> tokio::sync::mpsc::Sender<WorkerCommand> {
84		let (tx, mut rx) = tokio::sync::mpsc::channel(100);
85		tokio::task::spawn(async move {
86			while let Some(cmd) = rx.recv().await {
87				log::debug!(target: "frontier-sql", "💬 Recv Worker Command {cmd:?}");
88				match cmd {
89					WorkerCommand::ResumeSync => {
90						// Attempt to resume from last indexed block. If there is no data in the db, sync genesis.
91						match indexer_backend.last_indexed_canon_block().await.ok() {
92							Some(last_block_hash) => {
93								log::debug!(target: "frontier-sql", "Resume from last block {last_block_hash:?}");
94								if let Some(parent_hash) = client
95									.header(last_block_hash)
96									.ok()
97									.flatten()
98									.map(|header| *header.parent_hash())
99								{
100									index_canonical_block_and_ancestors(
101										client.clone(),
102										substrate_backend.clone(),
103										indexer_backend.clone(),
104										parent_hash,
105									)
106									.await;
107								}
108							}
109							None => {
110								index_genesis_block(client.clone(), indexer_backend.clone()).await;
111							}
112						};
113					}
114					WorkerCommand::IndexLeaves(leaves) => {
115						for leaf in leaves {
116							index_block_and_ancestors(
117								client.clone(),
118								substrate_backend.clone(),
119								indexer_backend.clone(),
120								leaf,
121							)
122							.await;
123						}
124					}
125					WorkerCommand::IndexBestBlock(block_hash) => {
126						index_canonical_block_and_ancestors(
127							client.clone(),
128							substrate_backend.clone(),
129							indexer_backend.clone(),
130							block_hash,
131						)
132						.await;
133						let sinks = &mut pubsub_notification_sinks.lock();
134						for sink in sinks.iter() {
135							let _ = sink.unbounded_send(EthereumBlockNotification {
136								is_new_best: true,
137								hash: block_hash,
138							});
139						}
140					}
141					WorkerCommand::Canonicalize {
142						common,
143						enacted,
144						retracted,
145					} => {
146						canonicalize_blocks(indexer_backend.clone(), common, enacted, retracted)
147							.await;
148					}
149					WorkerCommand::CheckIndexedBlocks => {
150						// Fix any indexed blocks that did not have their logs indexed
151						if let Some(block_hash) =
152							indexer_backend.get_first_pending_canon_block().await
153						{
154							log::debug!(target: "frontier-sql", "Indexing pending canonical block {block_hash:?}");
155							indexer_backend.index_block_logs(block_hash).await;
156						}
157
158						// Fix any missing blocks
159						index_missing_blocks(
160							client.clone(),
161							substrate_backend.clone(),
162							indexer_backend.clone(),
163						)
164						.await;
165					}
166				}
167			}
168		});
169
170		tx
171	}
172
173	/// Start the worker.
174	pub async fn run(
175		client: Arc<Client>,
176		substrate_backend: Arc<Backend>,
177		indexer_backend: Arc<fc_db::sql::Backend<Block>>,
178		import_notifications: sc_client_api::ImportNotifications<Block>,
179		worker_config: SyncWorkerConfig,
180		_sync_strategy: SyncStrategy,
181		sync_oracle: Arc<dyn SyncOracle + Send + Sync + 'static>,
182		pubsub_notification_sinks: Arc<
183			EthereumBlockNotificationSinks<EthereumBlockNotification<Block>>,
184		>,
185	) {
186		let tx = Self::spawn_worker(
187			client.clone(),
188			substrate_backend.clone(),
189			indexer_backend.clone(),
190			pubsub_notification_sinks.clone(),
191		)
192		.await;
193
194		// Resume sync from the last indexed block until we reach an already indexed parent
195		tx.send(WorkerCommand::ResumeSync).await.ok();
196		// check missing blocks every interval
197		let tx2 = tx.clone();
198		tokio::task::spawn(async move {
199			loop {
200				futures_timer::Delay::new(worker_config.check_indexed_blocks_interval).await;
201				tx2.send(WorkerCommand::CheckIndexedBlocks).await.ok();
202			}
203		});
204
205		// check notifications
206		let mut notifications = import_notifications.fuse();
207		loop {
208			let mut timeout =
209				futures_timer::Delay::new(worker_config.read_notification_timeout).fuse();
210			futures::select! {
211				_ = timeout => {
212					if let Ok(leaves) = substrate_backend.blockchain().leaves() {
213						tx.send(WorkerCommand::IndexLeaves(leaves)).await.ok();
214					}
215					if sync_oracle.is_major_syncing() {
216						let sinks = &mut pubsub_notification_sinks.lock();
217						if !sinks.is_empty() {
218							*sinks.deref_mut() = vec![];
219						}
220					}
221				}
222				notification = notifications.next() => if let Some(notification) = notification {
223					log::debug!(
224						target: "frontier-sql",
225						"📣  New notification: #{} {:?} (parent {}), best = {}",
226						notification.header.number(),
227						notification.hash,
228						notification.header.parent_hash(),
229						notification.is_new_best,
230					);
231					if notification.is_new_best {
232						if let Some(tree_route) = notification.tree_route {
233							log::debug!(
234								target: "frontier-sql",
235								"🔀  Re-org happened at new best {}, proceeding to canonicalize db",
236								notification.hash
237							);
238							let retracted = tree_route
239								.retracted()
240								.iter()
241								.map(|hash_and_number| hash_and_number.hash)
242								.collect::<Vec<_>>();
243							let enacted = tree_route
244								.enacted()
245								.iter()
246								.map(|hash_and_number| hash_and_number.hash)
247								.collect::<Vec<_>>();
248
249							let common = tree_route.common_block().hash;
250							tx.send(WorkerCommand::Canonicalize {
251								common,
252								enacted,
253								retracted,
254							}).await.ok();
255						}
256
257						tx.send(WorkerCommand::IndexBestBlock(notification.hash)).await.ok();
258					}
259				}
260			}
261		}
262	}
263}
264
265/// Index the provided blocks. The function loops over the ancestors of the provided nodes
266/// until it encounters the genesis block, or a block that has already been imported, or
267/// is already in the active set. The `hashes` parameter is populated with any parent blocks
268/// that is scheduled to be indexed.
269async fn index_block_and_ancestors<Block, Backend, Client>(
270	client: Arc<Client>,
271	substrate_backend: Arc<Backend>,
272	indexer_backend: Arc<fc_db::sql::Backend<Block>>,
273	hash: H256,
274) where
275	Block: BlockT<Hash = H256>,
276	Client: ProvideRuntimeApi<Block>,
277	Client::Api: EthereumRuntimeRPCApi<Block>,
278	Client: HeaderBackend<Block> + StorageProvider<Block, Backend> + 'static,
279	Backend: BackendT<Block> + 'static,
280{
281	let blockchain_backend = substrate_backend.blockchain();
282	let mut hashes = vec![hash];
283	while let Some(hash) = hashes.pop() {
284		// exit if genesis block is reached
285		if hash == H256::default() {
286			break;
287		}
288
289		// exit if block is already imported
290		if indexer_backend.is_block_indexed(hash).await {
291			log::debug!(target: "frontier-sql", "🔴 Block {hash:?} already imported");
292			break;
293		}
294
295		log::debug!(target: "frontier-sql", "🛠️  Importing {hash:?}");
296		let _ = indexer_backend
297			.insert_block_metadata(client.clone(), hash)
298			.await
299			.map_err(|e| {
300				log::error!(target: "frontier-sql", "{e}");
301			});
302		log::debug!(target: "frontier-sql", "Inserted block metadata");
303		indexer_backend.index_block_logs(hash).await;
304
305		if let Ok(Some(header)) = blockchain_backend.header(hash) {
306			let parent_hash = header.parent_hash();
307			hashes.push(*parent_hash);
308		}
309	}
310}
311
312/// Index the provided known canonical blocks. The function loops over the ancestors of the provided nodes
313/// until it encounters the genesis block, or a block that has already been imported, or
314/// is already in the active set. The `hashes` parameter is populated with any parent blocks
315/// that is scheduled to be indexed.
316async fn index_canonical_block_and_ancestors<Block, Backend, Client>(
317	client: Arc<Client>,
318	substrate_backend: Arc<Backend>,
319	indexer_backend: Arc<fc_db::sql::Backend<Block>>,
320	hash: H256,
321) where
322	Block: BlockT<Hash = H256>,
323	Client: ProvideRuntimeApi<Block>,
324	Client::Api: EthereumRuntimeRPCApi<Block>,
325	Client: HeaderBackend<Block> + StorageProvider<Block, Backend> + 'static,
326	Backend: BackendT<Block> + 'static,
327{
328	let blockchain_backend = substrate_backend.blockchain();
329	let mut hashes = vec![hash];
330	while let Some(hash) = hashes.pop() {
331		// exit if genesis block is reached
332		if hash == H256::default() {
333			break;
334		}
335
336		let status = indexer_backend.block_indexed_and_canon_status(hash).await;
337
338		// exit if canonical block is already imported
339		if status.indexed && status.canon {
340			log::debug!(target: "frontier-sql", "🔴 Block {hash:?} already imported");
341			break;
342		}
343
344		// If block was previously indexed as non-canon then mark it as canon
345		if status.indexed && !status.canon {
346			if let Err(err) = indexer_backend.set_block_as_canon(hash).await {
347				log::error!(target: "frontier-sql", "Failed setting block {hash:?} as canon: {err:?}");
348				continue;
349			}
350
351			log::debug!(target: "frontier-sql", "🛠️  Marked block as canon {hash:?}");
352
353			// Check parent block
354			if let Ok(Some(header)) = blockchain_backend.header(hash) {
355				let parent_hash = header.parent_hash();
356				hashes.push(*parent_hash);
357			}
358			continue;
359		}
360
361		// Else, import the new block
362		log::debug!(target: "frontier-sql", "🛠️  Importing {hash:?}");
363		let _ = indexer_backend
364			.insert_block_metadata(client.clone(), hash)
365			.await
366			.map_err(|e| {
367				log::error!(target: "frontier-sql", "{e}");
368			});
369		log::debug!(target: "frontier-sql", "Inserted block metadata  {hash:?}");
370		indexer_backend.index_block_logs(hash).await;
371
372		if let Ok(Some(header)) = blockchain_backend.header(hash) {
373			let parent_hash = header.parent_hash();
374			hashes.push(*parent_hash);
375		}
376	}
377}
378
379/// Canonicalizes the database by setting the `is_canon` field for the retracted blocks to `0`,
380/// and `1` if they are enacted.
381async fn canonicalize_blocks<Block: BlockT<Hash = H256>>(
382	indexer_backend: Arc<fc_db::sql::Backend<Block>>,
383	common: H256,
384	enacted: Vec<H256>,
385	retracted: Vec<H256>,
386) {
387	if (indexer_backend.canonicalize(&retracted, &enacted).await).is_err() {
388		log::error!(
389			target: "frontier-sql",
390			"❌  Canonicalization failed for common ancestor {common}, potentially corrupted db. Retracted: {retracted:?}, Enacted: {enacted:?}"
391		);
392	}
393}
394
395/// Attempts to index any missing blocks that are in the past. This fixes any gaps that may
396/// be present in the indexing strategy, since the indexer only walks the parent hashes until
397/// it finds the first ancestor that has already been indexed.
398async fn index_missing_blocks<Block, Client, Backend>(
399	client: Arc<Client>,
400	substrate_backend: Arc<Backend>,
401	indexer_backend: Arc<fc_db::sql::Backend<Block>>,
402) where
403	Block: BlockT<Hash = H256>,
404	Client: ProvideRuntimeApi<Block>,
405	Client::Api: EthereumRuntimeRPCApi<Block>,
406	Client: HeaderBackend<Block> + StorageProvider<Block, Backend> + 'static,
407	Backend: BackendT<Block> + 'static,
408{
409	if let Some(block_number) = indexer_backend.get_first_missing_canon_block().await {
410		log::debug!(target: "frontier-sql", "Missing {block_number:?}");
411		if block_number == 0 {
412			index_genesis_block(client.clone(), indexer_backend.clone()).await;
413		} else if let Ok(Some(block_hash)) = client.hash(block_number.unique_saturated_into()) {
414			log::debug!(
415				target: "frontier-sql",
416				"Indexing past canonical blocks from #{block_number} {block_hash:?}"
417			);
418			index_canonical_block_and_ancestors(
419				client.clone(),
420				substrate_backend.clone(),
421				indexer_backend.clone(),
422				block_hash,
423			)
424			.await;
425		} else {
426			log::debug!(target: "frontier-sql", "Failed retrieving hash for block #{block_number}");
427		}
428	}
429}
430
431/// Attempts to index any missing blocks that are in the past. This fixes any gaps that may
432/// be present in the indexing strategy, since the indexer only walks the parent hashes until
433/// it finds the first ancestor that has already been indexed.
434async fn index_genesis_block<Block, Client, Backend>(
435	client: Arc<Client>,
436	indexer_backend: Arc<fc_db::sql::Backend<Block>>,
437) where
438	Block: BlockT<Hash = H256>,
439	Client: ProvideRuntimeApi<Block>,
440	Client::Api: EthereumRuntimeRPCApi<Block>,
441	Client: HeaderBackend<Block> + StorageProvider<Block, Backend> + 'static,
442	Backend: BackendT<Block> + 'static,
443{
444	log::info!(
445		target: "frontier-sql",
446		"Import genesis",
447	);
448	if let Ok(Some(substrate_genesis_hash)) = indexer_backend
449		.insert_genesis_block_metadata(client.clone())
450		.await
451		.map_err(|e| {
452			log::error!(target: "frontier-sql", "💔  Cannot sync genesis block: {e}");
453		}) {
454		log::debug!(target: "frontier-sql", "Imported genesis block {substrate_genesis_hash:?}");
455	}
456}
457
458#[cfg(test)]
459mod test {
460	use super::*;
461
462	use std::{
463		path::Path,
464		sync::{Arc, Mutex},
465	};
466
467	use futures::executor;
468	use scale_codec::Encode;
469	use sqlx::Row;
470	use tempfile::tempdir;
471	// Substrate
472	use sc_block_builder::BlockBuilderBuilder;
473	use sc_client_api::{BlockchainEvents, HeaderBackend};
474	use sp_consensus::BlockOrigin;
475	use sp_core::{H160, H256, U256};
476	use sp_io::hashing::twox_128;
477	use sp_runtime::{
478		generic::{DigestItem, Header},
479		traits::BlakeTwo256,
480	};
481	use substrate_test_runtime_client::{
482		prelude::*, DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt,
483	};
484	// Frontier
485	use fc_storage::SchemaV3StorageOverride;
486	use fp_storage::{constants::*, EthereumStorageSchema, PALLET_ETHEREUM_SCHEMA};
487
488	type OpaqueBlock = sp_runtime::generic::Block<
489		Header<u64, BlakeTwo256>,
490		substrate_test_runtime_client::runtime::Extrinsic,
491	>;
492
493	struct TestSyncOracleNotSyncing;
494	impl sp_consensus::SyncOracle for TestSyncOracleNotSyncing {
495		fn is_major_syncing(&self) -> bool {
496			false
497		}
498		fn is_offline(&self) -> bool {
499			false
500		}
501	}
502
503	fn storage_prefix_build(module: &[u8], storage: &[u8]) -> Vec<u8> {
504		[twox_128(module), twox_128(storage)].concat().to_vec()
505	}
506
507	fn ethereum_digest() -> DigestItem {
508		let partial_header = ethereum::PartialHeader {
509			parent_hash: H256::random(),
510			beneficiary: H160::default(),
511			state_root: H256::default(),
512			receipts_root: H256::default(),
513			logs_bloom: ethereum_types::Bloom::default(),
514			difficulty: U256::zero(),
515			number: U256::zero(),
516			gas_limit: U256::zero(),
517			gas_used: U256::zero(),
518			timestamp: 0u64,
519			extra_data: Vec::new(),
520			mix_hash: H256::default(),
521			nonce: ethereum_types::H64::default(),
522		};
523		let ethereum_transactions: Vec<ethereum::TransactionV3> = vec![];
524		let ethereum_block = ethereum::Block::new(partial_header, ethereum_transactions, vec![]);
525		DigestItem::Consensus(
526			fp_consensus::FRONTIER_ENGINE_ID,
527			fp_consensus::PostLog::Hashes(fp_consensus::Hashes::from_block(ethereum_block))
528				.encode(),
529		)
530	}
531
532	#[tokio::test]
533	async fn interval_indexing_works() {
534		let tmp = tempdir().expect("create a temporary directory");
535		// Initialize storage with schema V3
536		let builder = TestClientBuilder::new().add_extra_storage(
537			PALLET_ETHEREUM_SCHEMA.to_vec(),
538			Encode::encode(&EthereumStorageSchema::V3),
539		);
540		// Backend
541		let backend = builder.backend();
542		// Client
543		let (client, _) =
544			builder.build_with_native_executor::<frontier_template_runtime::RuntimeApi, _>(None);
545		let client = Arc::new(client);
546		// Overrides
547		let storage_override = Arc::new(SchemaV3StorageOverride::new(client.clone()));
548		// Indexer backend
549		let indexer_backend = fc_db::sql::Backend::new(
550			fc_db::sql::BackendConfig::Sqlite(fc_db::sql::SqliteBackendConfig {
551				path: Path::new("sqlite:///")
552					.join(tmp.path())
553					.join("test.db3")
554					.to_str()
555					.unwrap(),
556				create_if_missing: true,
557				cache_size: 204800,
558				thread_count: 4,
559			}),
560			100,
561			None,
562			storage_override.clone(),
563		)
564		.await
565		.expect("indexer pool to be created");
566		// Pool
567		let pool = indexer_backend.pool().clone();
568
569		// Create 10 blocks, 2 receipts each, 1 log per receipt
570		let mut logs: Vec<(i32, fc_db::sql::Log)> = vec![];
571		for block_number in 1..11 {
572			// New block including pallet ethereum block digest
573			let chain = client.chain_info();
574			let mut builder = BlockBuilderBuilder::new(&*client)
575				.on_parent_block(chain.best_hash)
576				.with_parent_block_number(chain.best_number)
577				.build()
578				.unwrap();
579			builder
580				.push_deposit_log_digest_item(ethereum_digest())
581				.expect("deposit log");
582			// Addresses
583			let address_1 = H160::repeat_byte(0x01);
584			let address_2 = H160::repeat_byte(0x02);
585			// Topics
586			let topics_1_1 = H256::repeat_byte(0x01);
587			let topics_1_2 = H256::repeat_byte(0x02);
588			let topics_2_1 = H256::repeat_byte(0x03);
589			let topics_2_2 = H256::repeat_byte(0x04);
590			let topics_2_3 = H256::repeat_byte(0x05);
591			let topics_2_4 = H256::repeat_byte(0x06);
592
593			let receipts = Encode::encode(&vec![
594				ethereum::ReceiptV4::EIP1559(ethereum::EIP1559ReceiptData {
595					status_code: 0u8,
596					used_gas: U256::zero(),
597					logs_bloom: ethereum_types::Bloom::zero(),
598					logs: vec![ethereum::Log {
599						address: address_1,
600						topics: vec![topics_1_1, topics_1_2],
601						data: vec![],
602					}],
603				}),
604				ethereum::ReceiptV4::EIP1559(ethereum::EIP1559ReceiptData {
605					status_code: 0u8,
606					used_gas: U256::zero(),
607					logs_bloom: ethereum_types::Bloom::zero(),
608					logs: vec![ethereum::Log {
609						address: address_2,
610						topics: vec![topics_2_1, topics_2_2, topics_2_3, topics_2_4],
611						data: vec![],
612					}],
613				}),
614			]);
615			builder
616				.push_storage_change(
617					storage_prefix_build(PALLET_ETHEREUM, ETHEREUM_CURRENT_RECEIPTS),
618					Some(receipts),
619				)
620				.unwrap();
621			let block = builder.build().unwrap().block;
622			let block_hash = block.header.hash();
623			executor::block_on(client.import(BlockOrigin::Own, block)).unwrap();
624			logs.push((
625				block_number,
626				fc_db::sql::Log {
627					address: address_1.as_bytes().to_owned(),
628					topic_1: Some(topics_1_1.as_bytes().to_owned()),
629					topic_2: Some(topics_1_2.as_bytes().to_owned()),
630					topic_3: None,
631					topic_4: None,
632					log_index: 0i32,
633					transaction_index: 0i32,
634					substrate_block_hash: block_hash.as_bytes().to_owned(),
635				},
636			));
637			logs.push((
638				block_number,
639				fc_db::sql::Log {
640					address: address_2.as_bytes().to_owned(),
641					topic_1: Some(topics_2_1.as_bytes().to_owned()),
642					topic_2: Some(topics_2_2.as_bytes().to_owned()),
643					topic_3: Some(topics_2_3.as_bytes().to_owned()),
644					topic_4: Some(topics_2_4.as_bytes().to_owned()),
645					log_index: 0i32,
646					transaction_index: 1i32,
647					substrate_block_hash: block_hash.as_bytes().to_owned(),
648				},
649			));
650		}
651
652		let test_sync_oracle = TestSyncOracleNotSyncing {};
653		let pubsub_notification_sinks: EthereumBlockNotificationSinks<
654			EthereumBlockNotification<OpaqueBlock>,
655		> = Default::default();
656		let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);
657
658		let pubsub_notification_sinks_inner = pubsub_notification_sinks.clone();
659
660		// Spawn worker after creating the blocks will resolve the interval future.
661		// Because the SyncWorker is spawned at service level, in the real world this will only
662		// happen when we are in major syncing (where there is lack of import notifications).
663		tokio::task::spawn(async move {
664			crate::sql::SyncWorker::run(
665				client.clone(),
666				backend.clone(),
667				Arc::new(indexer_backend),
668				client.clone().import_notification_stream(),
669				SyncWorkerConfig {
670					read_notification_timeout: Duration::from_secs(1),
671					check_indexed_blocks_interval: Duration::from_secs(60),
672				},
673				SyncStrategy::Parachain,
674				Arc::new(test_sync_oracle),
675				pubsub_notification_sinks_inner,
676			)
677			.await
678		});
679
680		// Enough time for interval to run
681		futures_timer::Delay::new(Duration::from_millis(1500)).await;
682
683		// Query db
684		let db_logs = sqlx::query(
685			"SELECT
686					b.block_number,
687					address,
688					topic_1,
689					topic_2,
690					topic_3,
691					topic_4,
692					log_index,
693					transaction_index,
694					a.substrate_block_hash
695				FROM logs AS a INNER JOIN blocks AS b ON a.substrate_block_hash = b.substrate_block_hash
696				ORDER BY b.block_number ASC, log_index ASC, transaction_index ASC",
697		)
698		.fetch_all(&pool)
699		.await
700		.expect("test query result")
701		.iter()
702		.map(|row| {
703			let block_number = row.get::<i32, _>(0);
704			let address = row.get::<Vec<u8>, _>(1);
705			let topic_1 = row.get::<Option<Vec<u8>>, _>(2);
706			let topic_2 = row.get::<Option<Vec<u8>>, _>(3);
707			let topic_3 = row.get::<Option<Vec<u8>>, _>(4);
708			let topic_4 = row.get::<Option<Vec<u8>>, _>(5);
709			let log_index = row.get::<i32, _>(6);
710			let transaction_index = row.get::<i32, _>(7);
711			let substrate_block_hash = row.get::<Vec<u8>, _>(8);
712			(
713				block_number,
714				fc_db::sql::Log {
715					address,
716					topic_1,
717					topic_2,
718					topic_3,
719					topic_4,
720					log_index,
721					transaction_index,
722					substrate_block_hash,
723				},
724			)
725		})
726		.collect::<Vec<(i32, fc_db::sql::Log)>>();
727
728		// Expect the db to contain 20 rows. 10 blocks, 2 logs each.
729		// Db data is sorted ASC by block_number, log_index and transaction_index.
730		// This is necessary because indexing is done from tip to genesis.
731		// Expect the db resultset to be equal to the locally produced Log vector.
732		assert_eq!(db_logs, logs);
733	}
734
735	#[tokio::test]
736	async fn notification_indexing_works() {
737		let tmp = tempdir().expect("create a temporary directory");
738		// Initialize storage with schema V3
739		let builder = TestClientBuilder::new().add_extra_storage(
740			PALLET_ETHEREUM_SCHEMA.to_vec(),
741			Encode::encode(&EthereumStorageSchema::V3),
742		);
743		// Backend
744		let backend = builder.backend();
745		// Client
746		let (client, _) =
747			builder.build_with_native_executor::<frontier_template_runtime::RuntimeApi, _>(None);
748		let client = Arc::new(client);
749		// Overrides
750		let storage_override = Arc::new(SchemaV3StorageOverride::new(client.clone()));
751		// Indexer backend
752		let indexer_backend = fc_db::sql::Backend::new(
753			fc_db::sql::BackendConfig::Sqlite(fc_db::sql::SqliteBackendConfig {
754				path: Path::new("sqlite:///")
755					.join(tmp.path())
756					.join("test.db3")
757					.to_str()
758					.unwrap(),
759				create_if_missing: true,
760				cache_size: 204800,
761				thread_count: 4,
762			}),
763			100,
764			None,
765			storage_override.clone(),
766		)
767		.await
768		.expect("indexer pool to be created");
769		// Pool
770		let pool = indexer_backend.pool().clone();
771
772		let test_sync_oracle = TestSyncOracleNotSyncing {};
773		let pubsub_notification_sinks: EthereumBlockNotificationSinks<
774			EthereumBlockNotification<OpaqueBlock>,
775		> = Default::default();
776		let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);
777
778		let pubsub_notification_sinks_inner = pubsub_notification_sinks.clone();
779
780		// Spawn worker after creating the blocks will resolve the interval future.
781		// Because the SyncWorker is spawned at service level, in the real world this will only
782		// happen when we are in major syncing (where there is lack of import notifications).
783		let notification_stream = client.clone().import_notification_stream();
784		let client_inner = client.clone();
785		tokio::task::spawn(async move {
786			crate::sql::SyncWorker::run(
787				client_inner,
788				backend.clone(),
789				Arc::new(indexer_backend),
790				notification_stream,
791				SyncWorkerConfig {
792					read_notification_timeout: Duration::from_secs(10),
793					check_indexed_blocks_interval: Duration::from_secs(60),
794				},
795				SyncStrategy::Parachain,
796				Arc::new(test_sync_oracle),
797				pubsub_notification_sinks_inner,
798			)
799			.await
800		});
801
802		// Create 10 blocks, 2 receipts each, 1 log per receipt
803		let mut logs: Vec<(i32, fc_db::sql::Log)> = vec![];
804		for block_number in 1..11 {
805			// New block including pallet ethereum block digest
806			let chain = client.chain_info();
807			let mut builder = BlockBuilderBuilder::new(&*client)
808				.on_parent_block(chain.best_hash)
809				.with_parent_block_number(chain.best_number)
810				.build()
811				.unwrap();
812			builder
813				.push_deposit_log_digest_item(ethereum_digest())
814				.expect("deposit log");
815			// Addresses
816			let address_1 = H160::random();
817			let address_2 = H160::random();
818			// Topics
819			let topics_1_1 = H256::random();
820			let topics_1_2 = H256::random();
821			let topics_2_1 = H256::random();
822			let topics_2_2 = H256::random();
823			let topics_2_3 = H256::random();
824			let topics_2_4 = H256::random();
825
826			let receipts = Encode::encode(&vec![
827				ethereum::ReceiptV4::EIP1559(ethereum::EIP1559ReceiptData {
828					status_code: 0u8,
829					used_gas: U256::zero(),
830					logs_bloom: ethereum_types::Bloom::zero(),
831					logs: vec![ethereum::Log {
832						address: address_1,
833						topics: vec![topics_1_1, topics_1_2],
834						data: vec![],
835					}],
836				}),
837				ethereum::ReceiptV4::EIP1559(ethereum::EIP1559ReceiptData {
838					status_code: 0u8,
839					used_gas: U256::zero(),
840					logs_bloom: ethereum_types::Bloom::zero(),
841					logs: vec![ethereum::Log {
842						address: address_2,
843						topics: vec![topics_2_1, topics_2_2, topics_2_3, topics_2_4],
844						data: vec![],
845					}],
846				}),
847			]);
848			builder
849				.push_storage_change(
850					storage_prefix_build(PALLET_ETHEREUM, ETHEREUM_CURRENT_RECEIPTS),
851					Some(receipts),
852				)
853				.unwrap();
854			let block = builder.build().unwrap().block;
855			let block_hash = block.header.hash();
856			executor::block_on(client.import(BlockOrigin::Own, block)).unwrap();
857			logs.push((
858				block_number,
859				fc_db::sql::Log {
860					address: address_1.as_bytes().to_owned(),
861					topic_1: Some(topics_1_1.as_bytes().to_owned()),
862					topic_2: Some(topics_1_2.as_bytes().to_owned()),
863					topic_3: None,
864					topic_4: None,
865					log_index: 0i32,
866					transaction_index: 0i32,
867					substrate_block_hash: block_hash.as_bytes().to_owned(),
868				},
869			));
870			logs.push((
871				block_number,
872				fc_db::sql::Log {
873					address: address_2.as_bytes().to_owned(),
874					topic_1: Some(topics_2_1.as_bytes().to_owned()),
875					topic_2: Some(topics_2_2.as_bytes().to_owned()),
876					topic_3: Some(topics_2_3.as_bytes().to_owned()),
877					topic_4: Some(topics_2_4.as_bytes().to_owned()),
878					log_index: 0i32,
879					transaction_index: 1i32,
880					substrate_block_hash: block_hash.as_bytes().to_owned(),
881				},
882			));
883			// Let's not notify too quickly
884			futures_timer::Delay::new(Duration::from_millis(100)).await;
885		}
886
887		// Query db
888		let db_logs = sqlx::query(
889			"SELECT
890					b.block_number,
891					address,
892					topic_1,
893					topic_2,
894					topic_3,
895					topic_4,
896					log_index,
897					transaction_index,
898					a.substrate_block_hash
899				FROM logs AS a INNER JOIN blocks AS b ON a.substrate_block_hash = b.substrate_block_hash
900				ORDER BY b.block_number ASC, log_index ASC, transaction_index ASC",
901		)
902		.fetch_all(&pool)
903		.await
904		.expect("test query result")
905		.iter()
906		.map(|row| {
907			let block_number = row.get::<i32, _>(0);
908			let address = row.get::<Vec<u8>, _>(1);
909			let topic_1 = row.get::<Option<Vec<u8>>, _>(2);
910			let topic_2 = row.get::<Option<Vec<u8>>, _>(3);
911			let topic_3 = row.get::<Option<Vec<u8>>, _>(4);
912			let topic_4 = row.get::<Option<Vec<u8>>, _>(5);
913			let log_index = row.get::<i32, _>(6);
914			let transaction_index = row.get::<i32, _>(7);
915			let substrate_block_hash = row.get::<Vec<u8>, _>(8);
916			(
917				block_number,
918				fc_db::sql::Log {
919					address,
920					topic_1,
921					topic_2,
922					topic_3,
923					topic_4,
924					log_index,
925					transaction_index,
926					substrate_block_hash,
927				},
928			)
929		})
930		.collect::<Vec<(i32, fc_db::sql::Log)>>();
931
932		// Expect the db to contain 20 rows. 10 blocks, 2 logs each.
933		// Db data is sorted ASC by block_number, log_index and transaction_index.
934		// This is necessary because indexing is done from tip to genesis.
935		// Expect the db resultset to be equal to the locally produced Log vector.
936		assert_eq!(db_logs, logs);
937	}
938
939	#[tokio::test]
940	async fn canonicalize_works() {
941		let tmp = tempdir().expect("create a temporary directory");
942		// Initialize storage with schema V3
943		let builder = TestClientBuilder::new().add_extra_storage(
944			PALLET_ETHEREUM_SCHEMA.to_vec(),
945			Encode::encode(&EthereumStorageSchema::V3),
946		);
947		// Backend
948		let backend = builder.backend();
949		// Client
950		let (client, _) =
951			builder.build_with_native_executor::<frontier_template_runtime::RuntimeApi, _>(None);
952		let client = Arc::new(client);
953		// Overrides
954		let storage_override = Arc::new(SchemaV3StorageOverride::new(client.clone()));
955		// Indexer backend
956		let indexer_backend = fc_db::sql::Backend::new(
957			fc_db::sql::BackendConfig::Sqlite(fc_db::sql::SqliteBackendConfig {
958				path: Path::new("sqlite:///")
959					.join(tmp.path())
960					.join("test.db3")
961					.to_str()
962					.unwrap(),
963				create_if_missing: true,
964				cache_size: 204800,
965				thread_count: 4,
966			}),
967			100,
968			None,
969			storage_override.clone(),
970		)
971		.await
972		.expect("indexer pool to be created");
973
974		// Pool
975		let pool = indexer_backend.pool().clone();
976
977		// Spawn indexer task
978		let test_sync_oracle = TestSyncOracleNotSyncing {};
979		let pubsub_notification_sinks: EthereumBlockNotificationSinks<
980			EthereumBlockNotification<OpaqueBlock>,
981		> = Default::default();
982		let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);
983
984		let pubsub_notification_sinks_inner = pubsub_notification_sinks.clone();
985
986		let notification_stream = client.clone().import_notification_stream();
987		let client_inner = client.clone();
988		tokio::task::spawn(async move {
989			crate::sql::SyncWorker::run(
990				client_inner,
991				backend.clone(),
992				Arc::new(indexer_backend),
993				notification_stream,
994				SyncWorkerConfig {
995					read_notification_timeout: Duration::from_secs(10),
996					check_indexed_blocks_interval: Duration::from_secs(60),
997				},
998				SyncStrategy::Parachain,
999				Arc::new(test_sync_oracle),
1000				pubsub_notification_sinks_inner,
1001			)
1002			.await
1003		});
1004
1005		// Create 10 blocks saving the common ancestor for branching.
1006		let mut parent_hash = client
1007			.hash(sp_runtime::traits::Zero::zero())
1008			.unwrap()
1009			.expect("genesis hash");
1010		let mut common_ancestor = parent_hash;
1011		let mut hashes_to_be_orphaned: Vec<H256> = vec![];
1012		for block_number in 1..11 {
1013			// New block including pallet ethereum block digest
1014			let mut builder = BlockBuilderBuilder::new(&*client)
1015				.on_parent_block(parent_hash)
1016				.fetch_parent_block_number(&*client)
1017				.unwrap()
1018				.build()
1019				.unwrap();
1020			builder
1021				.push_deposit_log_digest_item(ethereum_digest())
1022				.expect("deposit log");
1023			let block = builder.build().unwrap().block;
1024			let block_hash = block.header.hash();
1025			executor::block_on(client.import(BlockOrigin::Own, block)).unwrap();
1026			if block_number == 8 {
1027				common_ancestor = block_hash;
1028			}
1029			if block_number == 9 || block_number == 10 {
1030				hashes_to_be_orphaned.push(block_hash);
1031			}
1032			parent_hash = block_hash;
1033			// Let's not notify too quickly
1034			futures_timer::Delay::new(Duration::from_millis(100)).await;
1035		}
1036
1037		// Test all blocks are initially canon.
1038		let mut res = sqlx::query("SELECT is_canon FROM blocks")
1039			.fetch_all(&pool)
1040			.await
1041			.expect("test query result")
1042			.iter()
1043			.map(|row| row.get::<i32, _>(0))
1044			.collect::<Vec<i32>>();
1045
1046		assert_eq!(res.len(), 10);
1047		res.dedup();
1048		assert_eq!(res.len(), 1);
1049
1050		// Create the new longest chain, 10 more blocks on top of the common ancestor.
1051		parent_hash = common_ancestor;
1052		for _ in 1..11 {
1053			// New block including pallet ethereum block digest
1054			let mut builder = BlockBuilderBuilder::new(&*client)
1055				.on_parent_block(parent_hash)
1056				.fetch_parent_block_number(&*client)
1057				.unwrap()
1058				.build()
1059				.unwrap();
1060			builder
1061				.push_deposit_log_digest_item(ethereum_digest())
1062				.expect("deposit log");
1063			let block = builder.build().unwrap().block;
1064			let block_hash = block.header.hash();
1065			executor::block_on(client.import(BlockOrigin::Own, block)).unwrap();
1066			parent_hash = block_hash;
1067			// Let's not notify too quickly
1068			futures_timer::Delay::new(Duration::from_millis(100)).await;
1069		}
1070
1071		// Test the reorged chain is correctly indexed.
1072		let res = sqlx::query("SELECT substrate_block_hash, is_canon, block_number FROM blocks")
1073			.fetch_all(&pool)
1074			.await
1075			.expect("test query result")
1076			.iter()
1077			.map(|row| {
1078				let substrate_block_hash = H256::from_slice(&row.get::<Vec<u8>, _>(0)[..]);
1079				let is_canon = row.get::<i32, _>(1);
1080				let block_number = row.get::<i32, _>(2);
1081				(substrate_block_hash, is_canon, block_number)
1082			})
1083			.collect::<Vec<(H256, i32, i32)>>();
1084
1085		// 20 blocks in total
1086		assert_eq!(res.len(), 20);
1087
1088		// 18 of which are canon
1089		let canon = res
1090			.clone()
1091			.into_iter()
1092			.filter(|&it| it.1 == 1)
1093			.collect::<Vec<(H256, i32, i32)>>();
1094		assert_eq!(canon.len(), 18);
1095
1096		// and 2 of which are the originally tracked as orphaned
1097		let not_canon = res
1098			.into_iter()
1099			.filter_map(|it| if it.1 == 0 { Some(it.0) } else { None })
1100			.collect::<Vec<H256>>();
1101		assert_eq!(not_canon.len(), hashes_to_be_orphaned.len());
1102		assert!(not_canon.iter().all(|h| hashes_to_be_orphaned.contains(h)));
1103	}
1104
1105	#[tokio::test]
1106	async fn resuming_from_last_indexed_block_works() {
1107		let tmp = tempdir().expect("create a temporary directory");
1108		// Initialize storage with schema V3
1109		let builder = TestClientBuilder::new().add_extra_storage(
1110			PALLET_ETHEREUM_SCHEMA.to_vec(),
1111			Encode::encode(&EthereumStorageSchema::V3),
1112		);
1113		// Backend
1114		let backend = builder.backend();
1115		// Client
1116		let (client, _) =
1117			builder.build_with_native_executor::<frontier_template_runtime::RuntimeApi, _>(None);
1118		let client = Arc::new(client);
1119		// Overrides
1120		let storage_override = Arc::new(SchemaV3StorageOverride::new(client.clone()));
1121		// Indexer backend
1122		let indexer_backend = fc_db::sql::Backend::new(
1123			fc_db::sql::BackendConfig::Sqlite(fc_db::sql::SqliteBackendConfig {
1124				path: Path::new("sqlite:///")
1125					.join(tmp.path())
1126					.join("test.db3")
1127					.to_str()
1128					.unwrap(),
1129				create_if_missing: true,
1130				cache_size: 204800,
1131				thread_count: 4,
1132			}),
1133			100,
1134			None,
1135			storage_override.clone(),
1136		)
1137		.await
1138		.expect("indexer pool to be created");
1139
1140		// Pool
1141		let pool = indexer_backend.pool().clone();
1142
1143		// Create 5 blocks, storing them newest first.
1144		let mut parent_hash = client
1145			.hash(sp_runtime::traits::Zero::zero())
1146			.unwrap()
1147			.expect("genesis hash");
1148		let mut best_block_hashes: Vec<H256> = vec![];
1149		for _block_number in 1..=5 {
1150			let mut builder = BlockBuilderBuilder::new(&*client)
1151				.on_parent_block(parent_hash)
1152				.fetch_parent_block_number(&*client)
1153				.unwrap()
1154				.build()
1155				.unwrap();
1156			builder
1157				.push_deposit_log_digest_item(ethereum_digest())
1158				.expect("deposit log");
1159			let block = builder.build().unwrap().block;
1160			let block_hash = block.header.hash();
1161			executor::block_on(client.import(BlockOrigin::Own, block)).unwrap();
1162			best_block_hashes.insert(0, block_hash);
1163			parent_hash = block_hash;
1164		}
1165
1166		// Mark the block as canon and indexed
1167		let block_resume_at = best_block_hashes[0];
1168		sqlx::query("INSERT INTO blocks(substrate_block_hash, ethereum_block_hash, ethereum_storage_schema, block_number, is_canon) VALUES (?, ?, ?, 5, 1)")
1169			.bind(block_resume_at.as_bytes())
1170			.bind(H256::zero().as_bytes())
1171			.bind(H256::zero().as_bytes())
1172			.execute(&pool)
1173			.await
1174			.expect("sql query must succeed");
1175		sqlx::query("INSERT INTO sync_status(substrate_block_hash, status) VALUES (?, 1)")
1176			.bind(block_resume_at.as_bytes())
1177			.execute(&pool)
1178			.await
1179			.expect("sql query must succeed");
1180
1181		// Spawn indexer task
1182		let test_sync_oracle = TestSyncOracleNotSyncing {};
1183		let pubsub_notification_sinks: EthereumBlockNotificationSinks<
1184			EthereumBlockNotification<OpaqueBlock>,
1185		> = Default::default();
1186		let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);
1187
1188		let pubsub_notification_sinks_inner = pubsub_notification_sinks.clone();
1189
1190		let client_inner = client.clone();
1191		tokio::task::spawn(async move {
1192			crate::sql::SyncWorker::run(
1193				client_inner,
1194				backend.clone(),
1195				Arc::new(indexer_backend),
1196				client.clone().import_notification_stream(),
1197				SyncWorkerConfig {
1198					read_notification_timeout: Duration::from_secs(10),
1199					check_indexed_blocks_interval: Duration::from_secs(60),
1200				},
1201				SyncStrategy::Parachain,
1202				Arc::new(test_sync_oracle),
1203				pubsub_notification_sinks_inner,
1204			)
1205			.await
1206		});
1207		// Enough time for indexing
1208		futures_timer::Delay::new(Duration::from_millis(1500)).await;
1209
1210		// Test the reorged chain is correctly indexed.
1211		let actual_imported_blocks =
1212			sqlx::query("SELECT substrate_block_hash, is_canon, block_number FROM blocks")
1213				.fetch_all(&pool)
1214				.await
1215				.expect("test query result")
1216				.iter()
1217				.map(|row| H256::from_slice(&row.get::<Vec<u8>, _>(0)[..]))
1218				.collect::<Vec<H256>>();
1219		let expected_imported_blocks = best_block_hashes.clone();
1220		assert_eq!(expected_imported_blocks, actual_imported_blocks);
1221	}
1222
1223	struct TestSyncOracle {
1224		sync_status: Arc<Mutex<bool>>,
1225	}
1226	impl sp_consensus::SyncOracle for TestSyncOracle {
1227		fn is_major_syncing(&self) -> bool {
1228			*self.sync_status.lock().expect("failed getting lock")
1229		}
1230		fn is_offline(&self) -> bool {
1231			false
1232		}
1233	}
1234
1235	struct TestSyncOracleWrapper {
1236		oracle: Arc<TestSyncOracle>,
1237		sync_status: Arc<Mutex<bool>>,
1238	}
1239	impl TestSyncOracleWrapper {
1240		fn new() -> Self {
1241			let sync_status = Arc::new(Mutex::new(false));
1242			TestSyncOracleWrapper {
1243				oracle: Arc::new(TestSyncOracle {
1244					sync_status: sync_status.clone(),
1245				}),
1246				sync_status,
1247			}
1248		}
1249		fn set_sync_status(&mut self, value: bool) {
1250			*self.sync_status.lock().expect("failed getting lock") = value;
1251		}
1252	}
1253
1254	#[tokio::test]
1255	async fn sync_strategy_normal_indexes_best_blocks_if_not_major_sync() {
1256		let tmp = tempdir().expect("create a temporary directory");
1257		let builder = TestClientBuilder::new().add_extra_storage(
1258			PALLET_ETHEREUM_SCHEMA.to_vec(),
1259			Encode::encode(&EthereumStorageSchema::V3),
1260		);
1261		let backend = builder.backend();
1262		let (client, _) =
1263			builder.build_with_native_executor::<frontier_template_runtime::RuntimeApi, _>(None);
1264		let client = Arc::new(client);
1265		let storage_override = Arc::new(SchemaV3StorageOverride::new(client.clone()));
1266		let indexer_backend = fc_db::sql::Backend::new(
1267			fc_db::sql::BackendConfig::Sqlite(fc_db::sql::SqliteBackendConfig {
1268				path: Path::new("sqlite:///")
1269					.join(tmp.path())
1270					.join("test.db3")
1271					.to_str()
1272					.unwrap(),
1273				create_if_missing: true,
1274				cache_size: 204800,
1275				thread_count: 4,
1276			}),
1277			100,
1278			None,
1279			storage_override.clone(),
1280		)
1281		.await
1282		.expect("indexer pool to be created");
1283
1284		// Pool
1285		let pool = indexer_backend.pool().clone();
1286
1287		// Spawn indexer task
1288		let pubsub_notification_sinks: crate::EthereumBlockNotificationSinks<
1289			crate::EthereumBlockNotification<OpaqueBlock>,
1290		> = Default::default();
1291		let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);
1292		let mut sync_oracle_wrapper = TestSyncOracleWrapper::new();
1293		let sync_oracle = sync_oracle_wrapper.oracle.clone();
1294		let client_inner = client.clone();
1295		tokio::task::spawn(async move {
1296			crate::sql::SyncWorker::run(
1297				client_inner.clone(),
1298				backend.clone(),
1299				Arc::new(indexer_backend),
1300				client_inner.import_notification_stream(),
1301				SyncWorkerConfig {
1302					read_notification_timeout: Duration::from_secs(10),
1303					check_indexed_blocks_interval: Duration::from_secs(60),
1304				},
1305				SyncStrategy::Normal,
1306				Arc::new(sync_oracle),
1307				pubsub_notification_sinks.clone(),
1308			)
1309			.await
1310		});
1311		// Enough time for startup
1312		futures_timer::Delay::new(Duration::from_millis(200)).await;
1313
1314		// Import 3 blocks as part of normal operation, storing them oldest first.
1315		sync_oracle_wrapper.set_sync_status(false);
1316		let mut parent_hash = client
1317			.hash(sp_runtime::traits::Zero::zero())
1318			.unwrap()
1319			.expect("genesis hash");
1320		let mut best_block_hashes: Vec<H256> = vec![];
1321		for _block_number in 1..=3 {
1322			let mut builder = BlockBuilderBuilder::new(&*client)
1323				.on_parent_block(parent_hash)
1324				.fetch_parent_block_number(&*client)
1325				.unwrap()
1326				.build()
1327				.unwrap();
1328			builder
1329				.push_deposit_log_digest_item(ethereum_digest())
1330				.expect("deposit log");
1331			let block = builder.build().unwrap().block;
1332			let block_hash = block.header.hash();
1333
1334			executor::block_on(client.import(BlockOrigin::Own, block)).unwrap();
1335			best_block_hashes.push(block_hash);
1336			parent_hash = block_hash;
1337		}
1338
1339		// Enough time for indexing
1340		futures_timer::Delay::new(Duration::from_millis(3000)).await;
1341
1342		// Test the chain is correctly indexed.
1343		let actual_imported_blocks =
1344			sqlx::query("SELECT substrate_block_hash, is_canon, block_number FROM blocks")
1345				.fetch_all(&pool)
1346				.await
1347				.expect("test query result")
1348				.iter()
1349				.map(|row| H256::from_slice(&row.get::<Vec<u8>, _>(0)[..]))
1350				.collect::<Vec<H256>>();
1351		let expected_imported_blocks = best_block_hashes.clone();
1352		assert_eq!(expected_imported_blocks, actual_imported_blocks);
1353	}
1354
1355	#[tokio::test]
1356	async fn sync_strategy_normal_ignores_non_best_block_if_not_major_sync() {
1357		let tmp = tempdir().expect("create a temporary directory");
1358		let builder = TestClientBuilder::new().add_extra_storage(
1359			PALLET_ETHEREUM_SCHEMA.to_vec(),
1360			Encode::encode(&EthereumStorageSchema::V3),
1361		);
1362		let backend = builder.backend();
1363		let (client, _) =
1364			builder.build_with_native_executor::<frontier_template_runtime::RuntimeApi, _>(None);
1365		let client = Arc::new(client);
1366		let storage_override = Arc::new(SchemaV3StorageOverride::new(client.clone()));
1367		let indexer_backend = fc_db::sql::Backend::new(
1368			fc_db::sql::BackendConfig::Sqlite(fc_db::sql::SqliteBackendConfig {
1369				path: Path::new("sqlite:///")
1370					.join(tmp.path())
1371					.join("test.db3")
1372					.to_str()
1373					.unwrap(),
1374				create_if_missing: true,
1375				cache_size: 204800,
1376				thread_count: 4,
1377			}),
1378			100,
1379			None,
1380			storage_override.clone(),
1381		)
1382		.await
1383		.expect("indexer pool to be created");
1384
1385		// Pool
1386		let pool = indexer_backend.pool().clone();
1387
1388		// Spawn indexer task
1389		let pubsub_notification_sinks: crate::EthereumBlockNotificationSinks<
1390			crate::EthereumBlockNotification<OpaqueBlock>,
1391		> = Default::default();
1392		let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);
1393		let mut sync_oracle_wrapper = TestSyncOracleWrapper::new();
1394		let sync_oracle = sync_oracle_wrapper.oracle.clone();
1395		let client_inner = client.clone();
1396		tokio::task::spawn(async move {
1397			crate::sql::SyncWorker::run(
1398				client_inner.clone(),
1399				backend.clone(),
1400				Arc::new(indexer_backend),
1401				client_inner.import_notification_stream(),
1402				SyncWorkerConfig {
1403					read_notification_timeout: Duration::from_secs(10),
1404					check_indexed_blocks_interval: Duration::from_secs(60),
1405				},
1406				SyncStrategy::Normal,
1407				Arc::new(sync_oracle),
1408				pubsub_notification_sinks.clone(),
1409			)
1410			.await
1411		});
1412		// Enough time for startup
1413		futures_timer::Delay::new(Duration::from_millis(200)).await;
1414
1415		// Import 3 blocks as part of normal operation, storing them oldest first.
1416		sync_oracle_wrapper.set_sync_status(false);
1417		let mut parent_hash = client
1418			.hash(sp_runtime::traits::Zero::zero())
1419			.unwrap()
1420			.expect("genesis hash");
1421		let mut best_block_hashes: Vec<H256> = vec![];
1422		for _block_number in 1..=3 {
1423			let mut builder = BlockBuilderBuilder::new(&*client)
1424				.on_parent_block(parent_hash)
1425				.fetch_parent_block_number(&*client)
1426				.unwrap()
1427				.build()
1428				.unwrap();
1429			builder
1430				.push_deposit_log_digest_item(ethereum_digest())
1431				.expect("deposit log");
1432			let block = builder.build().unwrap().block;
1433			let block_hash = block.header.hash();
1434
1435			executor::block_on(client.import(BlockOrigin::Own, block)).unwrap();
1436			best_block_hashes.push(block_hash);
1437			parent_hash = block_hash;
1438		}
1439
1440		// create non-best block
1441		let mut builder = BlockBuilderBuilder::new(&*client)
1442			.on_parent_block(best_block_hashes[0])
1443			.fetch_parent_block_number(&*client)
1444			.unwrap()
1445			.build()
1446			.unwrap();
1447		builder
1448			.push_deposit_log_digest_item(ethereum_digest())
1449			.expect("deposit log");
1450		let block = builder.build().unwrap().block;
1451
1452		executor::block_on(client.import(BlockOrigin::Own, block)).unwrap();
1453
1454		// Enough time for indexing
1455		futures_timer::Delay::new(Duration::from_millis(3000)).await;
1456
1457		// Test the chain is correctly indexed.
1458		let actual_imported_blocks =
1459			sqlx::query("SELECT substrate_block_hash, is_canon, block_number FROM blocks")
1460				.fetch_all(&pool)
1461				.await
1462				.expect("test query result")
1463				.iter()
1464				.map(|row| H256::from_slice(&row.get::<Vec<u8>, _>(0)[..]))
1465				.collect::<Vec<H256>>();
1466		let expected_imported_blocks = best_block_hashes.clone();
1467		assert_eq!(expected_imported_blocks, actual_imported_blocks);
1468	}
1469
1470	#[tokio::test]
1471	async fn sync_strategy_parachain_indexes_best_blocks_if_not_major_sync() {
1472		let tmp = tempdir().expect("create a temporary directory");
1473		let builder = TestClientBuilder::new().add_extra_storage(
1474			PALLET_ETHEREUM_SCHEMA.to_vec(),
1475			Encode::encode(&EthereumStorageSchema::V3),
1476		);
1477		let backend = builder.backend();
1478		let (client, _) =
1479			builder.build_with_native_executor::<frontier_template_runtime::RuntimeApi, _>(None);
1480		let client = Arc::new(client);
1481		let storage_override = Arc::new(SchemaV3StorageOverride::new(client.clone()));
1482		let indexer_backend = fc_db::sql::Backend::new(
1483			fc_db::sql::BackendConfig::Sqlite(fc_db::sql::SqliteBackendConfig {
1484				path: Path::new("sqlite:///")
1485					.join(tmp.path())
1486					.join("test.db3")
1487					.to_str()
1488					.unwrap(),
1489				create_if_missing: true,
1490				cache_size: 204800,
1491				thread_count: 4,
1492			}),
1493			100,
1494			None,
1495			storage_override.clone(),
1496		)
1497		.await
1498		.expect("indexer pool to be created");
1499
1500		// Pool
1501		let pool = indexer_backend.pool().clone();
1502
1503		// Spawn indexer task
1504		let pubsub_notification_sinks: crate::EthereumBlockNotificationSinks<
1505			crate::EthereumBlockNotification<OpaqueBlock>,
1506		> = Default::default();
1507		let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);
1508		let mut sync_oracle_wrapper = TestSyncOracleWrapper::new();
1509		let sync_oracle = sync_oracle_wrapper.oracle.clone();
1510		let client_inner = client.clone();
1511		tokio::task::spawn(async move {
1512			crate::sql::SyncWorker::run(
1513				client_inner.clone(),
1514				backend.clone(),
1515				Arc::new(indexer_backend),
1516				client_inner.import_notification_stream(),
1517				SyncWorkerConfig {
1518					read_notification_timeout: Duration::from_secs(10),
1519					check_indexed_blocks_interval: Duration::from_secs(60),
1520				},
1521				SyncStrategy::Parachain,
1522				Arc::new(sync_oracle),
1523				pubsub_notification_sinks.clone(),
1524			)
1525			.await
1526		});
1527		// Enough time for startup
1528		futures_timer::Delay::new(Duration::from_millis(200)).await;
1529
1530		// Import 3 blocks as part of normal operation, storing them oldest first.
1531		sync_oracle_wrapper.set_sync_status(false);
1532		let mut parent_hash = client
1533			.hash(sp_runtime::traits::Zero::zero())
1534			.unwrap()
1535			.expect("genesis hash");
1536		let mut best_block_hashes: Vec<H256> = vec![];
1537		for _block_number in 1..=3 {
1538			let mut builder = BlockBuilderBuilder::new(&*client)
1539				.on_parent_block(parent_hash)
1540				.fetch_parent_block_number(&*client)
1541				.unwrap()
1542				.build()
1543				.unwrap();
1544			builder
1545				.push_deposit_log_digest_item(ethereum_digest())
1546				.expect("deposit log");
1547			let block = builder.build().unwrap().block;
1548			let block_hash = block.header.hash();
1549
1550			executor::block_on(client.import(BlockOrigin::Own, block)).unwrap();
1551			best_block_hashes.push(block_hash);
1552			parent_hash = block_hash;
1553		}
1554
1555		// Enough time for indexing
1556		futures_timer::Delay::new(Duration::from_millis(3000)).await;
1557
1558		// Test the chain is correctly indexed.
1559		let actual_imported_blocks =
1560			sqlx::query("SELECT substrate_block_hash, is_canon, block_number FROM blocks")
1561				.fetch_all(&pool)
1562				.await
1563				.expect("test query result")
1564				.iter()
1565				.map(|row| H256::from_slice(&row.get::<Vec<u8>, _>(0)[..]))
1566				.collect::<Vec<H256>>();
1567		let expected_imported_blocks = best_block_hashes.clone();
1568		assert_eq!(expected_imported_blocks, actual_imported_blocks);
1569	}
1570
1571	#[tokio::test]
1572	async fn sync_strategy_parachain_ignores_non_best_blocks_if_not_major_sync() {
1573		let tmp = tempdir().expect("create a temporary directory");
1574		let builder = TestClientBuilder::new().add_extra_storage(
1575			PALLET_ETHEREUM_SCHEMA.to_vec(),
1576			Encode::encode(&EthereumStorageSchema::V3),
1577		);
1578		let backend = builder.backend();
1579		let (client, _) =
1580			builder.build_with_native_executor::<frontier_template_runtime::RuntimeApi, _>(None);
1581		let client = Arc::new(client);
1582		let storage_override = Arc::new(SchemaV3StorageOverride::new(client.clone()));
1583		let indexer_backend = fc_db::sql::Backend::new(
1584			fc_db::sql::BackendConfig::Sqlite(fc_db::sql::SqliteBackendConfig {
1585				path: Path::new("sqlite:///")
1586					.join(tmp.path())
1587					.join("test.db3")
1588					.to_str()
1589					.unwrap(),
1590				create_if_missing: true,
1591				cache_size: 204800,
1592				thread_count: 4,
1593			}),
1594			100,
1595			None,
1596			storage_override.clone(),
1597		)
1598		.await
1599		.expect("indexer pool to be created");
1600
1601		// Pool
1602		let pool = indexer_backend.pool().clone();
1603
1604		// Spawn indexer task
1605		let pubsub_notification_sinks: crate::EthereumBlockNotificationSinks<
1606			crate::EthereumBlockNotification<OpaqueBlock>,
1607		> = Default::default();
1608		let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);
1609		let mut sync_oracle_wrapper = TestSyncOracleWrapper::new();
1610		let sync_oracle = sync_oracle_wrapper.oracle.clone();
1611		let client_inner = client.clone();
1612		tokio::task::spawn(async move {
1613			crate::sql::SyncWorker::run(
1614				client_inner.clone(),
1615				backend.clone(),
1616				Arc::new(indexer_backend),
1617				client_inner.import_notification_stream(),
1618				SyncWorkerConfig {
1619					read_notification_timeout: Duration::from_secs(10),
1620					check_indexed_blocks_interval: Duration::from_secs(60),
1621				},
1622				SyncStrategy::Parachain,
1623				Arc::new(sync_oracle),
1624				pubsub_notification_sinks.clone(),
1625			)
1626			.await
1627		});
1628		// Enough time for startup
1629		futures_timer::Delay::new(Duration::from_millis(200)).await;
1630
1631		// Import 3 blocks as part of normal operation, storing them oldest first.
1632		sync_oracle_wrapper.set_sync_status(false);
1633		let mut parent_hash = client
1634			.hash(sp_runtime::traits::Zero::zero())
1635			.unwrap()
1636			.expect("genesis hash");
1637		let mut best_block_hashes: Vec<H256> = vec![];
1638		for _block_number in 1..=3 {
1639			let mut builder = BlockBuilderBuilder::new(&*client)
1640				.on_parent_block(parent_hash)
1641				.fetch_parent_block_number(&*client)
1642				.unwrap()
1643				.build()
1644				.unwrap();
1645			builder
1646				.push_deposit_log_digest_item(ethereum_digest())
1647				.expect("deposit log");
1648			let block = builder.build().unwrap().block;
1649			let block_hash = block.header.hash();
1650
1651			executor::block_on(client.import(BlockOrigin::Own, block)).unwrap();
1652			best_block_hashes.push(block_hash);
1653			parent_hash = block_hash;
1654		}
1655
1656		// create non-best block
1657		let mut builder = BlockBuilderBuilder::new(&*client)
1658			.on_parent_block(best_block_hashes[0])
1659			.fetch_parent_block_number(&*client)
1660			.unwrap()
1661			.build()
1662			.unwrap();
1663		builder
1664			.push_deposit_log_digest_item(ethereum_digest())
1665			.expect("deposit log");
1666		let block = builder.build().unwrap().block;
1667
1668		executor::block_on(client.import(BlockOrigin::Own, block)).unwrap();
1669
1670		// Enough time for indexing
1671		futures_timer::Delay::new(Duration::from_millis(3000)).await;
1672
1673		// Test the chain is correctly indexed.
1674		let actual_imported_blocks =
1675			sqlx::query("SELECT substrate_block_hash, is_canon, block_number FROM blocks")
1676				.fetch_all(&pool)
1677				.await
1678				.expect("test query result")
1679				.iter()
1680				.map(|row| H256::from_slice(&row.get::<Vec<u8>, _>(0)[..]))
1681				.collect::<Vec<H256>>();
1682		let expected_imported_blocks = best_block_hashes.clone();
1683		assert_eq!(expected_imported_blocks, actual_imported_blocks);
1684	}
1685
1686	#[tokio::test]
1687	async fn sync_strategy_normal_ignores_best_blocks_if_major_sync() {
1688		let tmp = tempdir().expect("create a temporary directory");
1689		let builder = TestClientBuilder::new().add_extra_storage(
1690			PALLET_ETHEREUM_SCHEMA.to_vec(),
1691			Encode::encode(&EthereumStorageSchema::V3),
1692		);
1693		let backend = builder.backend();
1694		let (client, _) =
1695			builder.build_with_native_executor::<frontier_template_runtime::RuntimeApi, _>(None);
1696		let client = Arc::new(client);
1697		let storage_override = Arc::new(SchemaV3StorageOverride::new(client.clone()));
1698		let indexer_backend = fc_db::sql::Backend::new(
1699			fc_db::sql::BackendConfig::Sqlite(fc_db::sql::SqliteBackendConfig {
1700				path: Path::new("sqlite:///")
1701					.join(tmp.path())
1702					.join("test.db3")
1703					.to_str()
1704					.unwrap(),
1705				create_if_missing: true,
1706				cache_size: 204800,
1707				thread_count: 4,
1708			}),
1709			100,
1710			None,
1711			storage_override.clone(),
1712		)
1713		.await
1714		.expect("indexer pool to be created");
1715
1716		// Pool
1717		let pool = indexer_backend.pool().clone();
1718
1719		// Spawn indexer task
1720		let pubsub_notification_sinks: crate::EthereumBlockNotificationSinks<
1721			crate::EthereumBlockNotification<OpaqueBlock>,
1722		> = Default::default();
1723		let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);
1724		let mut sync_oracle_wrapper = TestSyncOracleWrapper::new();
1725		let sync_oracle = sync_oracle_wrapper.oracle.clone();
1726		let client_inner = client.clone();
1727		tokio::task::spawn(async move {
1728			crate::sql::SyncWorker::run(
1729				client_inner.clone(),
1730				backend.clone(),
1731				Arc::new(indexer_backend),
1732				client_inner.import_notification_stream(),
1733				SyncWorkerConfig {
1734					read_notification_timeout: Duration::from_secs(10),
1735					check_indexed_blocks_interval: Duration::from_secs(60),
1736				},
1737				SyncStrategy::Normal,
1738				Arc::new(sync_oracle),
1739				pubsub_notification_sinks.clone(),
1740			)
1741			.await
1742		});
1743		// Enough time for startup
1744		futures_timer::Delay::new(Duration::from_millis(200)).await;
1745
1746		// Import 3 blocks as part of initial network sync, storing them oldest first.
1747		sync_oracle_wrapper.set_sync_status(true);
1748		let mut parent_hash = client
1749			.hash(sp_runtime::traits::Zero::zero())
1750			.unwrap()
1751			.expect("genesis hash");
1752		let mut best_block_hashes: Vec<H256> = vec![];
1753		for _block_number in 1..=3 {
1754			let mut builder = BlockBuilderBuilder::new(&*client)
1755				.on_parent_block(parent_hash)
1756				.fetch_parent_block_number(&*client)
1757				.unwrap()
1758				.build()
1759				.unwrap();
1760			builder
1761				.push_deposit_log_digest_item(ethereum_digest())
1762				.expect("deposit log");
1763			let block = builder.build().unwrap().block;
1764			let block_hash = block.header.hash();
1765
1766			executor::block_on(client.import(BlockOrigin::NetworkInitialSync, block)).unwrap();
1767			best_block_hashes.push(block_hash);
1768			parent_hash = block_hash;
1769		}
1770
1771		// Enough time for indexing
1772		futures_timer::Delay::new(Duration::from_millis(3000)).await;
1773
1774		// Test the chain is correctly indexed.
1775		let actual_imported_blocks =
1776			sqlx::query("SELECT substrate_block_hash, is_canon, block_number FROM blocks")
1777				.fetch_all(&pool)
1778				.await
1779				.expect("test query result")
1780				.iter()
1781				.map(|row| H256::from_slice(&row.get::<Vec<u8>, _>(0)[..]))
1782				.collect::<Vec<H256>>();
1783		let expected_imported_blocks = Vec::<H256>::new();
1784		assert_eq!(expected_imported_blocks, actual_imported_blocks);
1785	}
1786
1787	#[tokio::test]
1788	async fn sync_strategy_parachain_ignores_best_blocks_if_major_sync() {
1789		let tmp = tempdir().expect("create a temporary directory");
1790		let builder = TestClientBuilder::new().add_extra_storage(
1791			PALLET_ETHEREUM_SCHEMA.to_vec(),
1792			Encode::encode(&EthereumStorageSchema::V3),
1793		);
1794		let backend = builder.backend();
1795		let (client, _) =
1796			builder.build_with_native_executor::<frontier_template_runtime::RuntimeApi, _>(None);
1797		let client = Arc::new(client);
1798		let storage_override = Arc::new(SchemaV3StorageOverride::new(client.clone()));
1799		let indexer_backend = fc_db::sql::Backend::new(
1800			fc_db::sql::BackendConfig::Sqlite(fc_db::sql::SqliteBackendConfig {
1801				path: Path::new("sqlite:///")
1802					.join(tmp.path())
1803					.join("test.db3")
1804					.to_str()
1805					.unwrap(),
1806				create_if_missing: true,
1807				cache_size: 204800,
1808				thread_count: 4,
1809			}),
1810			100,
1811			None,
1812			storage_override.clone(),
1813		)
1814		.await
1815		.expect("indexer pool to be created");
1816
1817		// Pool
1818		let pool = indexer_backend.pool().clone();
1819
1820		// Spawn indexer task
1821		let pubsub_notification_sinks: crate::EthereumBlockNotificationSinks<
1822			crate::EthereumBlockNotification<OpaqueBlock>,
1823		> = Default::default();
1824		let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);
1825		let mut sync_oracle_wrapper = TestSyncOracleWrapper::new();
1826		let sync_oracle = sync_oracle_wrapper.oracle.clone();
1827		let client_inner = client.clone();
1828		tokio::task::spawn(async move {
1829			crate::sql::SyncWorker::run(
1830				client_inner.clone(),
1831				backend.clone(),
1832				Arc::new(indexer_backend),
1833				client_inner.import_notification_stream(),
1834				SyncWorkerConfig {
1835					read_notification_timeout: Duration::from_secs(10),
1836					check_indexed_blocks_interval: Duration::from_secs(60),
1837				},
1838				SyncStrategy::Parachain,
1839				Arc::new(sync_oracle),
1840				pubsub_notification_sinks.clone(),
1841			)
1842			.await
1843		});
1844		// Enough time for startup
1845		futures_timer::Delay::new(Duration::from_millis(200)).await;
1846
1847		// Import 3 blocks as part of initial network sync, storing them oldest first.
1848		sync_oracle_wrapper.set_sync_status(true);
1849		let mut parent_hash = client
1850			.hash(sp_runtime::traits::Zero::zero())
1851			.unwrap()
1852			.expect("genesis hash");
1853		let mut best_block_hashes: Vec<H256> = vec![];
1854		for _block_number in 1..=3 {
1855			let mut builder = BlockBuilderBuilder::new(&*client)
1856				.on_parent_block(parent_hash)
1857				.fetch_parent_block_number(&*client)
1858				.unwrap()
1859				.build()
1860				.unwrap();
1861			builder
1862				.push_deposit_log_digest_item(ethereum_digest())
1863				.expect("deposit log");
1864			let block = builder.build().unwrap().block;
1865			let block_hash = block.header.hash();
1866
1867			executor::block_on(client.import(BlockOrigin::NetworkInitialSync, block)).unwrap();
1868			best_block_hashes.push(block_hash);
1869			parent_hash = block_hash;
1870		}
1871
1872		// Enough time for indexing
1873		futures_timer::Delay::new(Duration::from_millis(3000)).await;
1874
1875		// Test the chain is correctly indexed.
1876		let actual_imported_blocks =
1877			sqlx::query("SELECT substrate_block_hash, is_canon, block_number FROM blocks")
1878				.fetch_all(&pool)
1879				.await
1880				.expect("test query result")
1881				.iter()
1882				.map(|row| H256::from_slice(&row.get::<Vec<u8>, _>(0)[..]))
1883				.collect::<Vec<H256>>();
1884		let expected_imported_blocks = Vec::<H256>::new();
1885		assert_eq!(expected_imported_blocks, actual_imported_blocks);
1886	}
1887}