1use std::{
20 fmt, fs,
21 io::{self, ErrorKind, Read, Write},
22 path::{Path, PathBuf},
23 sync::Arc,
24};
25
26use scale_codec::{Decode, Encode};
27use sc_client_db::DatabaseSource;
29use sp_blockchain::HeaderBackend;
30use sp_core::H256;
31use sp_runtime::traits::Block as BlockT;
32
/// Name of the marker file, stored inside the database directory, that records
/// the on-disk schema version.
const VERSION_FILE_NAME: &str = "db_version";

/// Schema version written and expected by this build of the client.
const CURRENT_VERSION: u32 = 2;

/// Column count of the v1 schema (kept for reference; unused, hence the underscore).
const _V1_NUM_COLUMNS: u32 = 4;
/// Column count of the v2 schema (same layout; only the value encoding changed).
const V2_NUM_COLUMNS: u32 = 4;
42
/// Errors that can occur while checking or upgrading the database schema.
#[derive(Debug)]
pub(crate) enum UpgradeError {
	/// The `db_version` file exists but could not be opened, read, or parsed as `u32`.
	UnknownDatabaseVersion,
	/// The on-disk version is too old to be migrated (currently: version 0).
	UnsupportedVersion(u32),
	/// The on-disk version is newer than `CURRENT_VERSION` (written by a newer client).
	FutureDatabaseVersion(u32),
	/// Underlying filesystem or database I/O failure.
	Io(io::Error),
}
55
/// Convenience alias for the result of schema-upgrade operations.
pub(crate) type UpgradeResult<T> = Result<T, UpgradeError>;
57
/// Outcome of the v1 -> v2 migration.
pub(crate) struct UpgradeVersion1To2Summary {
	/// Number of `BLOCK_MAPPING` entries handled successfully.
	pub success: u32,
	/// Ethereum block hashes whose mapping could not be migrated.
	pub error: Vec<H256>,
}
62
63impl From<io::Error> for UpgradeError {
64 fn from(err: io::Error) -> Self {
65 UpgradeError::Io(err)
66 }
67}
68
69impl fmt::Display for UpgradeError {
70 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71 match self {
72 UpgradeError::UnknownDatabaseVersion => {
73 write!(
74 f,
75 "Database version cannot be read from existing db_version file"
76 )
77 }
78 UpgradeError::UnsupportedVersion(version) => {
79 write!(f, "Database version no longer supported: {version}")
80 }
81 UpgradeError::FutureDatabaseVersion(version) => {
82 write!(
83 f,
84 "Database version comes from future version of the client: {version}"
85 )
86 }
87 UpgradeError::Io(err) => write!(f, "Io error: {err}"),
88 }
89 }
90}
91
92pub(crate) fn upgrade_db<Block: BlockT, C: HeaderBackend<Block>>(
94 client: Arc<C>,
95 db_path: &Path,
96 source: &DatabaseSource,
97) -> UpgradeResult<()> {
98 let db_version = current_version(db_path)?;
99 match db_version {
100 0 => return Err(UpgradeError::UnsupportedVersion(db_version)),
101 1 => {
102 let summary: UpgradeVersion1To2Summary = match source {
103 DatabaseSource::ParityDb { .. } => {
104 migrate_1_to_2_parity_db::<Block, C>(client, db_path)?
105 }
106 #[cfg(feature = "rocksdb")]
107 DatabaseSource::RocksDb { .. } => migrate_1_to_2_rocks_db::<Block, C>(client, db_path)?,
108 _ => panic!("DatabaseSource required for upgrade ParityDb | RocksDb"),
109 };
110 if !summary.error.is_empty() {
111 panic!(
112 "Inconsistent migration from version 1 to 2. Failed on {:?}",
113 summary.error
114 );
115 } else {
116 log::info!("✔️ Successful Frontier DB migration from version 1 to version 2 ({:?} entries).", summary.success);
117 }
118 }
119 CURRENT_VERSION => (),
120 _ => return Err(UpgradeError::FutureDatabaseVersion(db_version)),
121 }
122 update_version(db_path)?;
123 Ok(())
124}
125
126pub(crate) fn current_version(path: &Path) -> UpgradeResult<u32> {
129 match fs::File::open(version_file_path(path)) {
130 Err(ref err) if err.kind() == ErrorKind::NotFound => {
131 fs::create_dir_all(path)?;
132 let mut file = fs::File::create(version_file_path(path))?;
133 file.write_all(format!("{CURRENT_VERSION}").as_bytes())?;
134 Ok(CURRENT_VERSION)
135 }
136 Err(_) => Err(UpgradeError::UnknownDatabaseVersion),
137 Ok(mut file) => {
138 let mut s = String::new();
139 file.read_to_string(&mut s)
140 .map_err(|_| UpgradeError::UnknownDatabaseVersion)?;
141 s.parse::<u32>()
142 .map_err(|_| UpgradeError::UnknownDatabaseVersion)
143 }
144 }
145}
146
147pub(crate) fn update_version(path: &Path) -> io::Result<()> {
150 fs::create_dir_all(path)?;
151 let mut file = fs::File::create(version_file_path(path))?;
152 file.write_all(format!("{CURRENT_VERSION}").as_bytes())?;
153 Ok(())
154}
155
156fn version_file_path(path: &Path) -> PathBuf {
158 let mut file_path = path.to_owned();
159 file_path.push(VERSION_FILE_NAME);
160 file_path
161}
162
/// Migrate a RocksDB-backed Frontier database from schema version 1 to 2.
///
/// In v1 a `BLOCK_MAPPING` value was a single bare substrate block hash; in
/// v2 it is a SCALE-encoded `Vec<Block::Hash>`. Each v1 entry is re-resolved
/// through the client to the canonical substrate hash and rewritten in the
/// new encoding. Keys that cannot be resolved are collected in the returned
/// summary's `error` list instead of aborting the whole migration.
#[cfg(feature = "rocksdb")]
pub(crate) fn migrate_1_to_2_rocks_db<Block: BlockT, C: HeaderBackend<Block>>(
	client: Arc<C>,
	db_path: &Path,
) -> UpgradeResult<UpgradeVersion1To2Summary> {
	log::info!("🔨 Running Frontier DB migration from version 1 to version 2. Please wait.");
	let mut res = UpgradeVersion1To2Summary {
		success: 0,
		error: vec![],
	};
	// Process one chunk of keys per RocksDB transaction so writes are committed
	// in bounded batches. The closure mutably captures `res` to accumulate the
	// per-chunk tallies.
	#[rustfmt::skip]
	let mut process_chunk = |
		db: &kvdb_rocksdb::Database,
		ethereum_hashes: &[smallvec::SmallVec<[u8; 32]>]
	| -> UpgradeResult<()> {
		let mut transaction = db.transaction();
		for ethereum_hash in ethereum_hashes {
			let mut maybe_error = true;
			if let Some(substrate_hash) = db.get(super::columns::BLOCK_MAPPING, ethereum_hash)? {
				// If the value already decodes as a non-empty Vec<Block::Hash>,
				// it is v2 data; only rewrite v1 (bare-hash) entries.
				let decoded = Vec::<Block::Hash>::decode(&mut &substrate_hash[..]);
				if decoded.is_err() || decoded.unwrap().is_empty() {
					// Re-decode the raw bytes as a bare hash and map it to the
					// canonical chain: number lookup, then hash-at-number.
					if let Ok(Some(number)) = client.number(Block::Hash::decode(&mut &substrate_hash[..]).unwrap()) {
						if let Ok(Some(hash)) = client.hash(number) {
							transaction.put_vec(
								super::columns::BLOCK_MAPPING,
								ethereum_hash,
								vec![hash].encode(),
							);
							res.success += 1;
							maybe_error = false;
						}
					}
				} else {
					// Already valid v2 data: count as success without rewriting.
					res.success += 1;
					maybe_error = false;
				}
			}
			if maybe_error {
				res.error.push(H256::from_slice(ethereum_hash));
			}
		}
		db.write(transaction)
			.map_err(|_| io::Error::other("Failed to commit on migrate_1_to_2"))?;
		log::debug!(
			target: "fc-db-upgrade",
			"🔨 Success {}, error {}.",
			res.success,
			res.error.len()
		);
		Ok(())
	};

	let db_cfg = kvdb_rocksdb::DatabaseConfig::with_columns(V2_NUM_COLUMNS);
	let db = kvdb_rocksdb::Database::open(&db_cfg, db_path)?;

	// Collect all ethereum-hash keys up front; iteration failures are skipped.
	let ethereum_hashes: Vec<_> = db
		.iter(super::columns::BLOCK_MAPPING)
		.filter_map(|entry| entry.map_or(None, |r| Some(r.0)))
		.collect();

	const CHUNK_SIZE: usize = 10_000;
	let chunks = ethereum_hashes.chunks(CHUNK_SIZE);
	let all_len = ethereum_hashes.len();
	for (i, chunk) in chunks.enumerate() {
		process_chunk(&db, chunk)?;
		// NOTE(review): this progress figure over-reports on the final partial
		// chunk (it assumes every chunk has CHUNK_SIZE entries) — debug-only.
		log::debug!(
			target: "fc-db-upgrade",
			"🔨 Processed {} of {} entries.",
			(CHUNK_SIZE * (i + 1)),
			all_len
		);
	}
	Ok(res)
}
247
248pub(crate) fn migrate_1_to_2_parity_db<Block: BlockT, C: HeaderBackend<Block>>(
249 client: Arc<C>,
250 db_path: &Path,
251) -> UpgradeResult<UpgradeVersion1To2Summary> {
252 log::info!("🔨 Running Frontier DB migration from version 1 to version 2. Please wait.");
253 let mut res = UpgradeVersion1To2Summary {
254 success: 0,
255 error: vec![],
256 };
257 #[rustfmt::skip]
259 let mut process_chunk = |
260 db: &parity_db::Db,
261 ethereum_hashes: &[Vec<u8>]
262 | -> UpgradeResult<()> {
263 let mut transaction = vec![];
264 for ethereum_hash in ethereum_hashes {
265 let mut maybe_error = true;
266 if let Some(substrate_hash) = db.get(super::columns::BLOCK_MAPPING as u8, ethereum_hash).map_err(|_|
267 io::Error::other("Key does not exist")
268 )? {
269 let decoded = Vec::<Block::Hash>::decode(&mut &substrate_hash[..]);
271 if decoded.is_err() || decoded.unwrap().is_empty() {
272 if let Ok(Some(number)) = client.number(Block::Hash::decode(&mut &substrate_hash[..]).unwrap()) {
274 if let Ok(Some(hash)) = client.hash(number) {
275 transaction.push((
276 super::columns::BLOCK_MAPPING as u8,
277 ethereum_hash,
278 Some(vec![hash].encode()),
279 ));
280 res.success += 1;
281 maybe_error = false;
282 }
283 }
284 }
285 }
286 if maybe_error {
287 res.error.push(H256::from_slice(ethereum_hash));
288 }
289 }
290 db.commit(transaction)
291 .map_err(|_| io::Error::other("Failed to commit on migrate_1_to_2"))?;
292 Ok(())
293 };
294
295 let mut db_cfg = parity_db::Options::with_columns(db_path, V2_NUM_COLUMNS as u8);
296 db_cfg.columns[super::columns::BLOCK_MAPPING as usize].btree_index = true;
297
298 let db = parity_db::Db::open_or_create(&db_cfg)
299 .map_err(|_| io::Error::other("Failed to open db"))?;
300
301 let ethereum_hashes: Vec<_> = match db.iter(super::columns::BLOCK_MAPPING as u8) {
303 Ok(mut iter) => {
304 let mut hashes = vec![];
305 while let Ok(Some((k, _))) = iter.next() {
306 hashes.push(k);
307 }
308 hashes
309 }
310 Err(_) => vec![],
311 };
312 const CHUNK_SIZE: usize = 10_000;
314 let chunks = ethereum_hashes.chunks(CHUNK_SIZE);
315 for chunk in chunks {
316 process_chunk(&db, chunk)?;
317 }
318 Ok(res)
319}
320
#[cfg(test)]
mod tests {
	use std::{
		io::{Read, Write},
		sync::Arc,
	};

