fc_db/kv/
upgrade.rs

1// This file is part of Frontier.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use std::{
20	fmt, fs,
21	io::{self, ErrorKind, Read, Write},
22	path::{Path, PathBuf},
23	sync::Arc,
24};
25
26use scale_codec::{Decode, Encode};
27// Substrate
28use sc_client_db::DatabaseSource;
29use sp_blockchain::HeaderBackend;
30use sp_core::H256;
31use sp_runtime::traits::Block as BlockT;
32
/// Version file name.
const VERSION_FILE_NAME: &str = "db_version";

/// Current db version.
const CURRENT_VERSION: u32 = 2;

/// Number of columns in each version.
// The v1 count is kept for reference only; the leading underscore marks it as
// intentionally unused (both versions use the same number of columns).
const _V1_NUM_COLUMNS: u32 = 4;
const V2_NUM_COLUMNS: u32 = 4;
42
/// Database upgrade errors.
#[derive(Debug)]
pub(crate) enum UpgradeError {
	/// Database version cannot be read from existing db_version file.
	UnknownDatabaseVersion,
	/// Database version no longer supported.
	UnsupportedVersion(u32),
	/// Database version comes from future version of the client.
	FutureDatabaseVersion(u32),
	/// Common io error.
	Io(io::Error),
}

