1use std::{ops::DerefMut, sync::Arc, time::Duration};
20
21use futures::prelude::*;
22use sc_client_api::backend::{Backend as BackendT, StorageProvider};
24use sp_api::ProvideRuntimeApi;
25use sp_blockchain::{Backend, HeaderBackend};
26use sp_consensus::SyncOracle;
27use sp_core::H256;
28use sp_runtime::traits::{Block as BlockT, Header as HeaderT, UniqueSaturatedInto};
29use fp_rpc::EthereumRuntimeRPCApi;
31
32use crate::{EthereumBlockNotification, EthereumBlockNotificationSinks, SyncStrategy};
33
/// Commands understood by the task spawned in `SyncWorker::spawn_worker`.
#[derive(Debug)]
pub enum WorkerCommand {
	/// Resume indexing from the parent of the last indexed canonical block, or index
	/// the genesis block when no such block can be read from the indexer database.
	ResumeSync,
	/// Index each given leaf together with its not-yet-indexed ancestors.
	IndexLeaves(Vec<H256>),
	/// Index the given best block (and its ancestors) as canonical, then notify the
	/// pubsub sinks about it.
	IndexBestBlock(H256),
	/// Apply a re-org to the indexer database.
	Canonicalize {
		/// Common ancestor of both forks; only used when reporting a failure.
		common: H256,
		/// Hashes of the blocks on the newly enacted fork.
		enacted: Vec<H256>,
		/// Hashes of the blocks on the retracted fork.
		retracted: Vec<H256>,
	},
	/// Index the logs of the first pending canonical block (if any) and then index
	/// the first missing canonical block (if any).
	CheckIndexedBlocks,
}
54
/// Timing configuration consumed by `SyncWorker::run`.
pub struct SyncWorkerConfig {
	/// Delay between two consecutive `WorkerCommand::CheckIndexedBlocks` commands.
	pub check_indexed_blocks_interval: Duration,
	/// How long `run` waits for an import notification before it falls back to
	/// indexing the current leaves.
	pub read_notification_timeout: Duration,
}
60
/// Namespace type for the SQL sync worker; it only carries its generic parameters.
pub struct SyncWorker<Block, Backend, Client> {
	// Zero-sized marker tying the otherwise unused type parameters to the struct.
	_phantom: std::marker::PhantomData<(Block, Backend, Client)>,
}
65
66impl<Block, Backend, Client> SyncWorker<Block, Backend, Client>
67where
68 Block: BlockT<Hash = H256>,
69 Client: ProvideRuntimeApi<Block>,
70 Client::Api: EthereumRuntimeRPCApi<Block>,
71 Client: HeaderBackend<Block> + StorageProvider<Block, Backend> + 'static,
72 Backend: BackendT<Block> + 'static,
73{
74 pub async fn spawn_worker(
77 client: Arc<Client>,
78 substrate_backend: Arc<Backend>,
79 indexer_backend: Arc<fc_db::sql::Backend<Block>>,
80 pubsub_notification_sinks: Arc<
81 EthereumBlockNotificationSinks<EthereumBlockNotification<Block>>,
82 >,
83 ) -> tokio::sync::mpsc::Sender<WorkerCommand> {
84 let (tx, mut rx) = tokio::sync::mpsc::channel(100);
85 tokio::task::spawn(async move {
86 while let Some(cmd) = rx.recv().await {
87 log::debug!(target: "frontier-sql", "💬 Recv Worker Command {cmd:?}");
88 match cmd {
89 WorkerCommand::ResumeSync => {
90 match indexer_backend.last_indexed_canon_block().await.ok() {
92 Some(last_block_hash) => {
93 log::debug!(target: "frontier-sql", "Resume from last block {last_block_hash:?}");
94 if let Some(parent_hash) = client
95 .header(last_block_hash)
96 .ok()
97 .flatten()
98 .map(|header| *header.parent_hash())
99 {
100 index_canonical_block_and_ancestors(
101 client.clone(),
102 substrate_backend.clone(),
103 indexer_backend.clone(),
104 parent_hash,
105 )
106 .await;
107 }
108 }
109 None => {
110 index_genesis_block(client.clone(), indexer_backend.clone()).await;
111 }
112 };
113 }
114 WorkerCommand::IndexLeaves(leaves) => {
115 for leaf in leaves {
116 index_block_and_ancestors(
117 client.clone(),
118 substrate_backend.clone(),
119 indexer_backend.clone(),
120 leaf,
121 )
122 .await;
123 }
124 }
125 WorkerCommand::IndexBestBlock(block_hash) => {
126 index_canonical_block_and_ancestors(
127 client.clone(),
128 substrate_backend.clone(),
129 indexer_backend.clone(),
130 block_hash,
131 )
132 .await;
133 let sinks = &mut pubsub_notification_sinks.lock();
134 for sink in sinks.iter() {
135 let _ = sink.unbounded_send(EthereumBlockNotification {
136 is_new_best: true,
137 hash: block_hash,
138 });
139 }
140 }
141 WorkerCommand::Canonicalize {
142 common,
143 enacted,
144 retracted,
145 } => {
146 canonicalize_blocks(indexer_backend.clone(), common, enacted, retracted)
147 .await;
148 }
149 WorkerCommand::CheckIndexedBlocks => {
150 if let Some(block_hash) =
152 indexer_backend.get_first_pending_canon_block().await
153 {
154 log::debug!(target: "frontier-sql", "Indexing pending canonical block {block_hash:?}");
155 indexer_backend.index_block_logs(block_hash).await;
156 }
157
158 index_missing_blocks(
160 client.clone(),
161 substrate_backend.clone(),
162 indexer_backend.clone(),
163 )
164 .await;
165 }
166 }
167 }
168 });
169
170 tx
171 }
172
173 pub async fn run(
175 client: Arc<Client>,
176 substrate_backend: Arc<Backend>,
177 indexer_backend: Arc<fc_db::sql::Backend<Block>>,
178 import_notifications: sc_client_api::ImportNotifications<Block>,
179 worker_config: SyncWorkerConfig,
180 _sync_strategy: SyncStrategy,
181 sync_oracle: Arc<dyn SyncOracle + Send + Sync + 'static>,
182 pubsub_notification_sinks: Arc<
183 EthereumBlockNotificationSinks<EthereumBlockNotification<Block>>,
184 >,
185 ) {
186 let tx = Self::spawn_worker(
187 client.clone(),
188 substrate_backend.clone(),
189 indexer_backend.clone(),
190 pubsub_notification_sinks.clone(),
191 )
192 .await;
193
194 tx.send(WorkerCommand::ResumeSync).await.ok();
196 let tx2 = tx.clone();
198 tokio::task::spawn(async move {
199 loop {
200 futures_timer::Delay::new(worker_config.check_indexed_blocks_interval).await;
201 tx2.send(WorkerCommand::CheckIndexedBlocks).await.ok();
202 }
203 });
204
205 let mut notifications = import_notifications.fuse();
207 loop {
208 let mut timeout =
209 futures_timer::Delay::new(worker_config.read_notification_timeout).fuse();
210 futures::select! {
211 _ = timeout => {
212 if let Ok(leaves) = substrate_backend.blockchain().leaves() {
213 tx.send(WorkerCommand::IndexLeaves(leaves)).await.ok();
214 }
215 if sync_oracle.is_major_syncing() {
216 let sinks = &mut pubsub_notification_sinks.lock();
217 if !sinks.is_empty() {
218 *sinks.deref_mut() = vec![];
219 }
220 }
221 }
222 notification = notifications.next() => if let Some(notification) = notification {
223 log::debug!(
224 target: "frontier-sql",
225 "📣 New notification: #{} {:?} (parent {}), best = {}",
226 notification.header.number(),
227 notification.hash,
228 notification.header.parent_hash(),
229 notification.is_new_best,
230 );
231 if notification.is_new_best {
232 if let Some(tree_route) = notification.tree_route {
233 log::debug!(
234 target: "frontier-sql",
235 "🔀 Re-org happened at new best {}, proceeding to canonicalize db",
236 notification.hash
237 );
238 let retracted = tree_route
239 .retracted()
240 .iter()
241 .map(|hash_and_number| hash_and_number.hash)
242 .collect::<Vec<_>>();
243 let enacted = tree_route
244 .enacted()
245 .iter()
246 .map(|hash_and_number| hash_and_number.hash)
247 .collect::<Vec<_>>();
248
249 let common = tree_route.common_block().hash;
250 tx.send(WorkerCommand::Canonicalize {
251 common,
252 enacted,
253 retracted,
254 }).await.ok();
255 }
256
257 tx.send(WorkerCommand::IndexBestBlock(notification.hash)).await.ok();
258 }
259 }
260 }
261 }
262 }
263}
264
265async fn index_block_and_ancestors<Block, Backend, Client>(
270 client: Arc<Client>,
271 substrate_backend: Arc<Backend>,
272 indexer_backend: Arc<fc_db::sql::Backend<Block>>,
273 hash: H256,
274) where
275 Block: BlockT<Hash = H256>,
276 Client: ProvideRuntimeApi<Block>,
277 Client::Api: EthereumRuntimeRPCApi<Block>,
278 Client: HeaderBackend<Block> + StorageProvider<Block, Backend> + 'static,
279 Backend: BackendT<Block> + 'static,
280{
281 let blockchain_backend = substrate_backend.blockchain();
282 let mut hashes = vec![hash];
283 while let Some(hash) = hashes.pop() {
284 if hash == H256::default() {
286 break;
287 }
288
289 if indexer_backend.is_block_indexed(hash).await {
291 log::debug!(target: "frontier-sql", "🔴 Block {hash:?} already imported");
292 break;
293 }
294
295 log::debug!(target: "frontier-sql", "🛠️ Importing {hash:?}");
296 let _ = indexer_backend
297 .insert_block_metadata(client.clone(), hash)
298 .await
299 .map_err(|e| {
300 log::error!(target: "frontier-sql", "{e}");
301 });
302 log::debug!(target: "frontier-sql", "Inserted block metadata");
303 indexer_backend.index_block_logs(hash).await;
304
305 if let Ok(Some(header)) = blockchain_backend.header(hash) {
306 let parent_hash = header.parent_hash();
307 hashes.push(*parent_hash);
308 }
309 }
310}
311
312async fn index_canonical_block_and_ancestors<Block, Backend, Client>(
317 client: Arc<Client>,
318 substrate_backend: Arc<Backend>,
319 indexer_backend: Arc<fc_db::sql::Backend<Block>>,
320 hash: H256,
321) where
322 Block: BlockT<Hash = H256>,
323 Client: ProvideRuntimeApi<Block>,
324 Client::Api: EthereumRuntimeRPCApi<Block>,
325 Client: HeaderBackend<Block> + StorageProvider<Block, Backend> + 'static,
326 Backend: BackendT<Block> + 'static,
327{
328 let blockchain_backend = substrate_backend.blockchain();
329 let mut hashes = vec![hash];
330 while let Some(hash) = hashes.pop() {
331 if hash == H256::default() {
333 break;
334 }
335
336 let status = indexer_backend.block_indexed_and_canon_status(hash).await;
337
338 if status.indexed && status.canon {
340 log::debug!(target: "frontier-sql", "🔴 Block {hash:?} already imported");
341 break;
342 }
343
344 if status.indexed && !status.canon {
346 if let Err(err) = indexer_backend.set_block_as_canon(hash).await {
347 log::error!(target: "frontier-sql", "Failed setting block {hash:?} as canon: {err:?}");
348 continue;
349 }
350
351 log::debug!(target: "frontier-sql", "🛠️ Marked block as canon {hash:?}");
352
353 if let Ok(Some(header)) = blockchain_backend.header(hash) {
355 let parent_hash = header.parent_hash();
356 hashes.push(*parent_hash);
357 }
358 continue;
359 }
360
361 log::debug!(target: "frontier-sql", "🛠️ Importing {hash:?}");
363 let _ = indexer_backend
364 .insert_block_metadata(client.clone(), hash)
365 .await
366 .map_err(|e| {
367 log::error!(target: "frontier-sql", "{e}");
368 });
369 log::debug!(target: "frontier-sql", "Inserted block metadata {hash:?}");
370 indexer_backend.index_block_logs(hash).await;
371
372 if let Ok(Some(header)) = blockchain_backend.header(hash) {
373 let parent_hash = header.parent_hash();
374 hashes.push(*parent_hash);
375 }
376 }
377}
378
379async fn canonicalize_blocks<Block: BlockT<Hash = H256>>(
382 indexer_backend: Arc<fc_db::sql::Backend<Block>>,
383 common: H256,
384 enacted: Vec<H256>,
385 retracted: Vec<H256>,
386) {
387 if (indexer_backend.canonicalize(&retracted, &enacted).await).is_err() {
388 log::error!(
389 target: "frontier-sql",
390 "❌ Canonicalization failed for common ancestor {common}, potentially corrupted db. Retracted: {retracted:?}, Enacted: {enacted:?}"
391 );
392 }
393}
394
395async fn index_missing_blocks<Block, Client, Backend>(
399 client: Arc<Client>,
400 substrate_backend: Arc<Backend>,
401 indexer_backend: Arc<fc_db::sql::Backend<Block>>,
402) where
403 Block: BlockT<Hash = H256>,
404 Client: ProvideRuntimeApi<Block>,
405 Client::Api: EthereumRuntimeRPCApi<Block>,
406 Client: HeaderBackend<Block> + StorageProvider<Block, Backend> + 'static,
407 Backend: BackendT<Block> + 'static,
408{
409 if let Some(block_number) = indexer_backend.get_first_missing_canon_block().await {
410 log::debug!(target: "frontier-sql", "Missing {block_number:?}");
411 if block_number == 0 {
412 index_genesis_block(client.clone(), indexer_backend.clone()).await;
413 } else if let Ok(Some(block_hash)) = client.hash(block_number.unique_saturated_into()) {
414 log::debug!(
415 target: "frontier-sql",
416 "Indexing past canonical blocks from #{block_number} {block_hash:?}"
417 );
418 index_canonical_block_and_ancestors(
419 client.clone(),
420 substrate_backend.clone(),
421 indexer_backend.clone(),
422 block_hash,
423 )
424 .await;
425 } else {
426 log::debug!(target: "frontier-sql", "Failed retrieving hash for block #{block_number}");
427 }
428 }
429}
430
431async fn index_genesis_block<Block, Client, Backend>(
435 client: Arc<Client>,
436 indexer_backend: Arc<fc_db::sql::Backend<Block>>,
437) where
438 Block: BlockT<Hash = H256>,
439 Client: ProvideRuntimeApi<Block>,
440 Client::Api: EthereumRuntimeRPCApi<Block>,
441 Client: HeaderBackend<Block> + StorageProvider<Block, Backend> + 'static,
442 Backend: BackendT<Block> + 'static,
443{
444 log::info!(
445 target: "frontier-sql",
446 "Import genesis",
447 );
448 if let Ok(Some(substrate_genesis_hash)) = indexer_backend
449 .insert_genesis_block_metadata(client.clone())
450 .await
451 .map_err(|e| {
452 log::error!(target: "frontier-sql", "💔 Cannot sync genesis block: {e}");
453 }) {
454 log::debug!(target: "frontier-sql", "Imported genesis block {substrate_genesis_hash:?}");
455 }
456}
457
458#[cfg(test)]
459mod test {
460 use super::*;
461
462 use std::{
463 path::Path,
464 sync::{Arc, Mutex},
465 };
466
467 use futures::executor;
468 use scale_codec::Encode;
469 use sqlx::Row;
470 use tempfile::tempdir;
471 use sc_block_builder::BlockBuilderBuilder;
473 use sc_client_api::{BlockchainEvents, HeaderBackend};
474 use sp_consensus::BlockOrigin;
475 use sp_core::{H160, H256, U256};
476 use sp_io::hashing::twox_128;
477 use sp_runtime::{
478 generic::{DigestItem, Header},
479 traits::BlakeTwo256,
480 };
481 use substrate_test_runtime_client::{
482 prelude::*, DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt,
483 };
484 use fc_storage::SchemaV3StorageOverride;
486 use fp_storage::{constants::*, EthereumStorageSchema, PALLET_ETHEREUM_SCHEMA};
487
	/// Block type used by the tests: a generic block with `u64` block numbers,
	/// `BlakeTwo256` hashing and the test runtime's extrinsic type.
	type OpaqueBlock = sp_runtime::generic::Block<
		Header<u64, BlakeTwo256>,
		substrate_test_runtime_client::runtime::Extrinsic,
	>;
492
	/// Sync oracle stub that always reports "not major syncing" and "online".
	struct TestSyncOracleNotSyncing;
	impl sp_consensus::SyncOracle for TestSyncOracleNotSyncing {
		/// Always `false`.
		fn is_major_syncing(&self) -> bool {
			false
		}
		/// Always `false`.
		fn is_offline(&self) -> bool {
			false
		}
	}
502
503 fn storage_prefix_build(module: &[u8], storage: &[u8]) -> Vec<u8> {
504 [twox_128(module), twox_128(storage)].concat().to_vec()
505 }
506
507 fn ethereum_digest() -> DigestItem {
508 let partial_header = ethereum::PartialHeader {
509 parent_hash: H256::random(),
510 beneficiary: H160::default(),
511 state_root: H256::default(),
512 receipts_root: H256::default(),
513 logs_bloom: ethereum_types::Bloom::default(),
514 difficulty: U256::zero(),
515 number: U256::zero(),
516 gas_limit: U256::zero(),
517 gas_used: U256::zero(),
518 timestamp: 0u64,
519 extra_data: Vec::new(),
520 mix_hash: H256::default(),
521 nonce: ethereum_types::H64::default(),
522 };
523 let ethereum_transactions: Vec<ethereum::TransactionV3> = vec![];
524 let ethereum_block = ethereum::Block::new(partial_header, ethereum_transactions, vec![]);
525 DigestItem::Consensus(
526 fp_consensus::FRONTIER_ENGINE_ID,
527 fp_consensus::PostLog::Hashes(fp_consensus::Hashes::from_block(ethereum_block))
528 .encode(),
529 )
530 }
531
532 #[tokio::test]
533 async fn interval_indexing_works() {
534 let tmp = tempdir().expect("create a temporary directory");
535 let builder = TestClientBuilder::new().add_extra_storage(
537 PALLET_ETHEREUM_SCHEMA.to_vec(),
538 Encode::encode(&EthereumStorageSchema::V3),
539 );
540 let backend = builder.backend();
542 let (client, _) =
544 builder.build_with_native_executor::<frontier_template_runtime::RuntimeApi, _>(None);
545 let client = Arc::new(client);
546 let storage_override = Arc::new(SchemaV3StorageOverride::new(client.clone()));
548 let indexer_backend = fc_db::sql::Backend::new(
550 fc_db::sql::BackendConfig::Sqlite(fc_db::sql::SqliteBackendConfig {
551 path: Path::new("sqlite:///")
552 .join(tmp.path())
553 .join("test.db3")
554 .to_str()
555 .unwrap(),
556 create_if_missing: true,
557 cache_size: 204800,
558 thread_count: 4,
559 }),
560 100,
561 None,
562 storage_override.clone(),
563 )
564 .await
565 .expect("indexer pool to be created");
566 let pool = indexer_backend.pool().clone();
568
569 let mut logs: Vec<(i32, fc_db::sql::Log)> = vec![];
571 for block_number in 1..11 {
572 let chain = client.chain_info();
574 let mut builder = BlockBuilderBuilder::new(&*client)
575 .on_parent_block(chain.best_hash)
576 .with_parent_block_number(chain.best_number)
577 .build()
578 .unwrap();
579 builder
580 .push_deposit_log_digest_item(ethereum_digest())
581 .expect("deposit log");
582 let address_1 = H160::repeat_byte(0x01);
584 let address_2 = H160::repeat_byte(0x02);
585 let topics_1_1 = H256::repeat_byte(0x01);
587 let topics_1_2 = H256::repeat_byte(0x02);
588 let topics_2_1 = H256::repeat_byte(0x03);
589 let topics_2_2 = H256::repeat_byte(0x04);
590 let topics_2_3 = H256::repeat_byte(0x05);
591 let topics_2_4 = H256::repeat_byte(0x06);
592
593 let receipts = Encode::encode(&vec![
594 ethereum::ReceiptV4::EIP1559(ethereum::EIP1559ReceiptData {
595 status_code: 0u8,
596 used_gas: U256::zero(),
597 logs_bloom: ethereum_types::Bloom::zero(),
598 logs: vec![ethereum::Log {
599 address: address_1,
600 topics: vec![topics_1_1, topics_1_2],
601 data: vec![],
602 }],
603 }),
604 ethereum::ReceiptV4::EIP1559(ethereum::EIP1559ReceiptData {
605 status_code: 0u8,
606 used_gas: U256::zero(),
607 logs_bloom: ethereum_types::Bloom::zero(),
608 logs: vec![ethereum::Log {
609 address: address_2,
610 topics: vec![topics_2_1, topics_2_2, topics_2_3, topics_2_4],
611 data: vec![],
612 }],
613 }),
614 ]);
615 builder
616 .push_storage_change(
617 storage_prefix_build(PALLET_ETHEREUM, ETHEREUM_CURRENT_RECEIPTS),
618 Some(receipts),
619 )
620 .unwrap();
621 let block = builder.build().unwrap().block;
622 let block_hash = block.header.hash();
623 executor::block_on(client.import(BlockOrigin::Own, block)).unwrap();
624 logs.push((
625 block_number,
626 fc_db::sql::Log {
627 address: address_1.as_bytes().to_owned(),
628 topic_1: Some(topics_1_1.as_bytes().to_owned()),
629 topic_2: Some(topics_1_2.as_bytes().to_owned()),
630 topic_3: None,
631 topic_4: None,
632 log_index: 0i32,
633 transaction_index: 0i32,
634 substrate_block_hash: block_hash.as_bytes().to_owned(),
635 },
636 ));
637 logs.push((
638 block_number,
639 fc_db::sql::Log {
640 address: address_2.as_bytes().to_owned(),
641 topic_1: Some(topics_2_1.as_bytes().to_owned()),
642 topic_2: Some(topics_2_2.as_bytes().to_owned()),
643 topic_3: Some(topics_2_3.as_bytes().to_owned()),
644 topic_4: Some(topics_2_4.as_bytes().to_owned()),
645 log_index: 0i32,
646 transaction_index: 1i32,
647 substrate_block_hash: block_hash.as_bytes().to_owned(),
648 },
649 ));
650 }
651
652 let test_sync_oracle = TestSyncOracleNotSyncing {};
653 let pubsub_notification_sinks: EthereumBlockNotificationSinks<
654 EthereumBlockNotification<OpaqueBlock>,
655 > = Default::default();
656 let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);
657
658 let pubsub_notification_sinks_inner = pubsub_notification_sinks.clone();
659
660 tokio::task::spawn(async move {
664 crate::sql::SyncWorker::run(
665 client.clone(),
666 backend.clone(),
667 Arc::new(indexer_backend),
668 client.clone().import_notification_stream(),
669 SyncWorkerConfig {
670 read_notification_timeout: Duration::from_secs(1),
671 check_indexed_blocks_interval: Duration::from_secs(60),
672 },
673 SyncStrategy::Parachain,
674 Arc::new(test_sync_oracle),
675 pubsub_notification_sinks_inner,
676 )
677 .await
678 });
679
680 futures_timer::Delay::new(Duration::from_millis(1500)).await;
682
683 let db_logs = sqlx::query(
685 "SELECT
686 b.block_number,
687 address,
688 topic_1,
689 topic_2,
690 topic_3,
691 topic_4,
692 log_index,
693 transaction_index,
694 a.substrate_block_hash
695 FROM logs AS a INNER JOIN blocks AS b ON a.substrate_block_hash = b.substrate_block_hash
696 ORDER BY b.block_number ASC, log_index ASC, transaction_index ASC",
697 )
698 .fetch_all(&pool)
699 .await
700 .expect("test query result")
701 .iter()
702 .map(|row| {
703 let block_number = row.get::<i32, _>(0);
704 let address = row.get::<Vec<u8>, _>(1);
705 let topic_1 = row.get::<Option<Vec<u8>>, _>(2);
706 let topic_2 = row.get::<Option<Vec<u8>>, _>(3);
707 let topic_3 = row.get::<Option<Vec<u8>>, _>(4);
708 let topic_4 = row.get::<Option<Vec<u8>>, _>(5);
709 let log_index = row.get::<i32, _>(6);
710 let transaction_index = row.get::<i32, _>(7);
711 let substrate_block_hash = row.get::<Vec<u8>, _>(8);
712 (
713 block_number,
714 fc_db::sql::Log {
715 address,
716 topic_1,
717 topic_2,
718 topic_3,
719 topic_4,
720 log_index,
721 transaction_index,
722 substrate_block_hash,
723 },
724 )
725 })
726 .collect::<Vec<(i32, fc_db::sql::Log)>>();
727
728 assert_eq!(db_logs, logs);
733 }
734
735 #[tokio::test]
736 async fn notification_indexing_works() {
737 let tmp = tempdir().expect("create a temporary directory");
738 let builder = TestClientBuilder::new().add_extra_storage(
740 PALLET_ETHEREUM_SCHEMA.to_vec(),
741 Encode::encode(&EthereumStorageSchema::V3),
742 );
743 let backend = builder.backend();
745 let (client, _) =
747 builder.build_with_native_executor::<frontier_template_runtime::RuntimeApi, _>(None);
748 let client = Arc::new(client);
749 let storage_override = Arc::new(SchemaV3StorageOverride::new(client.clone()));
751 let indexer_backend = fc_db::sql::Backend::new(
753 fc_db::sql::BackendConfig::Sqlite(fc_db::sql::SqliteBackendConfig {
754 path: Path::new("sqlite:///")
755 .join(tmp.path())
756 .join("test.db3")
757 .to_str()
758 .unwrap(),
759 create_if_missing: true,
760 cache_size: 204800,
761 thread_count: 4,
762 }),
763 100,
764 None,
765 storage_override.clone(),
766 )
767 .await
768 .expect("indexer pool to be created");
769 let pool = indexer_backend.pool().clone();
771
772 let test_sync_oracle = TestSyncOracleNotSyncing {};
773 let pubsub_notification_sinks: EthereumBlockNotificationSinks<
774 EthereumBlockNotification<OpaqueBlock>,
775 > = Default::default();
776 let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);
777
778 let pubsub_notification_sinks_inner = pubsub_notification_sinks.clone();
779
780 let notification_stream = client.clone().import_notification_stream();
784 let client_inner = client.clone();
785 tokio::task::spawn(async move {
786 crate::sql::SyncWorker::run(
787 client_inner,
788 backend.clone(),
789 Arc::new(indexer_backend),
790 notification_stream,
791 SyncWorkerConfig {
792 read_notification_timeout: Duration::from_secs(10),
793 check_indexed_blocks_interval: Duration::from_secs(60),
794 },
795 SyncStrategy::Parachain,
796 Arc::new(test_sync_oracle),
797 pubsub_notification_sinks_inner,
798 )
799 .await
800 });
801
802 let mut logs: Vec<(i32, fc_db::sql::Log)> = vec![];
804 for block_number in 1..11 {
805 let chain = client.chain_info();
807 let mut builder = BlockBuilderBuilder::new(&*client)
808 .on_parent_block(chain.best_hash)
809 .with_parent_block_number(chain.best_number)
810 .build()
811 .unwrap();
812 builder
813 .push_deposit_log_digest_item(ethereum_digest())
814 .expect("deposit log");
815 let address_1 = H160::random();
817 let address_2 = H160::random();
818 let topics_1_1 = H256::random();
820 let topics_1_2 = H256::random();
821 let topics_2_1 = H256::random();
822 let topics_2_2 = H256::random();
823 let topics_2_3 = H256::random();
824 let topics_2_4 = H256::random();
825
826 let receipts = Encode::encode(&vec![
827 ethereum::ReceiptV4::EIP1559(ethereum::EIP1559ReceiptData {
828 status_code: 0u8,
829 used_gas: U256::zero(),
830 logs_bloom: ethereum_types::Bloom::zero(),
831 logs: vec![ethereum::Log {
832 address: address_1,
833 topics: vec![topics_1_1, topics_1_2],
834 data: vec![],
835 }],
836 }),
837 ethereum::ReceiptV4::EIP1559(ethereum::EIP1559ReceiptData {
838 status_code: 0u8,
839 used_gas: U256::zero(),
840 logs_bloom: ethereum_types::Bloom::zero(),
841 logs: vec![ethereum::Log {
842 address: address_2,
843 topics: vec![topics_2_1, topics_2_2, topics_2_3, topics_2_4],
844 data: vec![],
845 }],
846 }),
847 ]);
848 builder
849 .push_storage_change(
850 storage_prefix_build(PALLET_ETHEREUM, ETHEREUM_CURRENT_RECEIPTS),
851 Some(receipts),
852 )
853 .unwrap();
854 let block = builder.build().unwrap().block;
855 let block_hash = block.header.hash();
856 executor::block_on(client.import(BlockOrigin::Own, block)).unwrap();
857 logs.push((
858 block_number,
859 fc_db::sql::Log {
860 address: address_1.as_bytes().to_owned(),
861 topic_1: Some(topics_1_1.as_bytes().to_owned()),
862 topic_2: Some(topics_1_2.as_bytes().to_owned()),
863 topic_3: None,
864 topic_4: None,
865 log_index: 0i32,
866 transaction_index: 0i32,
867 substrate_block_hash: block_hash.as_bytes().to_owned(),
868 },
869 ));
870 logs.push((
871 block_number,
872 fc_db::sql::Log {
873 address: address_2.as_bytes().to_owned(),
874 topic_1: Some(topics_2_1.as_bytes().to_owned()),
875 topic_2: Some(topics_2_2.as_bytes().to_owned()),
876 topic_3: Some(topics_2_3.as_bytes().to_owned()),
877 topic_4: Some(topics_2_4.as_bytes().to_owned()),
878 log_index: 0i32,
879 transaction_index: 1i32,
880 substrate_block_hash: block_hash.as_bytes().to_owned(),
881 },
882 ));
883 futures_timer::Delay::new(Duration::from_millis(100)).await;
885 }
886
887 let db_logs = sqlx::query(
889 "SELECT
890 b.block_number,
891 address,
892 topic_1,
893 topic_2,
894 topic_3,
895 topic_4,
896 log_index,
897 transaction_index,
898 a.substrate_block_hash
899 FROM logs AS a INNER JOIN blocks AS b ON a.substrate_block_hash = b.substrate_block_hash
900 ORDER BY b.block_number ASC, log_index ASC, transaction_index ASC",
901 )
902 .fetch_all(&pool)
903 .await
904 .expect("test query result")
905 .iter()
906 .map(|row| {
907 let block_number = row.get::<i32, _>(0);
908 let address = row.get::<Vec<u8>, _>(1);
909 let topic_1 = row.get::<Option<Vec<u8>>, _>(2);
910 let topic_2 = row.get::<Option<Vec<u8>>, _>(3);
911 let topic_3 = row.get::<Option<Vec<u8>>, _>(4);
912 let topic_4 = row.get::<Option<Vec<u8>>, _>(5);
913 let log_index = row.get::<i32, _>(6);
914 let transaction_index = row.get::<i32, _>(7);
915 let substrate_block_hash = row.get::<Vec<u8>, _>(8);
916 (
917 block_number,
918 fc_db::sql::Log {
919 address,
920 topic_1,
921 topic_2,
922 topic_3,
923 topic_4,
924 log_index,
925 transaction_index,
926 substrate_block_hash,
927 },
928 )
929 })
930 .collect::<Vec<(i32, fc_db::sql::Log)>>();
931
932 assert_eq!(db_logs, logs);
937 }
938
939 #[tokio::test]
940 async fn canonicalize_works() {
941 let tmp = tempdir().expect("create a temporary directory");
942 let builder = TestClientBuilder::new().add_extra_storage(
944 PALLET_ETHEREUM_SCHEMA.to_vec(),
945 Encode::encode(&EthereumStorageSchema::V3),
946 );
947 let backend = builder.backend();
949 let (client, _) =
951 builder.build_with_native_executor::<frontier_template_runtime::RuntimeApi, _>(None);
952 let client = Arc::new(client);
953 let storage_override = Arc::new(SchemaV3StorageOverride::new(client.clone()));
955 let indexer_backend = fc_db::sql::Backend::new(
957 fc_db::sql::BackendConfig::Sqlite(fc_db::sql::SqliteBackendConfig {
958 path: Path::new("sqlite:///")
959 .join(tmp.path())
960 .join("test.db3")
961 .to_str()
962 .unwrap(),
963 create_if_missing: true,
964 cache_size: 204800,
965 thread_count: 4,
966 }),
967 100,
968 None,
969 storage_override.clone(),
970 )
971 .await
972 .expect("indexer pool to be created");
973
974 let pool = indexer_backend.pool().clone();
976
977 let test_sync_oracle = TestSyncOracleNotSyncing {};
979 let pubsub_notification_sinks: EthereumBlockNotificationSinks<
980 EthereumBlockNotification<OpaqueBlock>,
981 > = Default::default();
982 let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);
983
984 let pubsub_notification_sinks_inner = pubsub_notification_sinks.clone();
985
986 let notification_stream = client.clone().import_notification_stream();
987 let client_inner = client.clone();
988 tokio::task::spawn(async move {
989 crate::sql::SyncWorker::run(
990 client_inner,
991 backend.clone(),
992 Arc::new(indexer_backend),
993 notification_stream,
994 SyncWorkerConfig {
995 read_notification_timeout: Duration::from_secs(10),
996 check_indexed_blocks_interval: Duration::from_secs(60),
997 },
998 SyncStrategy::Parachain,
999 Arc::new(test_sync_oracle),
1000 pubsub_notification_sinks_inner,
1001 )
1002 .await
1003 });
1004
1005 let mut parent_hash = client
1007 .hash(sp_runtime::traits::Zero::zero())
1008 .unwrap()
1009 .expect("genesis hash");
1010 let mut common_ancestor = parent_hash;
1011 let mut hashes_to_be_orphaned: Vec<H256> = vec![];
1012 for block_number in 1..11 {
1013 let mut builder = BlockBuilderBuilder::new(&*client)
1015 .on_parent_block(parent_hash)
1016 .fetch_parent_block_number(&*client)
1017 .unwrap()
1018 .build()
1019 .unwrap();
1020 builder
1021 .push_deposit_log_digest_item(ethereum_digest())
1022 .expect("deposit log");
1023 let block = builder.build().unwrap().block;
1024 let block_hash = block.header.hash();
1025 executor::block_on(client.import(BlockOrigin::Own, block)).unwrap();
1026 if block_number == 8 {
1027 common_ancestor = block_hash;
1028 }
1029 if block_number == 9 || block_number == 10 {
1030 hashes_to_be_orphaned.push(block_hash);
1031 }
1032 parent_hash = block_hash;
1033 futures_timer::Delay::new(Duration::from_millis(100)).await;
1035 }
1036
1037 let mut res = sqlx::query("SELECT is_canon FROM blocks")
1039 .fetch_all(&pool)
1040 .await
1041 .expect("test query result")
1042 .iter()
1043 .map(|row| row.get::<i32, _>(0))
1044 .collect::<Vec<i32>>();
1045
1046 assert_eq!(res.len(), 10);
1047 res.dedup();
1048 assert_eq!(res.len(), 1);
1049
1050 parent_hash = common_ancestor;
1052 for _ in 1..11 {
1053 let mut builder = BlockBuilderBuilder::new(&*client)
1055 .on_parent_block(parent_hash)
1056 .fetch_parent_block_number(&*client)
1057 .unwrap()
1058 .build()
1059 .unwrap();
1060 builder
1061 .push_deposit_log_digest_item(ethereum_digest())
1062 .expect("deposit log");
1063 let block = builder.build().unwrap().block;
1064 let block_hash = block.header.hash();
1065 executor::block_on(client.import(BlockOrigin::Own, block)).unwrap();
1066 parent_hash = block_hash;
1067 futures_timer::Delay::new(Duration::from_millis(100)).await;
1069 }
1070
1071 let res = sqlx::query("SELECT substrate_block_hash, is_canon, block_number FROM blocks")
1073 .fetch_all(&pool)
1074 .await
1075 .expect("test query result")
1076 .iter()
1077 .map(|row| {
1078 let substrate_block_hash = H256::from_slice(&row.get::<Vec<u8>, _>(0)[..]);
1079 let is_canon = row.get::<i32, _>(1);
1080 let block_number = row.get::<i32, _>(2);
1081 (substrate_block_hash, is_canon, block_number)
1082 })
1083 .collect::<Vec<(H256, i32, i32)>>();
1084
1085 assert_eq!(res.len(), 20);
1087
1088 let canon = res
1090 .clone()
1091 .into_iter()
1092 .filter(|&it| it.1 == 1)
1093 .collect::<Vec<(H256, i32, i32)>>();
1094 assert_eq!(canon.len(), 18);
1095
1096 let not_canon = res
1098 .into_iter()
1099 .filter_map(|it| if it.1 == 0 { Some(it.0) } else { None })
1100 .collect::<Vec<H256>>();
1101 assert_eq!(not_canon.len(), hashes_to_be_orphaned.len());
1102 assert!(not_canon.iter().all(|h| hashes_to_be_orphaned.contains(h)));
1103 }
1104
1105 #[tokio::test]
1106 async fn resuming_from_last_indexed_block_works() {
1107 let tmp = tempdir().expect("create a temporary directory");
1108 let builder = TestClientBuilder::new().add_extra_storage(
1110 PALLET_ETHEREUM_SCHEMA.to_vec(),
1111 Encode::encode(&EthereumStorageSchema::V3),
1112 );
1113 let backend = builder.backend();
1115 let (client, _) =
1117 builder.build_with_native_executor::<frontier_template_runtime::RuntimeApi, _>(None);
1118 let client = Arc::new(client);
1119 let storage_override = Arc::new(SchemaV3StorageOverride::new(client.clone()));
1121 let indexer_backend = fc_db::sql::Backend::new(
1123 fc_db::sql::BackendConfig::Sqlite(fc_db::sql::SqliteBackendConfig {
1124 path: Path::new("sqlite:///")
1125 .join(tmp.path())
1126 .join("test.db3")
1127 .to_str()
1128 .unwrap(),
1129 create_if_missing: true,
1130 cache_size: 204800,
1131 thread_count: 4,
1132 }),
1133 100,
1134 None,
1135 storage_override.clone(),
1136 )
1137 .await
1138 .expect("indexer pool to be created");
1139
1140 let pool = indexer_backend.pool().clone();
1142
1143 let mut parent_hash = client
1145 .hash(sp_runtime::traits::Zero::zero())
1146 .unwrap()
1147 .expect("genesis hash");
1148 let mut best_block_hashes: Vec<H256> = vec![];
1149 for _block_number in 1..=5 {
1150 let mut builder = BlockBuilderBuilder::new(&*client)
1151 .on_parent_block(parent_hash)
1152 .fetch_parent_block_number(&*client)
1153 .unwrap()
1154 .build()
1155 .unwrap();
1156 builder
1157 .push_deposit_log_digest_item(ethereum_digest())
1158 .expect("deposit log");
1159 let block = builder.build().unwrap().block;
1160 let block_hash = block.header.hash();
1161 executor::block_on(client.import(BlockOrigin::Own, block)).unwrap();
1162 best_block_hashes.insert(0, block_hash);
1163 parent_hash = block_hash;
1164 }
1165
1166 let block_resume_at = best_block_hashes[0];
1168 sqlx::query("INSERT INTO blocks(substrate_block_hash, ethereum_block_hash, ethereum_storage_schema, block_number, is_canon) VALUES (?, ?, ?, 5, 1)")
1169 .bind(block_resume_at.as_bytes())
1170 .bind(H256::zero().as_bytes())
1171 .bind(H256::zero().as_bytes())
1172 .execute(&pool)
1173 .await
1174 .expect("sql query must succeed");
1175 sqlx::query("INSERT INTO sync_status(substrate_block_hash, status) VALUES (?, 1)")
1176 .bind(block_resume_at.as_bytes())
1177 .execute(&pool)
1178 .await
1179 .expect("sql query must succeed");
1180
1181 let test_sync_oracle = TestSyncOracleNotSyncing {};
1183 let pubsub_notification_sinks: EthereumBlockNotificationSinks<
1184 EthereumBlockNotification<OpaqueBlock>,
1185 > = Default::default();
1186 let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);
1187
1188 let pubsub_notification_sinks_inner = pubsub_notification_sinks.clone();
1189
1190 let client_inner = client.clone();
1191 tokio::task::spawn(async move {
1192 crate::sql::SyncWorker::run(
1193 client_inner,
1194 backend.clone(),
1195 Arc::new(indexer_backend),
1196 client.clone().import_notification_stream(),
1197 SyncWorkerConfig {
1198 read_notification_timeout: Duration::from_secs(10),
1199 check_indexed_blocks_interval: Duration::from_secs(60),
1200 },
1201 SyncStrategy::Parachain,
1202 Arc::new(test_sync_oracle),
1203 pubsub_notification_sinks_inner,
1204 )
1205 .await
1206 });
1207 futures_timer::Delay::new(Duration::from_millis(1500)).await;
1209
1210 let actual_imported_blocks =
1212 sqlx::query("SELECT substrate_block_hash, is_canon, block_number FROM blocks")
1213 .fetch_all(&pool)
1214 .await
1215 .expect("test query result")
1216 .iter()
1217 .map(|row| H256::from_slice(&row.get::<Vec<u8>, _>(0)[..]))
1218 .collect::<Vec<H256>>();
1219 let expected_imported_blocks = best_block_hashes.clone();
1220 assert_eq!(expected_imported_blocks, actual_imported_blocks);
1221 }
1222
1223 struct TestSyncOracle {
1224 sync_status: Arc<Mutex<bool>>,
1225 }
1226 impl sp_consensus::SyncOracle for TestSyncOracle {
1227 fn is_major_syncing(&self) -> bool {
1228 *self.sync_status.lock().expect("failed getting lock")
1229 }
1230 fn is_offline(&self) -> bool {
1231 false
1232 }
1233 }
1234
1235 struct TestSyncOracleWrapper {
1236 oracle: Arc<TestSyncOracle>,
1237 sync_status: Arc<Mutex<bool>>,
1238 }
1239 impl TestSyncOracleWrapper {
1240 fn new() -> Self {
1241 let sync_status = Arc::new(Mutex::new(false));
1242 TestSyncOracleWrapper {
1243 oracle: Arc::new(TestSyncOracle {
1244 sync_status: sync_status.clone(),
1245 }),
1246 sync_status,
1247 }
1248 }
1249 fn set_sync_status(&mut self, value: bool) {
1250 *self.sync_status.lock().expect("failed getting lock") = value;
1251 }
1252 }
1253
1254 #[tokio::test]
1255 async fn sync_strategy_normal_indexes_best_blocks_if_not_major_sync() {
1256 let tmp = tempdir().expect("create a temporary directory");
1257 let builder = TestClientBuilder::new().add_extra_storage(
1258 PALLET_ETHEREUM_SCHEMA.to_vec(),
1259 Encode::encode(&EthereumStorageSchema::V3),
1260 );
1261 let backend = builder.backend();
1262 let (client, _) =
1263 builder.build_with_native_executor::<frontier_template_runtime::RuntimeApi, _>(None);
1264 let client = Arc::new(client);
1265 let storage_override = Arc::new(SchemaV3StorageOverride::new(client.clone()));
1266 let indexer_backend = fc_db::sql::Backend::new(
1267 fc_db::sql::BackendConfig::Sqlite(fc_db::sql::SqliteBackendConfig {
1268 path: Path::new("sqlite:///")
1269 .join(tmp.path())
1270 .join("test.db3")
1271 .to_str()
1272 .unwrap(),
1273 create_if_missing: true,
1274 cache_size: 204800,
1275 thread_count: 4,
1276 }),
1277 100,
1278 None,
1279 storage_override.clone(),
1280 )
1281 .await
1282 .expect("indexer pool to be created");
1283
1284 let pool = indexer_backend.pool().clone();
1286
1287 let pubsub_notification_sinks: crate::EthereumBlockNotificationSinks<
1289 crate::EthereumBlockNotification<OpaqueBlock>,
1290 > = Default::default();
1291 let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);
1292 let mut sync_oracle_wrapper = TestSyncOracleWrapper::new();
1293 let sync_oracle = sync_oracle_wrapper.oracle.clone();
1294 let client_inner = client.clone();
1295 tokio::task::spawn(async move {
1296 crate::sql::SyncWorker::run(
1297 client_inner.clone(),
1298 backend.clone(),
1299 Arc::new(indexer_backend),
1300 client_inner.import_notification_stream(),
1301 SyncWorkerConfig {
1302 read_notification_timeout: Duration::from_secs(10),
1303 check_indexed_blocks_interval: Duration::from_secs(60),
1304 },
1305 SyncStrategy::Normal,
1306 Arc::new(sync_oracle),
1307 pubsub_notification_sinks.clone(),
1308 )
1309 .await
1310 });
1311 futures_timer::Delay::new(Duration::from_millis(200)).await;
1313
1314 sync_oracle_wrapper.set_sync_status(false);
1316 let mut parent_hash = client
1317 .hash(sp_runtime::traits::Zero::zero())
1318 .unwrap()
1319 .expect("genesis hash");
1320 let mut best_block_hashes: Vec<H256> = vec![];
1321 for _block_number in 1..=3 {
1322 let mut builder = BlockBuilderBuilder::new(&*client)
1323 .on_parent_block(parent_hash)
1324 .fetch_parent_block_number(&*client)
1325 .unwrap()
1326 .build()
1327 .unwrap();
1328 builder
1329 .push_deposit_log_digest_item(ethereum_digest())
1330 .expect("deposit log");
1331 let block = builder.build().unwrap().block;
1332 let block_hash = block.header.hash();
1333
1334 executor::block_on(client.import(BlockOrigin::Own, block)).unwrap();
1335 best_block_hashes.push(block_hash);
1336 parent_hash = block_hash;
1337 }
1338
1339 futures_timer::Delay::new(Duration::from_millis(3000)).await;
1341
1342 let actual_imported_blocks =
1344 sqlx::query("SELECT substrate_block_hash, is_canon, block_number FROM blocks")
1345 .fetch_all(&pool)
1346 .await
1347 .expect("test query result")
1348 .iter()
1349 .map(|row| H256::from_slice(&row.get::<Vec<u8>, _>(0)[..]))
1350 .collect::<Vec<H256>>();
1351 let expected_imported_blocks = best_block_hashes.clone();
1352 assert_eq!(expected_imported_blocks, actual_imported_blocks);
1353 }
1354
1355 #[tokio::test]
1356 async fn sync_strategy_normal_ignores_non_best_block_if_not_major_sync() {
1357 let tmp = tempdir().expect("create a temporary directory");
1358 let builder = TestClientBuilder::new().add_extra_storage(
1359 PALLET_ETHEREUM_SCHEMA.to_vec(),
1360 Encode::encode(&EthereumStorageSchema::V3),
1361 );
1362 let backend = builder.backend();
1363 let (client, _) =
1364 builder.build_with_native_executor::<frontier_template_runtime::RuntimeApi, _>(None);
1365 let client = Arc::new(client);
1366 let storage_override = Arc::new(SchemaV3StorageOverride::new(client.clone()));
1367 let indexer_backend = fc_db::sql::Backend::new(
1368 fc_db::sql::BackendConfig::Sqlite(fc_db::sql::SqliteBackendConfig {
1369 path: Path::new("sqlite:///")
1370 .join(tmp.path())
1371 .join("test.db3")
1372 .to_str()
1373 .unwrap(),
1374 create_if_missing: true,
1375 cache_size: 204800,
1376 thread_count: 4,
1377 }),
1378 100,
1379 None,
1380 storage_override.clone(),
1381 )
1382 .await
1383 .expect("indexer pool to be created");
1384
1385 let pool = indexer_backend.pool().clone();
1387
1388 let pubsub_notification_sinks: crate::EthereumBlockNotificationSinks<
1390 crate::EthereumBlockNotification<OpaqueBlock>,
1391 > = Default::default();
1392 let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);
1393 let mut sync_oracle_wrapper = TestSyncOracleWrapper::new();
1394 let sync_oracle = sync_oracle_wrapper.oracle.clone();
1395 let client_inner = client.clone();
1396 tokio::task::spawn(async move {
1397 crate::sql::SyncWorker::run(
1398 client_inner.clone(),
1399 backend.clone(),
1400 Arc::new(indexer_backend),
1401 client_inner.import_notification_stream(),
1402 SyncWorkerConfig {
1403 read_notification_timeout: Duration::from_secs(10),
1404 check_indexed_blocks_interval: Duration::from_secs(60),
1405 },
1406 SyncStrategy::Normal,
1407 Arc::new(sync_oracle),
1408 pubsub_notification_sinks.clone(),
1409 )
1410 .await
1411 });
1412 futures_timer::Delay::new(Duration::from_millis(200)).await;
1414
1415 sync_oracle_wrapper.set_sync_status(false);
1417 let mut parent_hash = client
1418 .hash(sp_runtime::traits::Zero::zero())
1419 .unwrap()
1420 .expect("genesis hash");
1421 let mut best_block_hashes: Vec<H256> = vec![];
1422 for _block_number in 1..=3 {
1423 let mut builder = BlockBuilderBuilder::new(&*client)
1424 .on_parent_block(parent_hash)
1425 .fetch_parent_block_number(&*client)
1426 .unwrap()
1427 .build()
1428 .unwrap();
1429 builder
1430 .push_deposit_log_digest_item(ethereum_digest())
1431 .expect("deposit log");
1432 let block = builder.build().unwrap().block;
1433 let block_hash = block.header.hash();
1434
1435 executor::block_on(client.import(BlockOrigin::Own, block)).unwrap();
1436 best_block_hashes.push(block_hash);
1437 parent_hash = block_hash;
1438 }
1439
1440 let mut builder = BlockBuilderBuilder::new(&*client)
1442 .on_parent_block(best_block_hashes[0])
1443 .fetch_parent_block_number(&*client)
1444 .unwrap()
1445 .build()
1446 .unwrap();
1447 builder
1448 .push_deposit_log_digest_item(ethereum_digest())
1449 .expect("deposit log");
1450 let block = builder.build().unwrap().block;
1451
1452 executor::block_on(client.import(BlockOrigin::Own, block)).unwrap();
1453
1454 futures_timer::Delay::new(Duration::from_millis(3000)).await;
1456
1457 let actual_imported_blocks =
1459 sqlx::query("SELECT substrate_block_hash, is_canon, block_number FROM blocks")
1460 .fetch_all(&pool)
1461 .await
1462 .expect("test query result")
1463 .iter()
1464 .map(|row| H256::from_slice(&row.get::<Vec<u8>, _>(0)[..]))
1465 .collect::<Vec<H256>>();
1466 let expected_imported_blocks = best_block_hashes.clone();
1467 assert_eq!(expected_imported_blocks, actual_imported_blocks);
1468 }
1469
1470 #[tokio::test]
1471 async fn sync_strategy_parachain_indexes_best_blocks_if_not_major_sync() {
1472 let tmp = tempdir().expect("create a temporary directory");
1473 let builder = TestClientBuilder::new().add_extra_storage(
1474 PALLET_ETHEREUM_SCHEMA.to_vec(),
1475 Encode::encode(&EthereumStorageSchema::V3),
1476 );
1477 let backend = builder.backend();
1478 let (client, _) =
1479 builder.build_with_native_executor::<frontier_template_runtime::RuntimeApi, _>(None);
1480 let client = Arc::new(client);
1481 let storage_override = Arc::new(SchemaV3StorageOverride::new(client.clone()));
1482 let indexer_backend = fc_db::sql::Backend::new(
1483 fc_db::sql::BackendConfig::Sqlite(fc_db::sql::SqliteBackendConfig {
1484 path: Path::new("sqlite:///")
1485 .join(tmp.path())
1486 .join("test.db3")
1487 .to_str()
1488 .unwrap(),
1489 create_if_missing: true,
1490 cache_size: 204800,
1491 thread_count: 4,
1492 }),
1493 100,
1494 None,
1495 storage_override.clone(),
1496 )
1497 .await
1498 .expect("indexer pool to be created");
1499
1500 let pool = indexer_backend.pool().clone();
1502
1503 let pubsub_notification_sinks: crate::EthereumBlockNotificationSinks<
1505 crate::EthereumBlockNotification<OpaqueBlock>,
1506 > = Default::default();
1507 let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);
1508 let mut sync_oracle_wrapper = TestSyncOracleWrapper::new();
1509 let sync_oracle = sync_oracle_wrapper.oracle.clone();
1510 let client_inner = client.clone();
1511 tokio::task::spawn(async move {
1512 crate::sql::SyncWorker::run(
1513 client_inner.clone(),
1514 backend.clone(),
1515 Arc::new(indexer_backend),
1516 client_inner.import_notification_stream(),
1517 SyncWorkerConfig {
1518 read_notification_timeout: Duration::from_secs(10),
1519 check_indexed_blocks_interval: Duration::from_secs(60),
1520 },
1521 SyncStrategy::Parachain,
1522 Arc::new(sync_oracle),
1523 pubsub_notification_sinks.clone(),
1524 )
1525 .await
1526 });
1527 futures_timer::Delay::new(Duration::from_millis(200)).await;
1529
1530 sync_oracle_wrapper.set_sync_status(false);
1532 let mut parent_hash = client
1533 .hash(sp_runtime::traits::Zero::zero())
1534 .unwrap()
1535 .expect("genesis hash");
1536 let mut best_block_hashes: Vec<H256> = vec![];
1537 for _block_number in 1..=3 {
1538 let mut builder = BlockBuilderBuilder::new(&*client)
1539 .on_parent_block(parent_hash)
1540 .fetch_parent_block_number(&*client)
1541 .unwrap()
1542 .build()
1543 .unwrap();
1544 builder
1545 .push_deposit_log_digest_item(ethereum_digest())
1546 .expect("deposit log");
1547 let block = builder.build().unwrap().block;
1548 let block_hash = block.header.hash();
1549
1550 executor::block_on(client.import(BlockOrigin::Own, block)).unwrap();
1551 best_block_hashes.push(block_hash);
1552 parent_hash = block_hash;
1553 }
1554
1555 futures_timer::Delay::new(Duration::from_millis(3000)).await;
1557
1558 let actual_imported_blocks =
1560 sqlx::query("SELECT substrate_block_hash, is_canon, block_number FROM blocks")
1561 .fetch_all(&pool)
1562 .await
1563 .expect("test query result")
1564 .iter()
1565 .map(|row| H256::from_slice(&row.get::<Vec<u8>, _>(0)[..]))
1566 .collect::<Vec<H256>>();
1567 let expected_imported_blocks = best_block_hashes.clone();
1568 assert_eq!(expected_imported_blocks, actual_imported_blocks);
1569 }
1570
1571 #[tokio::test]
1572 async fn sync_strategy_parachain_ignores_non_best_blocks_if_not_major_sync() {
1573 let tmp = tempdir().expect("create a temporary directory");
1574 let builder = TestClientBuilder::new().add_extra_storage(
1575 PALLET_ETHEREUM_SCHEMA.to_vec(),
1576 Encode::encode(&EthereumStorageSchema::V3),
1577 );
1578 let backend = builder.backend();
1579 let (client, _) =
1580 builder.build_with_native_executor::<frontier_template_runtime::RuntimeApi, _>(None);
1581 let client = Arc::new(client);
1582 let storage_override = Arc::new(SchemaV3StorageOverride::new(client.clone()));
1583 let indexer_backend = fc_db::sql::Backend::new(
1584 fc_db::sql::BackendConfig::Sqlite(fc_db::sql::SqliteBackendConfig {
1585 path: Path::new("sqlite:///")
1586 .join(tmp.path())
1587 .join("test.db3")
1588 .to_str()
1589 .unwrap(),
1590 create_if_missing: true,
1591 cache_size: 204800,
1592 thread_count: 4,
1593 }),
1594 100,
1595 None,
1596 storage_override.clone(),
1597 )
1598 .await
1599 .expect("indexer pool to be created");
1600
1601 let pool = indexer_backend.pool().clone();
1603
1604 let pubsub_notification_sinks: crate::EthereumBlockNotificationSinks<
1606 crate::EthereumBlockNotification<OpaqueBlock>,
1607 > = Default::default();
1608 let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);
1609 let mut sync_oracle_wrapper = TestSyncOracleWrapper::new();
1610 let sync_oracle = sync_oracle_wrapper.oracle.clone();
1611 let client_inner = client.clone();
1612 tokio::task::spawn(async move {
1613 crate::sql::SyncWorker::run(
1614 client_inner.clone(),
1615 backend.clone(),
1616 Arc::new(indexer_backend),
1617 client_inner.import_notification_stream(),
1618 SyncWorkerConfig {
1619 read_notification_timeout: Duration::from_secs(10),
1620 check_indexed_blocks_interval: Duration::from_secs(60),
1621 },
1622 SyncStrategy::Parachain,
1623 Arc::new(sync_oracle),
1624 pubsub_notification_sinks.clone(),
1625 )
1626 .await
1627 });
1628 futures_timer::Delay::new(Duration::from_millis(200)).await;
1630
1631 sync_oracle_wrapper.set_sync_status(false);
1633 let mut parent_hash = client
1634 .hash(sp_runtime::traits::Zero::zero())
1635 .unwrap()
1636 .expect("genesis hash");
1637 let mut best_block_hashes: Vec<H256> = vec![];
1638 for _block_number in 1..=3 {
1639 let mut builder = BlockBuilderBuilder::new(&*client)
1640 .on_parent_block(parent_hash)
1641 .fetch_parent_block_number(&*client)
1642 .unwrap()
1643 .build()
1644 .unwrap();
1645 builder
1646 .push_deposit_log_digest_item(ethereum_digest())
1647 .expect("deposit log");
1648 let block = builder.build().unwrap().block;
1649 let block_hash = block.header.hash();
1650
1651 executor::block_on(client.import(BlockOrigin::Own, block)).unwrap();
1652 best_block_hashes.push(block_hash);
1653 parent_hash = block_hash;
1654 }
1655
1656 let mut builder = BlockBuilderBuilder::new(&*client)
1658 .on_parent_block(best_block_hashes[0])
1659 .fetch_parent_block_number(&*client)
1660 .unwrap()
1661 .build()
1662 .unwrap();
1663 builder
1664 .push_deposit_log_digest_item(ethereum_digest())
1665 .expect("deposit log");
1666 let block = builder.build().unwrap().block;
1667
1668 executor::block_on(client.import(BlockOrigin::Own, block)).unwrap();
1669
1670 futures_timer::Delay::new(Duration::from_millis(3000)).await;
1672
1673 let actual_imported_blocks =
1675 sqlx::query("SELECT substrate_block_hash, is_canon, block_number FROM blocks")
1676 .fetch_all(&pool)
1677 .await
1678 .expect("test query result")
1679 .iter()
1680 .map(|row| H256::from_slice(&row.get::<Vec<u8>, _>(0)[..]))
1681 .collect::<Vec<H256>>();
1682 let expected_imported_blocks = best_block_hashes.clone();
1683 assert_eq!(expected_imported_blocks, actual_imported_blocks);
1684 }
1685
1686 #[tokio::test]
1687 async fn sync_strategy_normal_ignores_best_blocks_if_major_sync() {
1688 let tmp = tempdir().expect("create a temporary directory");
1689 let builder = TestClientBuilder::new().add_extra_storage(
1690 PALLET_ETHEREUM_SCHEMA.to_vec(),
1691 Encode::encode(&EthereumStorageSchema::V3),
1692 );
1693 let backend = builder.backend();
1694 let (client, _) =
1695 builder.build_with_native_executor::<frontier_template_runtime::RuntimeApi, _>(None);
1696 let client = Arc::new(client);
1697 let storage_override = Arc::new(SchemaV3StorageOverride::new(client.clone()));
1698 let indexer_backend = fc_db::sql::Backend::new(
1699 fc_db::sql::BackendConfig::Sqlite(fc_db::sql::SqliteBackendConfig {
1700 path: Path::new("sqlite:///")
1701 .join(tmp.path())
1702 .join("test.db3")
1703 .to_str()
1704 .unwrap(),
1705 create_if_missing: true,
1706 cache_size: 204800,
1707 thread_count: 4,
1708 }),
1709 100,
1710 None,
1711 storage_override.clone(),
1712 )
1713 .await
1714 .expect("indexer pool to be created");
1715
1716 let pool = indexer_backend.pool().clone();
1718
1719 let pubsub_notification_sinks: crate::EthereumBlockNotificationSinks<
1721 crate::EthereumBlockNotification<OpaqueBlock>,
1722 > = Default::default();
1723 let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);
1724 let mut sync_oracle_wrapper = TestSyncOracleWrapper::new();
1725 let sync_oracle = sync_oracle_wrapper.oracle.clone();
1726 let client_inner = client.clone();
1727 tokio::task::spawn(async move {
1728 crate::sql::SyncWorker::run(
1729 client_inner.clone(),
1730 backend.clone(),
1731 Arc::new(indexer_backend),
1732 client_inner.import_notification_stream(),
1733 SyncWorkerConfig {
1734 read_notification_timeout: Duration::from_secs(10),
1735 check_indexed_blocks_interval: Duration::from_secs(60),
1736 },
1737 SyncStrategy::Normal,
1738 Arc::new(sync_oracle),
1739 pubsub_notification_sinks.clone(),
1740 )
1741 .await
1742 });
1743 futures_timer::Delay::new(Duration::from_millis(200)).await;
1745
1746 sync_oracle_wrapper.set_sync_status(true);
1748 let mut parent_hash = client
1749 .hash(sp_runtime::traits::Zero::zero())
1750 .unwrap()
1751 .expect("genesis hash");
1752 let mut best_block_hashes: Vec<H256> = vec![];
1753 for _block_number in 1..=3 {
1754 let mut builder = BlockBuilderBuilder::new(&*client)
1755 .on_parent_block(parent_hash)
1756 .fetch_parent_block_number(&*client)
1757 .unwrap()
1758 .build()
1759 .unwrap();
1760 builder
1761 .push_deposit_log_digest_item(ethereum_digest())
1762 .expect("deposit log");
1763 let block = builder.build().unwrap().block;
1764 let block_hash = block.header.hash();
1765
1766 executor::block_on(client.import(BlockOrigin::NetworkInitialSync, block)).unwrap();
1767 best_block_hashes.push(block_hash);
1768 parent_hash = block_hash;
1769 }
1770
1771 futures_timer::Delay::new(Duration::from_millis(3000)).await;
1773
1774 let actual_imported_blocks =
1776 sqlx::query("SELECT substrate_block_hash, is_canon, block_number FROM blocks")
1777 .fetch_all(&pool)
1778 .await
1779 .expect("test query result")
1780 .iter()
1781 .map(|row| H256::from_slice(&row.get::<Vec<u8>, _>(0)[..]))
1782 .collect::<Vec<H256>>();
1783 let expected_imported_blocks = Vec::<H256>::new();
1784 assert_eq!(expected_imported_blocks, actual_imported_blocks);
1785 }
1786
1787 #[tokio::test]
1788 async fn sync_strategy_parachain_ignores_best_blocks_if_major_sync() {
1789 let tmp = tempdir().expect("create a temporary directory");
1790 let builder = TestClientBuilder::new().add_extra_storage(
1791 PALLET_ETHEREUM_SCHEMA.to_vec(),
1792 Encode::encode(&EthereumStorageSchema::V3),
1793 );
1794 let backend = builder.backend();
1795 let (client, _) =
1796 builder.build_with_native_executor::<frontier_template_runtime::RuntimeApi, _>(None);
1797 let client = Arc::new(client);
1798 let storage_override = Arc::new(SchemaV3StorageOverride::new(client.clone()));
1799 let indexer_backend = fc_db::sql::Backend::new(
1800 fc_db::sql::BackendConfig::Sqlite(fc_db::sql::SqliteBackendConfig {
1801 path: Path::new("sqlite:///")
1802 .join(tmp.path())
1803 .join("test.db3")
1804 .to_str()
1805 .unwrap(),
1806 create_if_missing: true,
1807 cache_size: 204800,
1808 thread_count: 4,
1809 }),
1810 100,
1811 None,
1812 storage_override.clone(),
1813 )
1814 .await
1815 .expect("indexer pool to be created");
1816
1817 let pool = indexer_backend.pool().clone();
1819
1820 let pubsub_notification_sinks: crate::EthereumBlockNotificationSinks<
1822 crate::EthereumBlockNotification<OpaqueBlock>,
1823 > = Default::default();
1824 let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);
1825 let mut sync_oracle_wrapper = TestSyncOracleWrapper::new();
1826 let sync_oracle = sync_oracle_wrapper.oracle.clone();
1827 let client_inner = client.clone();
1828 tokio::task::spawn(async move {
1829 crate::sql::SyncWorker::run(
1830 client_inner.clone(),
1831 backend.clone(),
1832 Arc::new(indexer_backend),
1833 client_inner.import_notification_stream(),
1834 SyncWorkerConfig {
1835 read_notification_timeout: Duration::from_secs(10),
1836 check_indexed_blocks_interval: Duration::from_secs(60),
1837 },
1838 SyncStrategy::Parachain,
1839 Arc::new(sync_oracle),
1840 pubsub_notification_sinks.clone(),
1841 )
1842 .await
1843 });
1844 futures_timer::Delay::new(Duration::from_millis(200)).await;
1846
1847 sync_oracle_wrapper.set_sync_status(true);
1849 let mut parent_hash = client
1850 .hash(sp_runtime::traits::Zero::zero())
1851 .unwrap()
1852 .expect("genesis hash");
1853 let mut best_block_hashes: Vec<H256> = vec![];
1854 for _block_number in 1..=3 {
1855 let mut builder = BlockBuilderBuilder::new(&*client)
1856 .on_parent_block(parent_hash)
1857 .fetch_parent_block_number(&*client)
1858 .unwrap()
1859 .build()
1860 .unwrap();
1861 builder
1862 .push_deposit_log_digest_item(ethereum_digest())
1863 .expect("deposit log");
1864 let block = builder.build().unwrap().block;
1865 let block_hash = block.header.hash();
1866
1867 executor::block_on(client.import(BlockOrigin::NetworkInitialSync, block)).unwrap();
1868 best_block_hashes.push(block_hash);
1869 parent_hash = block_hash;
1870 }
1871
1872 futures_timer::Delay::new(Duration::from_millis(3000)).await;
1874
1875 let actual_imported_blocks =
1877 sqlx::query("SELECT substrate_block_hash, is_canon, block_number FROM blocks")
1878 .fetch_all(&pool)
1879 .await
1880 .expect("test query result")
1881 .iter()
1882 .map(|row| H256::from_slice(&row.get::<Vec<u8>, _>(0)[..]))
1883 .collect::<Vec<H256>>();
1884 let expected_imported_blocks = Vec::<H256>::new();
1885 assert_eq!(expected_imported_blocks, actual_imported_blocks);
1886 }
1887}