	use futures::executor;
	use scale_codec::Encode;
	use tempfile::tempdir;
	use sc_block_builder::BlockBuilderBuilder;
	use sp_blockchain::HeaderBackend;
	use sp_consensus::BlockOrigin;
	use sp_core::H256;
	use sp_runtime::{
		generic::{Block, Header},
		traits::{BlakeTwo256, Block as BlockT, Header as HeaderT},
	};
	use substrate_test_runtime_client::{
		prelude::*, DefaultTestClientBuilderExt, TestClientBuilder,
	};

	// Concrete block type of the substrate test runtime.
	type OpaqueBlock =
		Block<Header<u64, BlakeTwo256>, substrate_test_runtime_client::runtime::Extrinsic>;

	/// Open (or create) a Frontier key-value backend for the given settings.
	pub fn open_frontier_backend<Block: BlockT, C: HeaderBackend<Block>>(
		client: Arc<C>,
		setting: &crate::kv::DatabaseSettings,
	) -> Result<Arc<crate::kv::Backend<Block, C>>, String> {
		Ok(Arc::new(crate::kv::Backend::<Block, C>::new(
			client, setting,
		)?))
	}

	#[cfg_attr(not(feature = "rocksdb"), ignore)]
	#[test]
	fn upgrade_1_to_2_works() {
		// One settings entry per backend under test; the RocksDb entry only
		// exists when the `rocksdb` feature is compiled in.
		let settings: Vec<crate::kv::DatabaseSettings> = vec![
			#[cfg(feature = "rocksdb")]
			crate::kv::DatabaseSettings {
				source: sc_client_db::DatabaseSource::RocksDb {
					path: tempdir()
						.expect("create a temporary directory")
						.path()
						.to_owned(),
					cache_size: 0,
				},
			},
			crate::kv::DatabaseSettings {
				source: sc_client_db::DatabaseSource::ParityDb {
					path: tempdir()
						.expect("create a temporary directory")
						.path()
						.to_owned(),
				},
			},
		];

		for setting in settings {
			let (client, _) = TestClientBuilder::new()
				.build_with_native_executor::<substrate_test_runtime_client::runtime::RuntimeApi, _>(
					None,
				);
			let client = Arc::new(client);

			// Produce and import one canonical child of genesis to build on.
			let chain_info = client.chain_info();
			let mut builder = BlockBuilderBuilder::new(&*client)
				.on_parent_block(chain_info.best_hash)
				.with_parent_block_number(chain_info.best_number)
				.build()
				.unwrap();
			builder.push_storage_change(vec![1], None).unwrap();
			let block = builder.build().unwrap().block;
			let mut previous_canon_block_hash = block.header.hash();
			let mut previous_canon_block_number = *block.header.number();
			executor::block_on(client.import(BlockOrigin::Own, block)).unwrap();

			let path = setting.source.path().unwrap();

			// Seed the DB with 50 v1-format entries, each deliberately mapped
			// to an orphan (non-canonical) substrate hash so the migration has
			// to re-resolve the canonical one through the client.
			let mut ethereum_hashes = vec![];
			let mut substrate_hashes = vec![];
			let mut transaction_hashes = vec![];
			{
				let backend = open_frontier_backend::<OpaqueBlock, _>(client.clone(), &setting)
					.expect("a temporary db was created");

				let mut transaction = sp_database::Transaction::new();
				for _ in 0..50 {
					let ethhash = H256::random();
					// Build two sibling blocks on the same parent: the first
					// imported becomes canonical...
					let mut builder = BlockBuilderBuilder::new(&*client)
						.on_parent_block(previous_canon_block_hash)
						.with_parent_block_number(previous_canon_block_number)
						.build()
						.unwrap();
					builder.push_storage_change(vec![1], None).unwrap();
					let block = builder.build().unwrap().block;
					let next_canon_block_hash = block.header.hash();
					let next_canon_block_number = *block.header.number();
					executor::block_on(client.import(BlockOrigin::Own, block)).unwrap();
					// ...the second (different storage change, so a different
					// hash) stays an orphan.
					let mut builder = BlockBuilderBuilder::new(&*client)
						.on_parent_block(previous_canon_block_hash)
						.with_parent_block_number(previous_canon_block_number)
						.build()
						.unwrap();
					builder.push_storage_change(vec![2], None).unwrap();
					let block = builder.build().unwrap().block;
					let orphan_block_hash = block.header.hash();
					executor::block_on(client.import(BlockOrigin::Own, block)).unwrap();

					// Remember the expected canonical mapping, but write the
					// orphan hash in v1 (bare hash) format.
					ethereum_hashes.push(ethhash);
					substrate_hashes.push(next_canon_block_hash);
					transaction.set(
						crate::kv::columns::BLOCK_MAPPING,
						&ethhash.encode(),
						&orphan_block_hash.encode(),
					);
					// Transaction metadata references both the canonical and the
					// orphan block; post-migration lookups must still surface the
					// canonical entry.
					let eth_tx_hash = H256::random();
					let mut metadata = vec![];
					for hash in [next_canon_block_hash, orphan_block_hash] {
						metadata.push(crate::kv::TransactionMetadata::<OpaqueBlock> {
							substrate_block_hash: hash,
							ethereum_block_hash: ethhash,
							ethereum_index: 0u32,
						});
					}
					transaction.set(
						crate::kv::columns::TRANSACTION_MAPPING,
						&eth_tx_hash.encode(),
						&metadata.encode(),
					);
					transaction_hashes.push(eth_tx_hash);
					previous_canon_block_hash = next_canon_block_hash;
					previous_canon_block_number = next_canon_block_number;
				}
				let _ = backend.mapping().db.commit(transaction);
			}

			// Stamp the DB as schema version 1 so `upgrade_db` runs the 1 -> 2
			// migration.
			std::fs::create_dir_all(path).expect("db path created");
			let mut version_path = path.to_owned();
			version_path.push("db_version");
			let mut version_file =
				std::fs::File::create(version_path).expect("db version file path created");
			version_file
				.write_all(format!("{}", 1).as_bytes())
				.expect("write version 1");

			// Run the migration under test.
			let _ = super::upgrade_db::<OpaqueBlock, _>(client.clone(), path, &setting.source);

			// Reopen and verify every mapping now points at exactly the
			// canonical substrate hash.
			let backend = open_frontier_backend::<OpaqueBlock, _>(client, &setting)
				.expect("a temporary db was created");
			for (i, original_ethereum_hash) in ethereum_hashes.iter().enumerate() {
				let canon_substrate_block_hash = substrate_hashes.get(i).expect("Block hash");
				let mapped_block = backend
					.mapping()
					.block_hash(original_ethereum_hash)
					.unwrap()
					.unwrap();
				// The v2 value is a one-element Vec holding the canonical hash.
				assert_eq!(mapped_block.len(), 1);
				assert_eq!(mapped_block.first(), Some(canon_substrate_block_hash));
				// Transaction metadata must still resolve via the canonical hash.
				let mapped_transaction = backend
					.mapping()
					.transaction_metadata(transaction_hashes.get(i).expect("Transaction hash"))
					.unwrap();
				assert!(mapped_transaction
					.into_iter()
					.any(|tx| tx.substrate_block_hash == *canon_substrate_block_hash));
			}

			// The version file must now record schema version 2.
			assert_eq!(super::current_version(path).expect("version"), 2u32);
		}
	}

	#[cfg(feature = "rocksdb")]
	#[test]
	fn create_db_with_current_version_works() {
		let tmp = tempdir().expect("create a temporary directory");

		let (client, _) = TestClientBuilder::new()
			.build_with_native_executor::<substrate_test_runtime_client::runtime::RuntimeApi, _>(
				None,
			);
		let client = Arc::new(client);

		let setting = crate::kv::DatabaseSettings {
			source: sc_client_db::DatabaseSource::RocksDb {
				path: tmp.path().to_owned(),
				cache_size: 0,
			},
		};
		let path = setting.source.path().unwrap();
		// A fresh database skips migration and is stamped with CURRENT_VERSION.
		let _ = super::upgrade_db::<OpaqueBlock, _>(client, path, &setting.source);

		let mut file =
			std::fs::File::open(crate::kv::upgrade::version_file_path(path)).expect("file exist");

		let mut s = String::new();
		file.read_to_string(&mut s).expect("read file contents");
		assert_eq!(s.parse::<u32>().expect("parse file contents"), 2u32);
	}
}