/// Convenience alias for fallible database-upgrade operations.
pub(crate) type UpgradeResult<T> = Result<T, UpgradeError>;
57
/// Outcome of the version 1 -> 2 migration.
pub(crate) struct UpgradeVersion1To2Summary {
	// Number of block-mapping entries successfully migrated (or found
	// already in the version 2 format).
	pub success: u32,
	// Ethereum block hashes whose mapping could not be migrated.
	pub error: Vec<H256>,
}
62
63impl From<io::Error> for UpgradeError {
64	fn from(err: io::Error) -> Self {
65		UpgradeError::Io(err)
66	}
67}
68
69impl fmt::Display for UpgradeError {
70	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71		match self {
72			UpgradeError::UnknownDatabaseVersion => {
73				write!(
74					f,
75					"Database version cannot be read from existing db_version file"
76				)
77			}
78			UpgradeError::UnsupportedVersion(version) => {
79				write!(f, "Database version no longer supported: {version}")
80			}
81			UpgradeError::FutureDatabaseVersion(version) => {
82				write!(
83					f,
84					"Database version comes from future version of the client: {version}"
85				)
86			}
87			UpgradeError::Io(err) => write!(f, "Io error: {err}"),
88		}
89	}
90}
91
/// Upgrade database to current version.
///
/// Reads the version recorded on disk, runs the required migration (if any)
/// against the Frontier database at `db_path`, and finally rewrites the
/// version file with `CURRENT_VERSION` via [`update_version`].
///
/// # Errors
/// - [`UpgradeError::UnsupportedVersion`] for version 0 databases.
/// - [`UpgradeError::FutureDatabaseVersion`] when the on-disk version is
///   newer than this client supports.
///
/// # Panics
/// Panics if `source` is neither ParityDb nor RocksDb, or if the 1 -> 2
/// migration reports any failed entries.
pub(crate) fn upgrade_db<Block: BlockT, C: HeaderBackend<Block>>(
	client: Arc<C>,
	db_path: &Path,
	source: &DatabaseSource,
) -> UpgradeResult<()> {
	let db_version = current_version(db_path)?;
	match db_version {
		0 => return Err(UpgradeError::UnsupportedVersion(db_version)),
		1 => {
			// Dispatch to the backend-specific 1 -> 2 migration.
			let summary: UpgradeVersion1To2Summary = match source {
				DatabaseSource::ParityDb { .. } => {
					migrate_1_to_2_parity_db::<Block, C>(client, db_path)?
				}
				#[cfg(feature = "rocksdb")]
				DatabaseSource::RocksDb { .. } => migrate_1_to_2_rocks_db::<Block, C>(client, db_path)?,
				_ => panic!("DatabaseSource required for upgrade ParityDb | RocksDb"),
			};
			if !summary.error.is_empty() {
				// A partial migration would leave the block mapping
				// inconsistent, so abort loudly rather than continue.
				panic!(
					"Inconsistent migration from version 1 to 2. Failed on {:?}",
					summary.error
				);
			} else {
				log::info!("✔️ Successful Frontier DB migration from version 1 to version 2 ({:?} entries).", summary.success);
			}
		}
		// Already up to date: nothing to migrate.
		CURRENT_VERSION => (),
		_ => return Err(UpgradeError::FutureDatabaseVersion(db_version)),
	}
	// Record the new version only after any migration has completed.
	update_version(db_path)?;
	Ok(())
}
125
/// Reads current database version from the file at given path.
/// If the file does not exist it gets created with the current version
/// (`CURRENT_VERSION`, i.e. 2 — a brand-new database needs no migration,
/// so it is stamped with the latest version, not with version 1).
pub(crate) fn current_version(path: &Path) -> UpgradeResult<u32> {
	match fs::File::open(version_file_path(path)) {
		// No version file yet: create the directory and stamp it with the
		// current version.
		Err(ref err) if err.kind() == ErrorKind::NotFound => {
			fs::create_dir_all(path)?;
			let mut file = fs::File::create(version_file_path(path))?;
			file.write_all(format!("{CURRENT_VERSION}").as_bytes())?;
			Ok(CURRENT_VERSION)
		}
		// Any other open failure means the version cannot be trusted.
		Err(_) => Err(UpgradeError::UnknownDatabaseVersion),
		// File exists: its entire contents must parse as a u32 version.
		Ok(mut file) => {
			let mut s = String::new();
			file.read_to_string(&mut s)
				.map_err(|_| UpgradeError::UnknownDatabaseVersion)?;
			s.parse::<u32>()
				.map_err(|_| UpgradeError::UnknownDatabaseVersion)
		}
	}
}
146
147/// Writes current database version to the file.
148/// Creates a new file if the version file does not exist yet.
149pub(crate) fn update_version(path: &Path) -> io::Result<()> {
150	fs::create_dir_all(path)?;
151	let mut file = fs::File::create(version_file_path(path))?;
152	file.write_all(format!("{CURRENT_VERSION}").as_bytes())?;
153	Ok(())
154}
155
156/// Returns the version file path.
157fn version_file_path(path: &Path) -> PathBuf {
158	let mut file_path = path.to_owned();
159	file_path.push(VERSION_FILE_NAME);
160	file_path
161}
162
/// Migration from version1 to version2:
/// - The format of the Ethereum<>Substrate block mapping changed to support equivocation.
/// - Migrating schema from One-to-one to One-to-many (EthHash: Vec<SubstrateHash>) relationship.
///
/// Returns a summary of how many entries were migrated and which Ethereum
/// block hashes failed. The migration is idempotent: entries already in the
/// version 2 format are counted as successes and left untouched.
#[cfg(feature = "rocksdb")]
pub(crate) fn migrate_1_to_2_rocks_db<Block: BlockT, C: HeaderBackend<Block>>(
	client: Arc<C>,
	db_path: &Path,
) -> UpgradeResult<UpgradeVersion1To2Summary> {
	log::info!("🔨 Running Frontier DB migration from version 1 to version 2. Please wait.");
	let mut res = UpgradeVersion1To2Summary {
		success: 0,
		error: vec![],
	};
	// Process a batch of hashes in a single db transaction
	#[rustfmt::skip]
	let mut process_chunk = |
		db: &kvdb_rocksdb::Database,
		ethereum_hashes: &[smallvec::SmallVec<[u8; 32]>]
	| -> UpgradeResult<()> {
		let mut transaction = db.transaction();
		for ethereum_hash in ethereum_hashes {
			// Assume failure until the entry is rewritten or found to be
			// already migrated.
			let mut maybe_error = true;
			if let Some(substrate_hash) = db.get(super::columns::BLOCK_MAPPING, ethereum_hash)? {
				// Only update version1 data
				let decoded = Vec::<Block::Hash>::decode(&mut &substrate_hash[..]);
				if decoded.is_err() || decoded.unwrap().is_empty() {
					// Verify the substrate hash is part of the canonical chain.
					if let Ok(Some(number)) = client.number(Block::Hash::decode(&mut &substrate_hash[..]).unwrap()) {
						if let Ok(Some(hash)) = client.hash(number) {
							// Rewrite the mapping as a single-element Vec —
							// the version 2 one-to-many format.
							transaction.put_vec(
								super::columns::BLOCK_MAPPING,
								ethereum_hash,
								vec![hash].encode(),
							);
							res.success += 1;
							maybe_error = false;
						}
					}
				} else {
					// If version 2 data, we just consider this hash a success.
					// This can happen if the process was closed in the middle of the migration.
					res.success += 1;
					maybe_error = false;
				}
			}
			if maybe_error {
				res.error.push(H256::from_slice(ethereum_hash));
			}
		}
		db.write(transaction)
			.map_err(|_| io::Error::other("Failed to commit on migrate_1_to_2"))?;
		log::debug!(
			target: "fc-db-upgrade",
			"🔨 Success {}, error {}.",
			res.success,
			res.error.len()
		);
		Ok(())
	};

	let db_cfg = kvdb_rocksdb::DatabaseConfig::with_columns(V2_NUM_COLUMNS);
	let db = kvdb_rocksdb::Database::open(&db_cfg, db_path)?;

	// Get all the block hashes we need to update
	let ethereum_hashes: Vec<_> = db
		.iter(super::columns::BLOCK_MAPPING)
		.filter_map(|entry| entry.map_or(None, |r| Some(r.0)))
		.collect();

	// Read and update each entry in db transaction batches
	const CHUNK_SIZE: usize = 10_000;
	let chunks = ethereum_hashes.chunks(CHUNK_SIZE);
	let all_len = ethereum_hashes.len();
	for (i, chunk) in chunks.enumerate() {
		process_chunk(&db, chunk)?;
		// NOTE: the reported count can overshoot `all_len` on the final,
		// partial chunk; it is a progress estimate only.
		log::debug!(
			target: "fc-db-upgrade",
			"🔨 Processed {} of {} entries.",
			(CHUNK_SIZE * (i + 1)),
			all_len
		);
	}
	Ok(res)
}
247
248pub(crate) fn migrate_1_to_2_parity_db<Block: BlockT, C: HeaderBackend<Block>>(
249	client: Arc<C>,
250	db_path: &Path,
251) -> UpgradeResult<UpgradeVersion1To2Summary> {
252	log::info!("🔨 Running Frontier DB migration from version 1 to version 2. Please wait.");
253	let mut res = UpgradeVersion1To2Summary {
254		success: 0,
255		error: vec![],
256	};
257	// Process a batch of hashes in a single db transaction
258	#[rustfmt::skip]
259	let mut process_chunk = |
260		db: &parity_db::Db,
261		ethereum_hashes: &[Vec<u8>]
262	| -> UpgradeResult<()> {
263		let mut transaction = vec![];
264		for ethereum_hash in ethereum_hashes {
265			let mut maybe_error = true;
266			if let Some(substrate_hash) = db.get(super::columns::BLOCK_MAPPING as u8, ethereum_hash).map_err(|_|
267				io::Error::other("Key does not exist")
268			)? {
269				// Only update version1 data
270				let decoded = Vec::<Block::Hash>::decode(&mut &substrate_hash[..]);
271				if decoded.is_err() || decoded.unwrap().is_empty() {
272					// Verify the substrate hash is part of the canonical chain.
273					if let Ok(Some(number)) = client.number(Block::Hash::decode(&mut &substrate_hash[..]).unwrap()) {
274						if let Ok(Some(hash)) = client.hash(number) {
275							transaction.push((
276								super::columns::BLOCK_MAPPING as u8,
277								ethereum_hash,
278								Some(vec![hash].encode()),
279							));
280							res.success += 1;
281							maybe_error = false;
282						}
283					}
284				}
285			}
286			if maybe_error {
287				res.error.push(H256::from_slice(ethereum_hash));
288			}
289		}
290		db.commit(transaction)
291			.map_err(|_| io::Error::other("Failed to commit on migrate_1_to_2"))?;
292		Ok(())
293	};
294
295	let mut db_cfg = parity_db::Options::with_columns(db_path, V2_NUM_COLUMNS as u8);
296	db_cfg.columns[super::columns::BLOCK_MAPPING as usize].btree_index = true;
297
298	let db = parity_db::Db::open_or_create(&db_cfg)
299		.map_err(|_| io::Error::other("Failed to open db"))?;
300
301	// Get all the block hashes we need to update
302	let ethereum_hashes: Vec<_> = match db.iter(super::columns::BLOCK_MAPPING as u8) {
303		Ok(mut iter) => {
304			let mut hashes = vec![];
305			while let Ok(Some((k, _))) = iter.next() {
306				hashes.push(k);
307			}
308			hashes
309		}
310		Err(_) => vec![],
311	};
312	// Read and update each entry in db transaction batches
313	const CHUNK_SIZE: usize = 10_000;
314	let chunks = ethereum_hashes.chunks(CHUNK_SIZE);
315	for chunk in chunks {
316		process_chunk(&db, chunk)?;
317	}
318	Ok(res)
319}
320
321#[cfg(test)]
322mod tests {
323	use std::{
324		io::{Read, Write},
325		sync::Arc,
326	};
327
328	use futures::executor;
329	use scale_codec::Encode;
330	use tempfile::tempdir;
331	// Substrate
332	use sc_block_builder::BlockBuilderBuilder;
333	use sp_blockchain::HeaderBackend;
334	use sp_consensus::BlockOrigin;
335	use sp_core::H256;
336	use sp_runtime::{
337		generic::{Block, Header},
338		traits::{BlakeTwo256, Block as BlockT, Header as HeaderT},
339	};
340	use substrate_test_runtime_client::{
341		prelude::*, DefaultTestClientBuilderExt, TestClientBuilder,
342	};
343
	// Concrete test block type: u64 block numbers, BlakeTwo256 hashing, and
	// the test runtime's extrinsic type.
	type OpaqueBlock =
		Block<Header<u64, BlakeTwo256>, substrate_test_runtime_client::runtime::Extrinsic>;
346
	/// Test helper: open a Frontier KV backend over the given database
	/// settings, wrapped in an `Arc` for sharing across the test.
	pub fn open_frontier_backend<Block: BlockT, C: HeaderBackend<Block>>(
		client: Arc<C>,
		setting: &crate::kv::DatabaseSettings,
	) -> Result<Arc<crate::kv::Backend<Block, C>>, String> {
		Ok(Arc::new(crate::kv::Backend::<Block, C>::new(
			client, setting,
		)?))
	}
355
	/// End-to-end check of the 1 -> 2 migration for both backends: fills a
	/// version 1 database with orphan-branch block mappings, runs
	/// `upgrade_db`, and verifies every mapping was rewritten to a
	/// single-element Vec holding the canonical block hash.
	#[cfg_attr(not(feature = "rocksdb"), ignore)]
	#[test]
	fn upgrade_1_to_2_works() {
		let settings: Vec<crate::kv::DatabaseSettings> = vec![
			// Rocks db
			#[cfg(feature = "rocksdb")]
			crate::kv::DatabaseSettings {
				source: sc_client_db::DatabaseSource::RocksDb {
					path: tempdir()
						.expect("create a temporary directory")
						.path()
						.to_owned(),
					cache_size: 0,
				},
			},
			// Parity db
			crate::kv::DatabaseSettings {
				source: sc_client_db::DatabaseSource::ParityDb {
					path: tempdir()
						.expect("create a temporary directory")
						.path()
						.to_owned(),
				},
			},
		];

		// Run the identical scenario against each backend.
		for setting in settings {
			let (client, _) = TestClientBuilder::new()
				.build_with_native_executor::<substrate_test_runtime_client::runtime::RuntimeApi, _>(
				None,
			);
			let client = Arc::new(client);

			// Genesis block
			let chain_info = client.chain_info();
			let mut builder = BlockBuilderBuilder::new(&*client)
				.on_parent_block(chain_info.best_hash)
				.with_parent_block_number(chain_info.best_number)
				.build()
				.unwrap();
			builder.push_storage_change(vec![1], None).unwrap();
			let block = builder.build().unwrap().block;
			let mut previous_canon_block_hash = block.header.hash();
			let mut previous_canon_block_number = *block.header.number();
			executor::block_on(client.import(BlockOrigin::Own, block)).unwrap();

			let path = setting.source.path().unwrap();

			let mut ethereum_hashes = vec![];
			let mut substrate_hashes = vec![];
			let mut transaction_hashes = vec![];
			{
				// Create a temporary frontier secondary DB.
				let backend = open_frontier_backend::<OpaqueBlock, _>(client.clone(), &setting)
					.expect("a temporary db was created");

				// Fill the tmp db with some data
				let mut transaction = sp_database::Transaction::new();
				for _ in 0..50 {
					// Ethereum hash
					let ethhash = H256::random();
					// Create two branches, and map the orphan one.
					// Keep track of the canon hash to later verify the migration replaced it.
					// A1
					let mut builder = BlockBuilderBuilder::new(&*client)
						.on_parent_block(previous_canon_block_hash)
						.with_parent_block_number(previous_canon_block_number)
						.build()
						.unwrap();
					builder.push_storage_change(vec![1], None).unwrap();
					let block = builder.build().unwrap().block;
					let next_canon_block_hash = block.header.hash();
					let next_canon_block_number = *block.header.number();
					executor::block_on(client.import(BlockOrigin::Own, block)).unwrap();
					// A2
					let mut builder = BlockBuilderBuilder::new(&*client)
						.on_parent_block(previous_canon_block_hash)
						.with_parent_block_number(previous_canon_block_number)
						.build()
						.unwrap();
					builder.push_storage_change(vec![2], None).unwrap();
					let block = builder.build().unwrap().block;
					let orphan_block_hash = block.header.hash();
					executor::block_on(client.import(BlockOrigin::Own, block)).unwrap();

					// Track canon hash
					ethereum_hashes.push(ethhash);
					substrate_hashes.push(next_canon_block_hash);
					// Set orphan hash block mapping
					transaction.set(
						crate::kv::columns::BLOCK_MAPPING,
						&ethhash.encode(),
						&orphan_block_hash.encode(),
					);
					// Test also that one-to-many transaction data is not affected by the migration logic.
					// Map a transaction to both canon and orphan block hashes. This is what would have
					// happened in case of fork or equivocation.
					let eth_tx_hash = H256::random();
					let mut metadata = vec![];
					for hash in [next_canon_block_hash, orphan_block_hash] {
						metadata.push(crate::kv::TransactionMetadata::<OpaqueBlock> {
							substrate_block_hash: hash,
							ethereum_block_hash: ethhash,
							ethereum_index: 0u32,
						});
					}
					transaction.set(
						crate::kv::columns::TRANSACTION_MAPPING,
						&eth_tx_hash.encode(),
						&metadata.encode(),
					);
					transaction_hashes.push(eth_tx_hash);
					previous_canon_block_hash = next_canon_block_hash;
					previous_canon_block_number = next_canon_block_number;
				}
				let _ = backend.mapping().db.commit(transaction);
			}

			// Writes version 1 to file.
			std::fs::create_dir_all(path).expect("db path created");
			let mut version_path = path.to_owned();
			version_path.push("db_version");
			let mut version_file =
				std::fs::File::create(version_path).expect("db version file path created");
			version_file
				.write_all(format!("{}", 1).as_bytes())
				.expect("write version 1");

			// Upgrade database from version 1 to 2
			let _ = super::upgrade_db::<OpaqueBlock, _>(client.clone(), path, &setting.source);

			// Check data after migration
			let backend = open_frontier_backend::<OpaqueBlock, _>(client, &setting)
				.expect("a temporary db was created");
			for (i, original_ethereum_hash) in ethereum_hashes.iter().enumerate() {
				let canon_substrate_block_hash = substrate_hashes.get(i).expect("Block hash");
				let mapped_block = backend
					.mapping()
					.block_hash(original_ethereum_hash)
					.unwrap()
					.unwrap();
				// All entries now hold a single element Vec
				assert_eq!(mapped_block.len(), 1);
				// The Vec holds the canon block hash
				assert_eq!(mapped_block.first(), Some(canon_substrate_block_hash));
				// Transaction hash still holds canon block data
				let mapped_transaction = backend
					.mapping()
					.transaction_metadata(transaction_hashes.get(i).expect("Transaction hash"))
					.unwrap();
				assert!(mapped_transaction
					.into_iter()
					.any(|tx| tx.substrate_block_hash == *canon_substrate_block_hash));
			}

			// Upgrade db version file
			assert_eq!(super::current_version(path).expect("version"), 2u32);
		}
	}
515
	/// Verifies that running `upgrade_db` against a fresh (nonexistent)
	/// database stamps the version file directly with the current version.
	#[cfg(feature = "rocksdb")]
	#[test]
	fn create_db_with_current_version_works() {
		let tmp = tempdir().expect("create a temporary directory");

		let (client, _) = TestClientBuilder::new()
			.build_with_native_executor::<substrate_test_runtime_client::runtime::RuntimeApi, _>(
			None,
		);
		let client = Arc::new(client);

		let setting = crate::kv::DatabaseSettings {
			source: sc_client_db::DatabaseSource::RocksDb {
				path: tmp.path().to_owned(),
				cache_size: 0,
			},
		};
		let path = setting.source.path().unwrap();
		// No migration should run; the version file is created as-current.
		let _ = super::upgrade_db::<OpaqueBlock, _>(client, path, &setting.source);

		let mut file =
			std::fs::File::open(crate::kv::upgrade::version_file_path(path)).expect("file exist");

		// The file contents must parse to the current version (2).
		let mut s = String::new();
		file.read_to_string(&mut s).expect("read file contents");
		assert_eq!(s.parse::<u32>().expect("parse file contents"), 2u32);
	}
543}