1#![allow(clippy::too_many_arguments)]
20
21mod worker;
22
23pub use worker::MappingSyncWorker;
24
25use std::sync::Arc;
26
27use sc_client_api::backend::{Backend, StorageProvider};
29use sp_api::{ApiExt, ProvideRuntimeApi};
30use sp_blockchain::{Backend as _, HeaderBackend};
31use sp_consensus::SyncOracle;
32use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Zero};
33use fc_storage::StorageOverride;
35use fp_consensus::{FindLogError, Hashes, Log, PostLog, PreLog};
36use fp_rpc::EthereumRuntimeRPCApi;
37
38use crate::{EthereumBlockNotification, EthereumBlockNotificationSinks, SyncStrategy};
39
40pub fn sync_block<Block: BlockT, C: HeaderBackend<Block>>(
41 storage_override: Arc<dyn StorageOverride<Block>>,
42 backend: &fc_db::kv::Backend<Block, C>,
43 header: &Block::Header,
44) -> Result<(), String> {
45 let substrate_block_hash = header.hash();
46 match fp_consensus::find_log(header.digest()) {
47 Ok(log) => {
48 let gen_from_hashes = |hashes: Hashes| -> fc_db::kv::MappingCommitment<Block> {
49 fc_db::kv::MappingCommitment {
50 block_hash: substrate_block_hash,
51 ethereum_block_hash: hashes.block_hash,
52 ethereum_transaction_hashes: hashes.transaction_hashes,
53 }
54 };
55 let gen_from_block = |block| -> fc_db::kv::MappingCommitment<Block> {
56 let hashes = Hashes::from_block(block);
57 gen_from_hashes(hashes)
58 };
59
60 match log {
61 Log::Pre(PreLog::Block(block)) => {
62 let mapping_commitment = gen_from_block(block);
63 backend.mapping().write_hashes(mapping_commitment)
64 }
65 Log::Post(post_log) => match post_log {
66 PostLog::Hashes(hashes) => {
67 let mapping_commitment = gen_from_hashes(hashes);
68 backend.mapping().write_hashes(mapping_commitment)
69 }
70 PostLog::Block(block) => {
71 let mapping_commitment = gen_from_block(block);
72 backend.mapping().write_hashes(mapping_commitment)
73 }
74 PostLog::BlockHash(expect_eth_block_hash) => {
75 let ethereum_block = storage_override.current_block(substrate_block_hash);
76 match ethereum_block {
77 Some(block) => {
78 let got_eth_block_hash = block.header.hash();
79 if got_eth_block_hash != expect_eth_block_hash {
80 Err(format!(
81 "Ethereum block hash mismatch: \
82 frontier consensus digest ({expect_eth_block_hash:?}), \
83 db state ({got_eth_block_hash:?})"
84 ))
85 } else {
86 let mapping_commitment = gen_from_block(block);
87 backend.mapping().write_hashes(mapping_commitment)
88 }
89 }
90 None => backend.mapping().write_none(substrate_block_hash),
91 }
92 }
93 },
94 }
95 }
96 Err(FindLogError::NotFound) => backend.mapping().write_none(substrate_block_hash),
97 Err(FindLogError::MultipleLogs) => Err("Multiple logs found".to_string()),
98 }
99}
100
101pub fn sync_genesis_block<Block: BlockT, C>(
102 client: &C,
103 backend: &fc_db::kv::Backend<Block, C>,
104 header: &Block::Header,
105) -> Result<(), String>
106where
107 C: HeaderBackend<Block> + ProvideRuntimeApi<Block>,
108 C::Api: EthereumRuntimeRPCApi<Block>,
109{
110 let substrate_block_hash = header.hash();
111
112 if let Some(api_version) = client
113 .runtime_api()
114 .api_version::<dyn EthereumRuntimeRPCApi<Block>>(substrate_block_hash)
115 .map_err(|e| format!("{e:?}"))?
116 {
117 let block = if api_version > 1 {
118 client
119 .runtime_api()
120 .current_block(substrate_block_hash)
121 .map_err(|e| format!("{e:?}"))?
122 } else {
123 #[allow(deprecated)]
124 let legacy_block = client
125 .runtime_api()
126 .current_block_before_version_2(substrate_block_hash)
127 .map_err(|e| format!("{e:?}"))?;
128 legacy_block.map(|block| block.into())
129 };
130 let block_hash = block
131 .ok_or_else(|| "Ethereum genesis block not found".to_string())?
132 .header
133 .hash();
134 let mapping_commitment = fc_db::kv::MappingCommitment::<Block> {
135 block_hash: substrate_block_hash,
136 ethereum_block_hash: block_hash,
137 ethereum_transaction_hashes: Vec::new(),
138 };
139 backend.mapping().write_hashes(mapping_commitment)?;
140 } else {
141 backend.mapping().write_none(substrate_block_hash)?;
142 };
143
144 Ok(())
145}
146
147pub fn sync_one_block<Block: BlockT, C, BE>(
148 client: &C,
149 substrate_backend: &BE,
150 storage_override: Arc<dyn StorageOverride<Block>>,
151 frontier_backend: &fc_db::kv::Backend<Block, C>,
152 sync_from: <Block::Header as HeaderT>::Number,
153 strategy: SyncStrategy,
154 sync_oracle: Arc<dyn SyncOracle + Send + Sync + 'static>,
155 pubsub_notification_sinks: Arc<
156 EthereumBlockNotificationSinks<EthereumBlockNotification<Block>>,
157 >,
158) -> Result<bool, String>
159where
160 C: ProvideRuntimeApi<Block>,
161 C::Api: EthereumRuntimeRPCApi<Block>,
162 C: HeaderBackend<Block> + StorageProvider<Block, BE>,
163 BE: Backend<Block>,
164{
165 let mut current_syncing_tips = frontier_backend.meta().current_syncing_tips()?;
166
167 if current_syncing_tips.is_empty() {
168 let mut leaves = substrate_backend
169 .blockchain()
170 .leaves()
171 .map_err(|e| format!("{e:?}"))?;
172 if leaves.is_empty() {
173 return Ok(false);
174 }
175 current_syncing_tips.append(&mut leaves);
176 }
177
178 let mut operating_header = None;
179 while let Some(checking_tip) = current_syncing_tips.pop() {
180 if let Some(checking_header) = fetch_header(
181 substrate_backend.blockchain(),
182 frontier_backend,
183 checking_tip,
184 sync_from,
185 )? {
186 operating_header = Some(checking_header);
187 break;
188 }
189 }
190 let operating_header = match operating_header {
191 Some(operating_header) => operating_header,
192 None => {
193 frontier_backend
194 .meta()
195 .write_current_syncing_tips(current_syncing_tips)?;
196 return Ok(false);
197 }
198 };
199
200 if operating_header.number() == &Zero::zero() {
201 sync_genesis_block(client, frontier_backend, &operating_header)?;
202
203 frontier_backend
204 .meta()
205 .write_current_syncing_tips(current_syncing_tips)?;
206 } else {
207 if SyncStrategy::Parachain == strategy
208 && operating_header.number() > &client.info().best_number
209 {
210 return Ok(false);
211 }
212 sync_block(storage_override, frontier_backend, &operating_header)?;
213
214 current_syncing_tips.push(*operating_header.parent_hash());
215 frontier_backend
216 .meta()
217 .write_current_syncing_tips(current_syncing_tips)?;
218 }
219 let sinks = &mut pubsub_notification_sinks.lock();
222 sinks.retain(|sink| {
223 if !sync_oracle.is_major_syncing() {
224 let hash = operating_header.hash();
225 let is_new_best = client.info().best_hash == hash;
226 sink.unbounded_send(EthereumBlockNotification { is_new_best, hash })
227 .is_ok()
228 } else {
229 false
231 }
232 });
233 Ok(true)
234}
235
236pub fn sync_blocks<Block: BlockT, C, BE>(
237 client: &C,
238 substrate_backend: &BE,
239 storage_override: Arc<dyn StorageOverride<Block>>,
240 frontier_backend: &fc_db::kv::Backend<Block, C>,
241 limit: usize,
242 sync_from: <Block::Header as HeaderT>::Number,
243 strategy: SyncStrategy,
244 sync_oracle: Arc<dyn SyncOracle + Send + Sync + 'static>,
245 pubsub_notification_sinks: Arc<
246 EthereumBlockNotificationSinks<EthereumBlockNotification<Block>>,
247 >,
248) -> Result<bool, String>
249where
250 C: ProvideRuntimeApi<Block>,
251 C::Api: EthereumRuntimeRPCApi<Block>,
252 C: HeaderBackend<Block> + StorageProvider<Block, BE>,
253 BE: Backend<Block>,
254{
255 let mut synced_any = false;
256
257 for _ in 0..limit {
258 synced_any = synced_any
259 || sync_one_block(
260 client,
261 substrate_backend,
262 storage_override.clone(),
263 frontier_backend,
264 sync_from,
265 strategy,
266 sync_oracle.clone(),
267 pubsub_notification_sinks.clone(),
268 )?;
269 }
270
271 Ok(synced_any)
272}
273
274pub fn fetch_header<Block: BlockT, C, BE>(
275 substrate_backend: &BE,
276 frontier_backend: &fc_db::kv::Backend<Block, C>,
277 checking_tip: Block::Hash,
278 sync_from: <Block::Header as HeaderT>::Number,
279) -> Result<Option<Block::Header>, String>
280where
281 C: HeaderBackend<Block>,
282 BE: HeaderBackend<Block>,
283{
284 if frontier_backend.mapping().is_synced(&checking_tip)? {
285 return Ok(None);
286 }
287
288 match substrate_backend.header(checking_tip) {
289 Ok(Some(checking_header)) if checking_header.number() >= &sync_from => {
290 Ok(Some(checking_header))
291 }
292 Ok(Some(_)) => Ok(None),
293 Ok(None) | Err(_) => Err("Header not found".to_string()),
294 }
295}