fc_mapping_sync/kv/worker.rs

// This file is part of Frontier.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use std::{pin::Pin, sync::Arc, time::Duration};

use futures::{
	prelude::*,
	task::{Context, Poll},
};
use futures_timer::Delay;
use log::debug;
// Substrate
use sc_client_api::{
	backend::{Backend, StorageProvider},
	client::ImportNotifications,
};
use sp_api::ProvideRuntimeApi;
use sp_blockchain::HeaderBackend;
use sp_consensus::SyncOracle;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
// Frontier
use fc_storage::StorageOverride;
use fp_rpc::EthereumRuntimeRPCApi;

use crate::SyncStrategy;

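/// Background worker that keeps the Frontier key-value mapping database
/// (`fc_db::kv::Backend`) in sync with the chain: it listens for Substrate
/// block import notifications and runs a `sync_blocks` round whenever a block
/// is imported, the timeout elapses, or a previous round reported that more
/// blocks remain to be synced.
///
/// The worker implements `Stream` and is meant to be driven as a long-running
/// background task. A minimal usage sketch, assuming `task_manager`, `client`,
/// `backend`, `storage_override`, `frontier_backend`, `sync_oracle` and
/// `pubsub_notification_sinks` come from the node's service setup:
///
/// ```ignore
/// task_manager.spawn_essential_handle().spawn(
///     "frontier-mapping-sync-worker",
///     Some("frontier"),
///     MappingSyncWorker::new(
///         client.import_notification_stream(),
///         Duration::new(6, 0), // timeout between forced sync rounds
///         client.clone(),
///         backend,
///         storage_override,
///         frontier_backend,
///         3,                   // retry_times
///         0,                   // sync_from
///         SyncStrategy::Normal,
///         sync_oracle,
///         pubsub_notification_sinks,
///     )
///     .for_each(|()| futures::future::ready(())),
/// );
/// ```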
pub struct MappingSyncWorker<Block: BlockT, C, BE> {
	import_notifications: ImportNotifications<Block>,
	timeout: Duration,
	inner_delay: Option<Delay>,

	client: Arc<C>,
	substrate_backend: Arc<BE>,
	storage_override: Arc<dyn StorageOverride<Block>>,
	frontier_backend: Arc<fc_db::kv::Backend<Block, C>>,

	have_next: bool,
	retry_times: usize,
	sync_from: <Block::Header as HeaderT>::Number,
	strategy: SyncStrategy,

	sync_oracle: Arc<dyn SyncOracle + Send + Sync + 'static>,
	pubsub_notification_sinks:
		Arc<crate::EthereumBlockNotificationSinks<crate::EthereumBlockNotification<Block>>>,
}

impl<Block: BlockT, C, BE> Unpin for MappingSyncWorker<Block, C, BE> {}

impl<Block: BlockT, C, BE> MappingSyncWorker<Block, C, BE> {
	/// Creates a new mapping sync worker from the given client, backends and
	/// sync configuration.
	pub fn new(
		import_notifications: ImportNotifications<Block>,
		timeout: Duration,
		client: Arc<C>,
		substrate_backend: Arc<BE>,
		storage_override: Arc<dyn StorageOverride<Block>>,
		frontier_backend: Arc<fc_db::kv::Backend<Block, C>>,
		retry_times: usize,
		sync_from: <Block::Header as HeaderT>::Number,
		strategy: SyncStrategy,
		sync_oracle: Arc<dyn SyncOracle + Send + Sync + 'static>,
		pubsub_notification_sinks: Arc<
			crate::EthereumBlockNotificationSinks<crate::EthereumBlockNotification<Block>>,
		>,
	) -> Self {
		Self {
			import_notifications,
			timeout,
			inner_delay: None,

			client,
			substrate_backend,
			storage_override,
			frontier_backend,

			have_next: true,
			retry_times,
			sync_from,
			strategy,

			sync_oracle,
			pubsub_notification_sinks,
		}
	}
}

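// Polling the worker drains any pending block import notifications, keeps a
// periodic timeout armed, and runs one `sync_blocks` round whenever a
// notification arrived, the timeout fired, or the previous round reported more
// blocks left to sync. Each round yields `Some(())`; the stream ends with
// `None` once the import notification stream has ended.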
impl<Block, C, BE> Stream for MappingSyncWorker<Block, C, BE>
where
	Block: BlockT,
	C: ProvideRuntimeApi<Block>,
	C::Api: EthereumRuntimeRPCApi<Block>,
	C: HeaderBackend<Block> + StorageProvider<Block, BE>,
	BE: Backend<Block>,
{
	type Item = ();

	fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<()>> {
		let mut fire = false;

		// Drain all pending import notifications; any of them triggers a sync round.
		loop {
			match Stream::poll_next(Pin::new(&mut self.import_notifications), cx) {
				Poll::Pending => break,
				Poll::Ready(Some(_)) => {
					fire = true;
				}
				Poll::Ready(None) => return Poll::Ready(None),
			}
		}

		// Arm the timeout if it is not already running, so syncing also happens
		// without new block imports.
		let timeout = self.timeout;
		let inner_delay = self.inner_delay.get_or_insert_with(|| Delay::new(timeout));

		match Future::poll(Pin::new(inner_delay), cx) {
			Poll::Pending => (),
			Poll::Ready(()) => {
				fire = true;
			}
		}

		// The previous round reported more blocks left to sync, so keep going.
		if self.have_next {
			fire = true;
		}

		if fire {
			self.inner_delay = None;

			match crate::kv::sync_blocks(
				self.client.as_ref(),
				self.substrate_backend.as_ref(),
				self.storage_override.clone(),
				self.frontier_backend.as_ref(),
				self.retry_times,
				self.sync_from,
				self.strategy,
				self.sync_oracle.clone(),
				self.pubsub_notification_sinks.clone(),
			) {
				Ok(have_next) => {
					self.have_next = have_next;
					Poll::Ready(Some(()))
				}
				Err(e) => {
					self.have_next = false;
					debug!(target: "mapping-sync", "Syncing failed with error {e:?}, retrying.");
					Poll::Ready(Some(()))
				}
			}
		} else {
			Poll::Pending
		}
	}
}

#[cfg(test)]
mod tests {
	use super::*;
	use crate::{EthereumBlockNotification, EthereumBlockNotificationSinks};
	use fc_storage::SchemaV3StorageOverride;
	use fp_storage::{EthereumStorageSchema, PALLET_ETHEREUM_SCHEMA};
	use sc_block_builder::BlockBuilderBuilder;
	use sc_client_api::BlockchainEvents;
	use scale_codec::Encode;
	use sp_consensus::BlockOrigin;
	use sp_core::{H160, H256, U256};
	use sp_runtime::{generic::Header, traits::BlakeTwo256, Digest};
	use substrate_test_runtime_client::{
		ClientBlockImportExt, DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt,
	};
	use tempfile::tempdir;

	type OpaqueBlock = sp_runtime::generic::Block<
		Header<u64, BlakeTwo256>,
		substrate_test_runtime_client::runtime::Extrinsic,
	>;

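	// Builds a digest containing a Frontier `PostLog` with the hashes of an
	// empty Ethereum block, so that an imported Substrate block is recognized
	// by the mapping sync as carrying an Ethereum block.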
	fn ethereum_digest() -> Digest {
		let partial_header = ethereum::PartialHeader {
			parent_hash: H256::random(),
			beneficiary: H160::default(),
			state_root: H256::default(),
			receipts_root: H256::default(),
			logs_bloom: ethereum_types::Bloom::default(),
			difficulty: U256::zero(),
			number: U256::zero(),
			gas_limit: U256::zero(),
			gas_used: U256::zero(),
			timestamp: 0u64,
			extra_data: Vec::new(),
			mix_hash: H256::default(),
			nonce: ethereum_types::H64::default(),
		};
		let ethereum_block = ethereum::Block::new(partial_header, vec![], vec![]);
		Digest {
			logs: vec![sp_runtime::generic::DigestItem::Consensus(
				fp_consensus::FRONTIER_ENGINE_ID,
				fp_consensus::PostLog::Hashes(fp_consensus::Hashes::from_block(ethereum_block))
					.encode(),
			)],
		}
	}

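	// Stub `SyncOracle` implementations used to simulate a node that is fully
	// synced and a node that is still major-syncing.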
	struct TestSyncOracleNotSyncing;
	impl sp_consensus::SyncOracle for TestSyncOracleNotSyncing {
		fn is_major_syncing(&self) -> bool {
			false
		}
		fn is_offline(&self) -> bool {
			false
		}
	}

	struct TestSyncOracleSyncing;
	impl sp_consensus::SyncOracle for TestSyncOracleSyncing {
		fn is_major_syncing(&self) -> bool {
			true
		}
		fn is_offline(&self) -> bool {
			false
		}
	}

	#[tokio::test]
	async fn block_import_notification_works() {
		let tmp = tempdir().expect("create a temporary directory");
		let builder = TestClientBuilder::new().add_extra_storage(
			PALLET_ETHEREUM_SCHEMA.to_vec(),
			Encode::encode(&EthereumStorageSchema::V3),
		);
		let test_sync_oracle = TestSyncOracleNotSyncing {};
		// Backend
		let backend = builder.backend();
		// Client
		let (client, _) =
			builder.build_with_native_executor::<frontier_template_runtime::RuntimeApi, _>(None);
		let client = Arc::new(client);
		// Storage override
		let storage_override = Arc::new(SchemaV3StorageOverride::new(client.clone()));

		let frontier_backend = Arc::new(
			fc_db::kv::Backend::<OpaqueBlock, _>::new(
				client.clone(),
				&fc_db::kv::DatabaseSettings {
					source: sc_client_db::DatabaseSource::RocksDb {
						path: tmp.path().to_path_buf(),
						cache_size: 0,
					},
				},
			)
			.expect("frontier backend"),
		);

		let notification_stream = client.clone().import_notification_stream();
		let client_inner = client.clone();

		let pubsub_notification_sinks: EthereumBlockNotificationSinks<
			EthereumBlockNotification<OpaqueBlock>,
		> = Default::default();
		let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);

		let pubsub_notification_sinks_inner = pubsub_notification_sinks.clone();

		tokio::task::spawn(async move {
			MappingSyncWorker::new(
				notification_stream,
				Duration::new(6, 0),
				client_inner,
				backend,
				storage_override.clone(),
				frontier_backend,
				3,
				0,
				SyncStrategy::Normal,
				Arc::new(test_sync_oracle),
				pubsub_notification_sinks_inner,
			)
			.for_each(|()| future::ready(()))
			.await
		});

		{
			// A new mpsc channel
			let (inner_sink, mut block_notification_stream) =
				sc_utils::mpsc::tracing_unbounded("pubsub_notification_stream", 100_000);

			{
				// This scope represents a call to eth_subscribe, where it briefly locks the pool
				// to push the new sink.
				let sinks = &mut pubsub_notification_sinks.lock();
				// Push to sink pool
				sinks.push(inner_sink);
			}

			// Let's produce a block, which we expect to trigger a channel message
			let chain_info = client.chain_info();
			let builder = BlockBuilderBuilder::new(&*client)
				.on_parent_block(chain_info.best_hash)
				.with_parent_block_number(chain_info.best_number)
				.with_inherent_digests(ethereum_digest())
				.build()
				.unwrap();
			let block = builder.build().unwrap().block;
			let block_hash = block.header.hash();
			let _res = client.import(BlockOrigin::Own, block).await;

			// Receive
			assert_eq!(
				block_notification_stream
					.next()
					.await
					.expect("a message")
					.hash,
				block_hash
			);
		}

		{
			// Assert we still hold a sink in the pool after switching scopes
			let sinks = pubsub_notification_sinks.lock();
			assert_eq!(sinks.len(), 1);
		}

		{
			// Create yet another mpsc channel
			let (inner_sink, mut block_notification_stream) =
				sc_utils::mpsc::tracing_unbounded("pubsub_notification_stream", 100_000);

			{
				let sinks = &mut pubsub_notification_sinks.lock();
				// Push it
				sinks.push(inner_sink);
				// Now we expect two sinks in the pool
				assert_eq!(sinks.len(), 2);
			}

			// Let's produce another block, which not only triggers a message in the new channel
			// but also removes the closed channels from the pool.
			let chain_info = client.chain_info();
			let builder = BlockBuilderBuilder::new(&*client)
				.on_parent_block(chain_info.best_hash)
				.with_parent_block_number(chain_info.best_number)
				.with_inherent_digests(ethereum_digest())
				.build()
				.unwrap();
			let block = builder.build().unwrap().block;
			let block_hash = block.header.hash();
			let _res = client.import(BlockOrigin::Own, block).await;

			// Receive
			assert_eq!(
				block_notification_stream
					.next()
					.await
					.expect("a message")
					.hash,
				block_hash
			);

			// So we expect the pool to hold one sink only after cleanup
			let sinks = &mut pubsub_notification_sinks.lock();
			assert_eq!(sinks.len(), 1);
		}
	}

	#[tokio::test]
	async fn sink_removal_when_syncing_works() {
		let tmp = tempdir().expect("create a temporary directory");
		let builder = TestClientBuilder::new().add_extra_storage(
			PALLET_ETHEREUM_SCHEMA.to_vec(),
			Encode::encode(&EthereumStorageSchema::V3),
		);
		let test_sync_oracle = TestSyncOracleSyncing {};
		// Backend
		let backend = builder.backend();
		// Client
		let (client, _) =
			builder.build_with_native_executor::<frontier_template_runtime::RuntimeApi, _>(None);
		let client = Arc::new(client);
		// Storage override
		let storage_override = Arc::new(SchemaV3StorageOverride::new(client.clone()));

		let frontier_backend = Arc::new(
			fc_db::kv::Backend::<OpaqueBlock, _>::new(
				client.clone(),
				&fc_db::kv::DatabaseSettings {
					source: sc_client_db::DatabaseSource::RocksDb {
						path: tmp.path().to_path_buf(),
						cache_size: 0,
					},
				},
			)
			.expect("frontier backend"),
		);

		let notification_stream = client.clone().import_notification_stream();
		let client_inner = client.clone();

		let pubsub_notification_sinks: EthereumBlockNotificationSinks<
			EthereumBlockNotification<OpaqueBlock>,
		> = Default::default();
		let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);

		let pubsub_notification_sinks_inner = pubsub_notification_sinks.clone();

		tokio::task::spawn(async move {
			MappingSyncWorker::new(
				notification_stream,
				Duration::new(6, 0),
				client_inner,
				backend,
				storage_override.clone(),
				frontier_backend,
				3,
				0,
				SyncStrategy::Normal,
				Arc::new(test_sync_oracle),
				pubsub_notification_sinks_inner,
			)
			.for_each(|()| future::ready(()))
			.await
		});

		{
			// A new mpsc channel
			let (inner_sink, mut block_notification_stream) =
				sc_utils::mpsc::tracing_unbounded("pubsub_notification_stream", 100_000);

			{
				// This scope represents a call to eth_subscribe, where it briefly locks the pool
				// to push the new sink.
				let sinks = &mut pubsub_notification_sinks.lock();
				// Push to sink pool
				sinks.push(inner_sink);
			}

			// Let's produce a block, which we expect to trigger a channel message
			let chain_info = client.chain_info();
			let builder = BlockBuilderBuilder::new(&*client)
				.on_parent_block(chain_info.best_hash)
				.with_parent_block_number(chain_info.best_number)
				.with_inherent_digests(ethereum_digest())
				.build()
				.unwrap();
			let block = builder.build().unwrap().block;
			let _res = client.import(BlockOrigin::Own, block).await;

			// Not received: the channel is closed because the node is major syncing
			assert!(block_notification_stream.next().await.is_none());
		}

		{
			// Assert the sink was removed from the pool on major syncing
			let sinks = pubsub_notification_sinks.lock();
			assert_eq!(sinks.len(), 0);
		}
	}
}