fc_mapping_sync/kv/
mod.rs

1// This file is part of Frontier.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19#![allow(clippy::too_many_arguments)]
20
21mod worker;
22
23pub use worker::MappingSyncWorker;
24
25use std::sync::Arc;
26
27// Substrate
28use sc_client_api::backend::{Backend, StorageProvider};
29use sp_api::{ApiExt, ProvideRuntimeApi};
30use sp_blockchain::{Backend as _, HeaderBackend};
31use sp_consensus::SyncOracle;
32use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Zero};
33// Frontier
34use fc_storage::StorageOverride;
35use fp_consensus::{FindLogError, Hashes, Log, PostLog, PreLog};
36use fp_rpc::EthereumRuntimeRPCApi;
37
38use crate::{EthereumBlockNotification, EthereumBlockNotificationSinks, SyncStrategy};
39
40pub fn sync_block<Block: BlockT, C: HeaderBackend<Block>>(
41	storage_override: Arc<dyn StorageOverride<Block>>,
42	backend: &fc_db::kv::Backend<Block, C>,
43	header: &Block::Header,
44) -> Result<(), String> {
45	let substrate_block_hash = header.hash();
46	match fp_consensus::find_log(header.digest()) {
47		Ok(log) => {
48			let gen_from_hashes = |hashes: Hashes| -> fc_db::kv::MappingCommitment<Block> {
49				fc_db::kv::MappingCommitment {
50					block_hash: substrate_block_hash,
51					ethereum_block_hash: hashes.block_hash,
52					ethereum_transaction_hashes: hashes.transaction_hashes,
53				}
54			};
55			let gen_from_block = |block| -> fc_db::kv::MappingCommitment<Block> {
56				let hashes = Hashes::from_block(block);
57				gen_from_hashes(hashes)
58			};
59
60			match log {
61				Log::Pre(PreLog::Block(block)) => {
62					let mapping_commitment = gen_from_block(block);
63					backend.mapping().write_hashes(mapping_commitment)
64				}
65				Log::Post(post_log) => match post_log {
66					PostLog::Hashes(hashes) => {
67						let mapping_commitment = gen_from_hashes(hashes);
68						backend.mapping().write_hashes(mapping_commitment)
69					}
70					PostLog::Block(block) => {
71						let mapping_commitment = gen_from_block(block);
72						backend.mapping().write_hashes(mapping_commitment)
73					}
74					PostLog::BlockHash(expect_eth_block_hash) => {
75						let ethereum_block = storage_override.current_block(substrate_block_hash);
76						match ethereum_block {
77							Some(block) => {
78								let got_eth_block_hash = block.header.hash();
79								if got_eth_block_hash != expect_eth_block_hash {
80									Err(format!(
81										"Ethereum block hash mismatch: \
82										frontier consensus digest ({expect_eth_block_hash:?}), \
83										db state ({got_eth_block_hash:?})"
84									))
85								} else {
86									let mapping_commitment = gen_from_block(block);
87									backend.mapping().write_hashes(mapping_commitment)
88								}
89							}
90							None => backend.mapping().write_none(substrate_block_hash),
91						}
92					}
93				},
94			}
95		}
96		Err(FindLogError::NotFound) => backend.mapping().write_none(substrate_block_hash),
97		Err(FindLogError::MultipleLogs) => Err("Multiple logs found".to_string()),
98	}
99}
100
101pub fn sync_genesis_block<Block: BlockT, C>(
102	client: &C,
103	backend: &fc_db::kv::Backend<Block, C>,
104	header: &Block::Header,
105) -> Result<(), String>
106where
107	C: HeaderBackend<Block> + ProvideRuntimeApi<Block>,
108	C::Api: EthereumRuntimeRPCApi<Block>,
109{
110	let substrate_block_hash = header.hash();
111
112	if let Some(api_version) = client
113		.runtime_api()
114		.api_version::<dyn EthereumRuntimeRPCApi<Block>>(substrate_block_hash)
115		.map_err(|e| format!("{e:?}"))?
116	{
117		let block = if api_version > 1 {
118			client
119				.runtime_api()
120				.current_block(substrate_block_hash)
121				.map_err(|e| format!("{e:?}"))?
122		} else {
123			#[allow(deprecated)]
124			let legacy_block = client
125				.runtime_api()
126				.current_block_before_version_2(substrate_block_hash)
127				.map_err(|e| format!("{e:?}"))?;
128			legacy_block.map(|block| block.into())
129		};
130		let block_hash = block
131			.ok_or_else(|| "Ethereum genesis block not found".to_string())?
132			.header
133			.hash();
134		let mapping_commitment = fc_db::kv::MappingCommitment::<Block> {
135			block_hash: substrate_block_hash,
136			ethereum_block_hash: block_hash,
137			ethereum_transaction_hashes: Vec::new(),
138		};
139		backend.mapping().write_hashes(mapping_commitment)?;
140	} else {
141		backend.mapping().write_none(substrate_block_hash)?;
142	};
143
144	Ok(())
145}
146
147pub fn sync_one_block<Block: BlockT, C, BE>(
148	client: &C,
149	substrate_backend: &BE,
150	storage_override: Arc<dyn StorageOverride<Block>>,
151	frontier_backend: &fc_db::kv::Backend<Block, C>,
152	sync_from: <Block::Header as HeaderT>::Number,
153	strategy: SyncStrategy,
154	sync_oracle: Arc<dyn SyncOracle + Send + Sync + 'static>,
155	pubsub_notification_sinks: Arc<
156		EthereumBlockNotificationSinks<EthereumBlockNotification<Block>>,
157	>,
158) -> Result<bool, String>
159where
160	C: ProvideRuntimeApi<Block>,
161	C::Api: EthereumRuntimeRPCApi<Block>,
162	C: HeaderBackend<Block> + StorageProvider<Block, BE>,
163	BE: Backend<Block>,
164{
165	let mut current_syncing_tips = frontier_backend.meta().current_syncing_tips()?;
166
167	if current_syncing_tips.is_empty() {
168		let mut leaves = substrate_backend
169			.blockchain()
170			.leaves()
171			.map_err(|e| format!("{e:?}"))?;
172		if leaves.is_empty() {
173			return Ok(false);
174		}
175		current_syncing_tips.append(&mut leaves);
176	}
177
178	let mut operating_header = None;
179	while let Some(checking_tip) = current_syncing_tips.pop() {
180		if let Some(checking_header) = fetch_header(
181			substrate_backend.blockchain(),
182			frontier_backend,
183			checking_tip,
184			sync_from,
185		)? {
186			operating_header = Some(checking_header);
187			break;
188		}
189	}
190	let operating_header = match operating_header {
191		Some(operating_header) => operating_header,
192		None => {
193			frontier_backend
194				.meta()
195				.write_current_syncing_tips(current_syncing_tips)?;
196			return Ok(false);
197		}
198	};
199
200	if operating_header.number() == &Zero::zero() {
201		sync_genesis_block(client, frontier_backend, &operating_header)?;
202
203		frontier_backend
204			.meta()
205			.write_current_syncing_tips(current_syncing_tips)?;
206	} else {
207		if SyncStrategy::Parachain == strategy
208			&& operating_header.number() > &client.info().best_number
209		{
210			return Ok(false);
211		}
212		sync_block(storage_override, frontier_backend, &operating_header)?;
213
214		current_syncing_tips.push(*operating_header.parent_hash());
215		frontier_backend
216			.meta()
217			.write_current_syncing_tips(current_syncing_tips)?;
218	}
219	// Notify on import and remove closed channels.
220	// Only notify when the node is node in major syncing.
221	let sinks = &mut pubsub_notification_sinks.lock();
222	sinks.retain(|sink| {
223		if !sync_oracle.is_major_syncing() {
224			let hash = operating_header.hash();
225			let is_new_best = client.info().best_hash == hash;
226			sink.unbounded_send(EthereumBlockNotification { is_new_best, hash })
227				.is_ok()
228		} else {
229			// Remove from the pool if in major syncing.
230			false
231		}
232	});
233	Ok(true)
234}
235
236pub fn sync_blocks<Block: BlockT, C, BE>(
237	client: &C,
238	substrate_backend: &BE,
239	storage_override: Arc<dyn StorageOverride<Block>>,
240	frontier_backend: &fc_db::kv::Backend<Block, C>,
241	limit: usize,
242	sync_from: <Block::Header as HeaderT>::Number,
243	strategy: SyncStrategy,
244	sync_oracle: Arc<dyn SyncOracle + Send + Sync + 'static>,
245	pubsub_notification_sinks: Arc<
246		EthereumBlockNotificationSinks<EthereumBlockNotification<Block>>,
247	>,
248) -> Result<bool, String>
249where
250	C: ProvideRuntimeApi<Block>,
251	C::Api: EthereumRuntimeRPCApi<Block>,
252	C: HeaderBackend<Block> + StorageProvider<Block, BE>,
253	BE: Backend<Block>,
254{
255	let mut synced_any = false;
256
257	for _ in 0..limit {
258		synced_any = synced_any
259			|| sync_one_block(
260				client,
261				substrate_backend,
262				storage_override.clone(),
263				frontier_backend,
264				sync_from,
265				strategy,
266				sync_oracle.clone(),
267				pubsub_notification_sinks.clone(),
268			)?;
269	}
270
271	Ok(synced_any)
272}
273
274pub fn fetch_header<Block: BlockT, C, BE>(
275	substrate_backend: &BE,
276	frontier_backend: &fc_db::kv::Backend<Block, C>,
277	checking_tip: Block::Hash,
278	sync_from: <Block::Header as HeaderT>::Number,
279) -> Result<Option<Block::Header>, String>
280where
281	C: HeaderBackend<Block>,
282	BE: HeaderBackend<Block>,
283{
284	if frontier_backend.mapping().is_synced(&checking_tip)? {
285		return Ok(None);
286	}
287
288	match substrate_backend.header(checking_tip) {
289		Ok(Some(checking_header)) if checking_header.number() >= &sync_from => {
290			Ok(Some(checking_header))
291		}
292		Ok(Some(_)) => Ok(None),
293		Ok(None) | Err(_) => Err("Header not found".to_string()),
294	}
295}