1use std::{pin::Pin, sync::Arc, time::Duration};
20
21use futures::{
22 prelude::*,
23 task::{Context, Poll},
24};
25use futures_timer::Delay;
26use log::debug;
27use sc_client_api::{
29 backend::{Backend, StorageProvider},
30 client::ImportNotifications,
31};
32use sp_api::ProvideRuntimeApi;
33use sp_blockchain::HeaderBackend;
34use sp_consensus::SyncOracle;
35use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
36use fc_storage::StorageOverride;
38use fp_rpc::EthereumRuntimeRPCApi;
39
40use crate::SyncStrategy;
41
42pub struct MappingSyncWorker<Block: BlockT, C, BE> {
43 import_notifications: ImportNotifications<Block>,
44 timeout: Duration,
45 inner_delay: Option<Delay>,
46
47 client: Arc<C>,
48 substrate_backend: Arc<BE>,
49 storage_override: Arc<dyn StorageOverride<Block>>,
50 frontier_backend: Arc<fc_db::kv::Backend<Block, C>>,
51
52 have_next: bool,
53 retry_times: usize,
54 sync_from: <Block::Header as HeaderT>::Number,
55 strategy: SyncStrategy,
56
57 sync_oracle: Arc<dyn SyncOracle + Send + Sync + 'static>,
58 pubsub_notification_sinks:
59 Arc<crate::EthereumBlockNotificationSinks<crate::EthereumBlockNotification<Block>>>,
60}
61
62impl<Block: BlockT, C, BE> Unpin for MappingSyncWorker<Block, C, BE> {}
63
64impl<Block: BlockT, C, BE> MappingSyncWorker<Block, C, BE> {
65 pub fn new(
66 import_notifications: ImportNotifications<Block>,
67 timeout: Duration,
68 client: Arc<C>,
69 substrate_backend: Arc<BE>,
70 storage_override: Arc<dyn StorageOverride<Block>>,
71 frontier_backend: Arc<fc_db::kv::Backend<Block, C>>,
72 retry_times: usize,
73 sync_from: <Block::Header as HeaderT>::Number,
74 strategy: SyncStrategy,
75 sync_oracle: Arc<dyn SyncOracle + Send + Sync + 'static>,
76 pubsub_notification_sinks: Arc<
77 crate::EthereumBlockNotificationSinks<crate::EthereumBlockNotification<Block>>,
78 >,
79 ) -> Self {
80 Self {
81 import_notifications,
82 timeout,
83 inner_delay: None,
84
85 client,
86 substrate_backend,
87 storage_override,
88 frontier_backend,
89
90 have_next: true,
91 retry_times,
92 sync_from,
93 strategy,
94
95 sync_oracle,
96 pubsub_notification_sinks,
97 }
98 }
99}
100
101impl<Block, C, BE> Stream for MappingSyncWorker<Block, C, BE>
102where
103 Block: BlockT,
104 C: ProvideRuntimeApi<Block>,
105 C::Api: EthereumRuntimeRPCApi<Block>,
106 C: HeaderBackend<Block> + StorageProvider<Block, BE>,
107 BE: Backend<Block>,
108{
109 type Item = ();
110
111 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<()>> {
112 let mut fire = false;
113
114 loop {
115 match Stream::poll_next(Pin::new(&mut self.import_notifications), cx) {
116 Poll::Pending => break,
117 Poll::Ready(Some(_)) => {
118 fire = true;
119 }
120 Poll::Ready(None) => return Poll::Ready(None),
121 }
122 }
123
124 let timeout = self.timeout;
125 let inner_delay = self.inner_delay.get_or_insert_with(|| Delay::new(timeout));
126
127 match Future::poll(Pin::new(inner_delay), cx) {
128 Poll::Pending => (),
129 Poll::Ready(()) => {
130 fire = true;
131 }
132 }
133
134 if self.have_next {
135 fire = true;
136 }
137
138 if fire {
139 self.inner_delay = None;
140
141 match crate::kv::sync_blocks(
142 self.client.as_ref(),
143 self.substrate_backend.as_ref(),
144 self.storage_override.clone(),
145 self.frontier_backend.as_ref(),
146 self.retry_times,
147 self.sync_from,
148 self.strategy,
149 self.sync_oracle.clone(),
150 self.pubsub_notification_sinks.clone(),
151 ) {
152 Ok(have_next) => {
153 self.have_next = have_next;
154 Poll::Ready(Some(()))
155 }
156 Err(e) => {
157 self.have_next = false;
158 debug!(target: "mapping-sync", "Syncing failed with error {e:?}, retrying.");
159 Poll::Ready(Some(()))
160 }
161 }
162 } else {
163 Poll::Pending
164 }
165 }
166}
167
168#[cfg(test)]
169mod tests {
170 use super::*;
171 use crate::{EthereumBlockNotification, EthereumBlockNotificationSinks};
172 use fc_storage::SchemaV3StorageOverride;
173 use fp_storage::{EthereumStorageSchema, PALLET_ETHEREUM_SCHEMA};
174 use sc_block_builder::BlockBuilderBuilder;
175 use sc_client_api::BlockchainEvents;
176 use scale_codec::Encode;
177 use sp_consensus::BlockOrigin;
178 use sp_core::{H160, H256, U256};
179 use sp_runtime::{generic::Header, traits::BlakeTwo256, Digest};
180 use substrate_test_runtime_client::{
181 ClientBlockImportExt, DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt,
182 };
183 use tempfile::tempdir;
184
185 type OpaqueBlock = sp_runtime::generic::Block<
186 Header<u64, BlakeTwo256>,
187 substrate_test_runtime_client::runtime::Extrinsic,
188 >;
189
190 fn ethereum_digest() -> Digest {
191 let partial_header = ethereum::PartialHeader {
192 parent_hash: H256::random(),
193 beneficiary: H160::default(),
194 state_root: H256::default(),
195 receipts_root: H256::default(),
196 logs_bloom: ethereum_types::Bloom::default(),
197 difficulty: U256::zero(),
198 number: U256::zero(),
199 gas_limit: U256::zero(),
200 gas_used: U256::zero(),
201 timestamp: 0u64,
202 extra_data: Vec::new(),
203 mix_hash: H256::default(),
204 nonce: ethereum_types::H64::default(),
205 };
206 let ethereum_block = ethereum::Block::new(partial_header, vec![], vec![]);
207 Digest {
208 logs: vec![sp_runtime::generic::DigestItem::Consensus(
209 fp_consensus::FRONTIER_ENGINE_ID,
210 fp_consensus::PostLog::Hashes(fp_consensus::Hashes::from_block(ethereum_block))
211 .encode(),
212 )],
213 }
214 }
215
216 struct TestSyncOracleNotSyncing;
217 impl sp_consensus::SyncOracle for TestSyncOracleNotSyncing {
218 fn is_major_syncing(&self) -> bool {
219 false
220 }
221 fn is_offline(&self) -> bool {
222 false
223 }
224 }
225
226 struct TestSyncOracleSyncing;
227 impl sp_consensus::SyncOracle for TestSyncOracleSyncing {
228 fn is_major_syncing(&self) -> bool {
229 true
230 }
231 fn is_offline(&self) -> bool {
232 false
233 }
234 }
235
236 #[tokio::test]
237 async fn block_import_notification_works() {
238 let tmp = tempdir().expect("create a temporary directory");
239 let builder = TestClientBuilder::new().add_extra_storage(
240 PALLET_ETHEREUM_SCHEMA.to_vec(),
241 Encode::encode(&EthereumStorageSchema::V3),
242 );
243 let test_sync_oracle = TestSyncOracleNotSyncing {};
244 let backend = builder.backend();
246 let (client, _) =
248 builder.build_with_native_executor::<frontier_template_runtime::RuntimeApi, _>(None);
249 let client = Arc::new(client);
250 let storage_override = Arc::new(SchemaV3StorageOverride::new(client.clone()));
252
253 let frontier_backend = Arc::new(
254 fc_db::kv::Backend::<OpaqueBlock, _>::new(
255 client.clone(),
256 &fc_db::kv::DatabaseSettings {
257 source: sc_client_db::DatabaseSource::RocksDb {
258 path: tmp.path().to_path_buf(),
259 cache_size: 0,
260 },
261 },
262 )
263 .expect("frontier backend"),
264 );
265
266 let notification_stream = client.clone().import_notification_stream();
267 let client_inner = client.clone();
268
269 let pubsub_notification_sinks: EthereumBlockNotificationSinks<
270 EthereumBlockNotification<OpaqueBlock>,
271 > = Default::default();
272 let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);
273
274 let pubsub_notification_sinks_inner = pubsub_notification_sinks.clone();
275
276 tokio::task::spawn(async move {
277 MappingSyncWorker::new(
278 notification_stream,
279 Duration::new(6, 0),
280 client_inner,
281 backend,
282 storage_override.clone(),
283 frontier_backend,
284 3,
285 0,
286 SyncStrategy::Normal,
287 Arc::new(test_sync_oracle),
288 pubsub_notification_sinks_inner,
289 )
290 .for_each(|()| future::ready(()))
291 .await
292 });
293
294 {
295 let (inner_sink, mut block_notification_stream) =
297 sc_utils::mpsc::tracing_unbounded("pubsub_notification_stream", 100_000);
298
299 {
300 let sinks = &mut pubsub_notification_sinks.lock();
303 sinks.push(inner_sink);
305 }
306
307 let chain_info = client.chain_info();
309 let builder = BlockBuilderBuilder::new(&*client)
310 .on_parent_block(chain_info.best_hash)
311 .with_parent_block_number(chain_info.best_number)
312 .with_inherent_digests(ethereum_digest())
313 .build()
314 .unwrap();
315 let block = builder.build().unwrap().block;
316 let block_hash = block.header.hash();
317 let _res = client.import(BlockOrigin::Own, block).await;
318
319 assert_eq!(
321 block_notification_stream
322 .next()
323 .await
324 .expect("a message")
325 .hash,
326 block_hash
327 );
328 }
329
330 {
331 let sinks = pubsub_notification_sinks.lock();
333 assert_eq!(sinks.len(), 1);
334 }
335
336 {
337 let (inner_sink, mut block_notification_stream) =
339 sc_utils::mpsc::tracing_unbounded("pubsub_notification_stream", 100_000);
340
341 {
342 let sinks = &mut pubsub_notification_sinks.lock();
343 sinks.push(inner_sink);
345 assert_eq!(sinks.len(), 2);
347 }
348
349 let chain_info = client.chain_info();
352 let builder = BlockBuilderBuilder::new(&*client)
353 .on_parent_block(chain_info.best_hash)
354 .with_parent_block_number(chain_info.best_number)
355 .with_inherent_digests(ethereum_digest())
356 .build()
357 .unwrap();
358 let block = builder.build().unwrap().block;
359 let block_hash = block.header.hash();
360 let _res = client.import(BlockOrigin::Own, block).await;
361
362 assert_eq!(
364 block_notification_stream
365 .next()
366 .await
367 .expect("a message")
368 .hash,
369 block_hash
370 );
371
372 let sinks = &mut pubsub_notification_sinks.lock();
374 assert_eq!(sinks.len(), 1);
375 }
376 }
377
378 #[tokio::test]
379 async fn sink_removal_when_syncing_works() {
380 let tmp = tempdir().expect("create a temporary directory");
381 let builder = TestClientBuilder::new().add_extra_storage(
382 PALLET_ETHEREUM_SCHEMA.to_vec(),
383 Encode::encode(&EthereumStorageSchema::V3),
384 );
385 let test_sync_oracle = TestSyncOracleSyncing {};
386 let backend = builder.backend();
388 let (client, _) =
390 builder.build_with_native_executor::<frontier_template_runtime::RuntimeApi, _>(None);
391 let client = Arc::new(client);
392 let storage_override = Arc::new(SchemaV3StorageOverride::new(client.clone()));
394
395 let frontier_backend = Arc::new(
396 fc_db::kv::Backend::<OpaqueBlock, _>::new(
397 client.clone(),
398 &fc_db::kv::DatabaseSettings {
399 source: sc_client_db::DatabaseSource::RocksDb {
400 path: tmp.path().to_path_buf(),
401 cache_size: 0,
402 },
403 },
404 )
405 .expect("frontier backend"),
406 );
407
408 let notification_stream = client.clone().import_notification_stream();
409 let client_inner = client.clone();
410
411 let pubsub_notification_sinks: EthereumBlockNotificationSinks<
412 EthereumBlockNotification<OpaqueBlock>,
413 > = Default::default();
414 let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);
415
416 let pubsub_notification_sinks_inner = pubsub_notification_sinks.clone();
417
418 tokio::task::spawn(async move {
419 MappingSyncWorker::new(
420 notification_stream,
421 Duration::new(6, 0),
422 client_inner,
423 backend,
424 storage_override.clone(),
425 frontier_backend,
426 3,
427 0,
428 SyncStrategy::Normal,
429 Arc::new(test_sync_oracle),
430 pubsub_notification_sinks_inner,
431 )
432 .for_each(|()| future::ready(()))
433 .await
434 });
435
436 {
437 let (inner_sink, mut block_notification_stream) =
439 sc_utils::mpsc::tracing_unbounded("pubsub_notification_stream", 100_000);
440
441 {
442 let sinks = &mut pubsub_notification_sinks.lock();
445 sinks.push(inner_sink);
447 }
448
449 let chain_info = client.chain_info();
451 let builder = BlockBuilderBuilder::new(&*client)
452 .on_parent_block(chain_info.best_hash)
453 .with_parent_block_number(chain_info.best_number)
454 .with_inherent_digests(ethereum_digest())
455 .build()
456 .unwrap();
457 let block = builder.build().unwrap().block;
458 let _res = client.import(BlockOrigin::Own, block).await;
459
460 assert!(block_notification_stream.next().await.is_none());
462 }
463
464 {
465 let sinks = pubsub_notification_sinks.lock();
467 assert_eq!(sinks.len(), 0);
468 }
469 }
470}