fc_db/sql/
mod.rs

1// This file is part of Frontier.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use std::{cmp::Ordering, collections::HashSet, num::NonZeroU32, str::FromStr, sync::Arc};
20
21use futures::TryStreamExt;
22use scale_codec::{Decode, Encode};
23use sqlx::{
24	query::Query,
25	sqlite::{
26		SqliteArguments, SqliteConnectOptions, SqlitePool, SqlitePoolOptions, SqliteQueryResult,
27	},
28	ConnectOptions, Error, Execute, QueryBuilder, Row, Sqlite,
29};
30// Substrate
31use sc_client_api::backend::{Backend as BackendT, StorageProvider};
32use sp_api::{ApiExt, ProvideRuntimeApi};
33use sp_blockchain::HeaderBackend;
34use sp_core::{H160, H256};
35use sp_runtime::{
36	generic::BlockId,
37	traits::{Block as BlockT, Header as HeaderT, UniqueSaturatedInto, Zero},
38};
39// Frontier
40use fc_api::{FilteredLog, TransactionMetadata};
41use fc_storage::{StorageOverride, StorageQuerier};
42use fp_consensus::{FindLogError, Hashes, Log as ConsensusLog, PostLog, PreLog};
43use fp_rpc::EthereumRuntimeRPCApi;
44use fp_storage::EthereumStorageSchema;
45
/// Represents a log item.
///
/// Mirrors one row of the `logs` table. All hashes and addresses are stored as raw bytes.
#[derive(Debug, Eq, PartialEq)]
pub struct Log {
	/// Raw bytes of the address that emitted the log.
	pub address: Vec<u8>,
	/// Raw bytes of the first topic, `None` when the log has no topic at that position.
	pub topic_1: Option<Vec<u8>>,
	/// Raw bytes of the second topic, `None` when absent.
	pub topic_2: Option<Vec<u8>>,
	/// Raw bytes of the third topic, `None` when absent.
	pub topic_3: Option<Vec<u8>>,
	/// Raw bytes of the fourth topic, `None` when absent.
	pub topic_4: Option<Vec<u8>>,
	/// Position of the log within the logs of its receipt.
	pub log_index: i32,
	/// Position of the receipt the log belongs to within the block's receipts.
	pub transaction_index: i32,
	/// Raw bytes of the Substrate block hash the log was read from.
	pub substrate_block_hash: Vec<u8>,
}
58
/// Represents the block metadata.
///
/// Produced by `insert_block_metadata_inner` and written to the `blocks` and
/// `transactions` tables by `insert_block_metadata`.
#[derive(Eq, PartialEq)]
struct BlockMetadata {
	/// Hash of the Substrate block the metadata was extracted from.
	pub substrate_block_hash: H256,
	/// Number taken from the Substrate header, narrowed to `i32` for storage.
	pub block_number: i32,
	/// Ethereum block hash and transaction hashes recovered from the consensus digest.
	pub post_hashes: Hashes,
	/// Ethereum storage schema in effect at this block.
	pub schema: EthereumStorageSchema,
	/// `1` when the block was the client's canonical block for its number, `0` otherwise.
	pub is_canon: i32,
}
68
/// Represents the Sqlite connection options that are
/// used to establish a database connection.
#[derive(Debug)]
pub struct SqliteBackendConfig<'a> {
	/// Connection string, parsed with `SqliteConnectOptions::from_str`.
	pub path: &'a str,
	/// Whether the database file is created when it does not exist.
	pub create_if_missing: bool,
	/// Value passed to the `threads` pragma.
	pub thread_count: u32,
	/// Magnitude passed, negated, to the `cache_size` pragma.
	pub cache_size: u64,
}
78
/// Represents the indexed status of a block and if it's canon or not.
///
/// The `Default` value (`indexed: false`, `canon: false`) is what lookups return
/// when the block is unknown or the query fails.
#[derive(Debug, Default)]
pub struct BlockIndexedStatus {
	/// `true` when the block has a `sync_status` row joined to a `blocks` row.
	pub indexed: bool,
	/// `true` when the stored `is_canon` flag of the block is non-zero.
	pub canon: bool,
}
85
/// Represents the backend configurations.
#[derive(Debug)]
pub enum BackendConfig<'a> {
	/// Configuration for a Sqlite-backed index.
	Sqlite(SqliteBackendConfig<'a>),
}
91
/// SQL-backed Frontier index: a Sqlite connection pool plus the storage override
/// used to read receipts and Ethereum blocks from Substrate state.
#[derive(Clone)]
pub struct Backend<Block> {
	/// The Sqlite connection.
	pool: SqlitePool,
	/// The additional overrides for the logs handler.
	storage_override: Arc<dyn StorageOverride<Block>>,

	/// The number of allowed operations for the Sqlite filter call.
	/// A value of `0` disables the timeout.
	num_ops_timeout: i32,
}
103
104impl<Block> Backend<Block>
105where
106	Block: BlockT<Hash = H256>,
107{
108	/// Creates a new instance of the SQL backend.
109	pub async fn new(
110		config: BackendConfig<'_>,
111		pool_size: u32,
112		num_ops_timeout: Option<NonZeroU32>,
113		storage_override: Arc<dyn StorageOverride<Block>>,
114	) -> Result<Self, Error> {
115		let any_pool = SqlitePoolOptions::new()
116			.max_connections(pool_size)
117			.connect_lazy_with(Self::connect_options(&config)?.disable_statement_logging());
118		let _ = Self::create_database_if_not_exists(&any_pool).await?;
119		let _ = Self::create_indexes_if_not_exist(&any_pool).await?;
120		Ok(Self {
121			pool: any_pool,
122			storage_override,
123			num_ops_timeout: num_ops_timeout
124				.map(|n| n.get())
125				.unwrap_or(0)
126				.try_into()
127				.unwrap_or(i32::MAX),
128		})
129	}
130
131	fn connect_options(config: &BackendConfig) -> Result<SqliteConnectOptions, Error> {
132		match config {
133			BackendConfig::Sqlite(config) => {
134				log::info!(target: "frontier-sql", "📑 Connection configuration: {config:?}");
135				let config = sqlx::sqlite::SqliteConnectOptions::from_str(config.path)?
136					.create_if_missing(config.create_if_missing)
137					// https://www.sqlite.org/pragma.html#pragma_busy_timeout
138					.busy_timeout(std::time::Duration::from_secs(8))
139					// 200MB, https://www.sqlite.org/pragma.html#pragma_cache_size
140					.pragma("cache_size", format!("-{}", config.cache_size))
141					// https://www.sqlite.org/pragma.html#pragma_analysis_limit
142					.pragma("analysis_limit", "1000")
143					// https://www.sqlite.org/pragma.html#pragma_threads
144					.pragma("threads", config.thread_count.to_string())
145					// https://www.sqlite.org/pragma.html#pragma_threads
146					.pragma("temp_store", "memory")
147					// https://www.sqlite.org/wal.html
148					.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
149					// https://www.sqlite.org/pragma.html#pragma_synchronous
150					.synchronous(sqlx::sqlite::SqliteSynchronous::Normal);
151				Ok(config)
152			}
153		}
154	}
155
	/// Get the underlying Sqlite pool.
	///
	/// Returns a shared reference; the pool itself is not cloned.
	pub fn pool(&self) -> &SqlitePool {
		&self.pool
	}
160
161	/// Canonicalize the indexed blocks, marking/demarking them as canon based on the
162	/// provided `retracted` and `enacted` values.
163	pub async fn canonicalize(&self, retracted: &[H256], enacted: &[H256]) -> Result<(), Error> {
164		let mut tx = self.pool().begin().await?;
165
166		// Retracted
167		let mut builder: QueryBuilder<Sqlite> =
168			QueryBuilder::new("UPDATE blocks SET is_canon = 0 WHERE substrate_block_hash IN (");
169		let mut retracted_hashes = builder.separated(", ");
170		for hash in retracted.iter() {
171			let hash = hash.as_bytes();
172			retracted_hashes.push_bind(hash);
173		}
174		retracted_hashes.push_unseparated(")");
175		let query = builder.build();
176		query.execute(&mut *tx).await?;
177
178		// Enacted
179		let mut builder: QueryBuilder<Sqlite> =
180			QueryBuilder::new("UPDATE blocks SET is_canon = 1 WHERE substrate_block_hash IN (");
181		let mut enacted_hashes = builder.separated(", ");
182		for hash in enacted.iter() {
183			let hash = hash.as_bytes();
184			enacted_hashes.push_bind(hash);
185		}
186		enacted_hashes.push_unseparated(")");
187		let query = builder.build();
188		query.execute(&mut *tx).await?;
189
190		tx.commit().await
191	}
192
193	/// Index the block metadata for the genesis block.
194	pub async fn insert_genesis_block_metadata<Client, BE>(
195		&self,
196		client: Arc<Client>,
197	) -> Result<Option<H256>, Error>
198	where
199		Client: StorageProvider<Block, BE> + HeaderBackend<Block> + 'static,
200		Client: ProvideRuntimeApi<Block>,
201		Client::Api: EthereumRuntimeRPCApi<Block>,
202		BE: BackendT<Block> + 'static,
203	{
204		let id = BlockId::Number(Zero::zero());
205		let substrate_genesis_hash = client
206			.expect_block_hash_from_id(&id)
207			.map_err(|_| Error::Protocol("Cannot resolve genesis hash".to_string()))?;
208		let maybe_substrate_hash: Option<H256> = if let Ok(Some(_)) =
209			client.header(substrate_genesis_hash)
210		{
211			let has_api = client
212				.runtime_api()
213				.has_api_with::<dyn EthereumRuntimeRPCApi<Block>, _>(
214					substrate_genesis_hash,
215					|version| version >= 1,
216				)
217				.expect("runtime api reachable");
218
219			log::debug!(target: "frontier-sql", "Index genesis block, has_api={has_api}, hash={substrate_genesis_hash:?}");
220
221			if has_api {
222				// The chain has frontier support from genesis.
223				// Read from the runtime and store the block metadata.
224				let ethereum_block = client
225					.runtime_api()
226					.current_block(substrate_genesis_hash)
227					.expect("runtime api reachable")
228					.expect("ethereum genesis block");
229
230				let schema = StorageQuerier::new(client)
231					.storage_schema(substrate_genesis_hash)
232					.unwrap_or(EthereumStorageSchema::V3)
233					.encode();
234				let ethereum_block_hash = ethereum_block.header.hash().as_bytes().to_owned();
235				let substrate_block_hash = substrate_genesis_hash.as_bytes();
236				let block_number = 0i32;
237				let is_canon = 1i32;
238
239				let _ = sqlx::query(
240					"INSERT OR IGNORE INTO blocks(
241						ethereum_block_hash,
242						substrate_block_hash,
243						block_number,
244						ethereum_storage_schema,
245						is_canon)
246					VALUES (?, ?, ?, ?, ?)",
247				)
248				.bind(ethereum_block_hash)
249				.bind(substrate_block_hash)
250				.bind(block_number)
251				.bind(schema)
252				.bind(is_canon)
253				.execute(self.pool())
254				.await?;
255			}
256			Some(substrate_genesis_hash)
257		} else {
258			None
259		};
260		Ok(maybe_substrate_hash)
261	}
262
	/// Collects the metadata of the block identified by `hash` without touching the database.
	///
	/// The Ethereum hashes are recovered from the Frontier consensus digest of the header.
	/// When the digest only carries the Ethereum block hash, the block is read through
	/// `storage_override` and its hash is checked against the digest.
	///
	/// # Errors
	/// Returns `Error::Protocol` when the header cannot be read, when the digest holds no
	/// or several Frontier logs, when the Ethereum block is missing, or when its hash does
	/// not match the digest.
	fn insert_block_metadata_inner<Client, BE>(
		client: Arc<Client>,
		hash: H256,
		storage_override: &dyn StorageOverride<Block>,
	) -> Result<BlockMetadata, Error>
	where
		Client: StorageProvider<Block, BE> + HeaderBackend<Block> + 'static,
		BE: BackendT<Block> + 'static,
	{
		log::trace!(target: "frontier-sql", "🛠️  [Metadata] Retrieving digest data for block {hash:?}");
		if let Ok(Some(header)) = client.header(hash) {
			match fp_consensus::find_log(header.digest()) {
				Ok(log) => {
					// Falls back to schema V3 when the schema cannot be read.
					let schema = StorageQuerier::new(client.clone())
						.storage_schema(hash)
						.unwrap_or(EthereumStorageSchema::V3);
					let log_hashes = match log {
						ConsensusLog::Post(PostLog::Hashes(post_hashes)) => post_hashes,
						ConsensusLog::Post(PostLog::Block(block)) => Hashes::from_block(block),
						ConsensusLog::Post(PostLog::BlockHash(expect_eth_block_hash)) => {
							// Only the hash is in the digest: load the block and verify it.
							let ethereum_block = storage_override.current_block(hash);
							match ethereum_block {
								Some(block) => {
									let got_eth_block_hash = block.header.hash();
									if got_eth_block_hash != expect_eth_block_hash {
										return Err(Error::Protocol(format!(
											"Ethereum block hash mismatch: \
											frontier consensus digest ({expect_eth_block_hash:?}), \
											db state ({got_eth_block_hash:?})"
										)));
									} else {
										Hashes::from_block(block)
									}
								}
								None => {
									return Err(Error::Protocol(format!(
										"Missing ethereum block for hash mismatch {expect_eth_block_hash:?}"
									)))
								}
							}
						}
						ConsensusLog::Pre(PreLog::Block(block)) => Hashes::from_block(block),
					};

					let header_number = *header.number();
					// Saturates to `u32` first; the `as i32` cast then wraps above `i32::MAX`.
					let block_number =
						UniqueSaturatedInto::<u32>::unique_saturated_into(header_number) as i32;
					// The block is canon when the client's hash for this number is `hash`.
					// Lookup failures are logged and treated as non-canon.
					let is_canon = match client.hash(header_number) {
						Ok(Some(inner_hash)) => (inner_hash == hash) as i32,
						Ok(None) => {
							log::debug!(target: "frontier-sql", "[Metadata] Missing header for block #{block_number} ({hash:?})");
							0
						}
						Err(err) => {
							log::debug!(
								target: "frontier-sql",
								"[Metadata] Failed to retrieve header for block #{block_number} ({hash:?}): {err:?}",
							);
							0
						}
					};

					log::trace!(
						target: "frontier-sql",
						"[Metadata] Prepared block metadata for #{block_number} ({hash:?}) canon={is_canon}",
					);
					Ok(BlockMetadata {
						substrate_block_hash: hash,
						block_number,
						post_hashes: log_hashes,
						schema,
						is_canon,
					})
				}
				Err(FindLogError::NotFound) => Err(Error::Protocol(format!(
					"[Metadata] No logs found for hash {hash:?}",
				))),
				Err(FindLogError::MultipleLogs) => Err(Error::Protocol(format!(
					"[Metadata] Multiple logs found for hash {hash:?}",
				))),
			}
		} else {
			Err(Error::Protocol(format!(
				"[Metadata] Failed retrieving header for hash {hash:?}"
			)))
		}
	}
350
351	/// Insert the block metadata for the provided block hashes.
352	pub async fn insert_block_metadata<Client, BE>(
353		&self,
354		client: Arc<Client>,
355		hash: H256,
356	) -> Result<(), Error>
357	where
358		Client: StorageProvider<Block, BE> + HeaderBackend<Block> + 'static,
359		BE: BackendT<Block> + 'static,
360	{
361		// Spawn a blocking task to get block metadata from substrate backend.
362		let storage_override = self.storage_override.clone();
363		let metadata = tokio::task::spawn_blocking(move || {
364			Self::insert_block_metadata_inner(client.clone(), hash, &*storage_override)
365		})
366		.await
367		.map_err(|_| Error::Protocol("tokio blocking metadata task failed".to_string()))??;
368
369		let mut tx = self.pool().begin().await?;
370
371		log::debug!(
372			target: "frontier-sql",
373			"🛠️  [Metadata] Starting execution of statements on db transaction"
374		);
375		let post_hashes = metadata.post_hashes;
376		let ethereum_block_hash = post_hashes.block_hash.as_bytes();
377		let substrate_block_hash = metadata.substrate_block_hash.as_bytes();
378		let schema = metadata.schema.encode();
379		let block_number = metadata.block_number;
380		let is_canon = metadata.is_canon;
381
382		let _ = sqlx::query(
383			"INSERT OR IGNORE INTO blocks(
384					ethereum_block_hash,
385					substrate_block_hash,
386					block_number,
387					ethereum_storage_schema,
388					is_canon)
389				VALUES (?, ?, ?, ?, ?)",
390		)
391		.bind(ethereum_block_hash)
392		.bind(substrate_block_hash)
393		.bind(block_number)
394		.bind(schema)
395		.bind(is_canon)
396		.execute(&mut *tx)
397		.await?;
398		for (i, &transaction_hash) in post_hashes.transaction_hashes.iter().enumerate() {
399			let ethereum_transaction_hash = transaction_hash.as_bytes();
400			let ethereum_transaction_index = i as i32;
401			log::trace!(
402				target: "frontier-sql",
403				"[Metadata] Inserting TX for block #{block_number} - {transaction_hash:?} index {ethereum_transaction_index}",
404			);
405			let _ = sqlx::query(
406				"INSERT OR IGNORE INTO transactions(
407						ethereum_transaction_hash,
408						substrate_block_hash,
409						ethereum_block_hash,
410						ethereum_transaction_index)
411					VALUES (?, ?, ?, ?)",
412			)
413			.bind(ethereum_transaction_hash)
414			.bind(substrate_block_hash)
415			.bind(ethereum_block_hash)
416			.bind(ethereum_transaction_index)
417			.execute(&mut *tx)
418			.await?;
419		}
420
421		sqlx::query("INSERT INTO sync_status(substrate_block_hash) VALUES (?)")
422			.bind(hash.as_bytes())
423			.execute(&mut *tx)
424			.await?;
425
426		log::debug!(target: "frontier-sql", "[Metadata] Ready to commit");
427		tx.commit().await
428	}
429
	/// Index the logs of `block_hash` if its `sync_status` row is still pending.
	///
	/// The row is switched from `status = 0` to `status = 1` and the logs read through the
	/// storage override are inserted, all in one database transaction. Nothing is returned:
	/// any error (including the block not being pending) is logged at error level.
	/// `PRAGMA optimize` is executed afterwards in every case.
	pub async fn index_block_logs(&self, block_hash: Block::Hash) {
		let pool = self.pool().clone();
		let storage_override = self.storage_override.clone();
		let _ = async {
			// The overarching db transaction for the task.
			// Due to the async nature of this task, the same work is likely to happen
			// more than once. For example when a new batch is scheduled while the previous
			// one hasn't finished yet and the new batch happens to select the same substrate
			// block hashes for the update.
			// That is expected, we are exchanging extra work for *acid*ity.
			// There is no case of unique constraint violation or race condition as already
			// existing entries are ignored.
			let mut tx = pool.begin().await?;
			// Update statement returning the substrate block hash when the block was pending.
			// `fetch_one` fails when no row is returned, i.e. when the block is not pending.
			match sqlx::query(
				"UPDATE sync_status
			SET status = 1
			WHERE substrate_block_hash IN
				(SELECT substrate_block_hash
				FROM sync_status
				WHERE status = 0 AND substrate_block_hash = ?) RETURNING substrate_block_hash",
			)
			.bind(block_hash.as_bytes())
			.fetch_one(&mut *tx)
			.await
			{
				Ok(_) => {
					// Spawn a blocking task to get log data from substrate backend.
					let logs = tokio::task::spawn_blocking(move || {
						Self::get_logs(storage_override, block_hash)
					})
					.await
					.map_err(|_| Error::Protocol("tokio blocking task failed".to_string()))?;

					for log in logs {
						let _ = sqlx::query(
							"INSERT OR IGNORE INTO logs(
						address,
						topic_1,
						topic_2,
						topic_3,
						topic_4,
						log_index,
						transaction_index,
						substrate_block_hash)
					VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
						)
						.bind(log.address)
						.bind(log.topic_1)
						.bind(log.topic_2)
						.bind(log.topic_3)
						.bind(log.topic_4)
						.bind(log.log_index)
						.bind(log.transaction_index)
						.bind(log.substrate_block_hash)
						.execute(&mut *tx)
						.await?;
					}
					Ok(tx.commit().await?)
				}
				Err(e) => Err(e),
			}
		}
		.await
		.map_err(|e| {
			log::error!(target: "frontier-sql", "{e}");
		});
		// https://www.sqlite.org/pragma.html#pragma_optimize
		let _ = sqlx::query("PRAGMA optimize").execute(&pool).await;
		// NOTE(review): this message is emitted even when the transaction above failed.
		log::debug!(target: "frontier-sql", "Batch committed");
	}
502
503	fn get_logs(
504		storage_override: Arc<dyn StorageOverride<Block>>,
505		substrate_block_hash: H256,
506	) -> Vec<Log> {
507		let mut logs: Vec<Log> = vec![];
508		let mut transaction_count: usize = 0;
509		let mut log_count: usize = 0;
510		let receipts = storage_override
511			.current_receipts(substrate_block_hash)
512			.unwrap_or_default();
513
514		transaction_count += receipts.len();
515		for (transaction_index, receipt) in receipts.iter().enumerate() {
516			let receipt_logs = match receipt {
517				ethereum::ReceiptV4::Legacy(d)
518				| ethereum::ReceiptV4::EIP2930(d)
519				| ethereum::ReceiptV4::EIP1559(d)
520				| ethereum::ReceiptV4::EIP7702(d) => &d.logs,
521			};
522			let transaction_index = transaction_index as i32;
523			log_count += receipt_logs.len();
524			for (log_index, log) in receipt_logs.iter().enumerate() {
525				#[allow(clippy::get_first)]
526				logs.push(Log {
527					address: log.address.as_bytes().to_owned(),
528					topic_1: log.topics.get(0).map(|l| l.as_bytes().to_owned()),
529					topic_2: log.topics.get(1).map(|l| l.as_bytes().to_owned()),
530					topic_3: log.topics.get(2).map(|l| l.as_bytes().to_owned()),
531					topic_4: log.topics.get(3).map(|l| l.as_bytes().to_owned()),
532					log_index: log_index as i32,
533					transaction_index,
534					substrate_block_hash: substrate_block_hash.as_bytes().to_owned(),
535				});
536			}
537		}
538		log::debug!(
539			target: "frontier-sql",
540			"Ready to commit {log_count} logs from {transaction_count} transactions"
541		);
542		logs
543	}
544
545	/// Retrieves the status if a block has been already indexed.
546	pub async fn is_block_indexed(&self, block_hash: Block::Hash) -> bool {
547		sqlx::query("SELECT substrate_block_hash FROM sync_status WHERE substrate_block_hash = ?")
548			.bind(block_hash.as_bytes().to_owned())
549			.fetch_optional(self.pool())
550			.await
551			.map(|r| r.is_some())
552			.unwrap_or(false)
553	}
554
555	/// Retrieves the status if a block is indexed and if also marked as canon.
556	pub async fn block_indexed_and_canon_status(
557		&self,
558		block_hash: Block::Hash,
559	) -> BlockIndexedStatus {
560		sqlx::query(
561			"SELECT b.is_canon FROM sync_status AS s
562			INNER JOIN blocks AS b
563			ON s.substrate_block_hash = b.substrate_block_hash
564			WHERE s.substrate_block_hash = ?",
565		)
566		.bind(block_hash.as_bytes().to_owned())
567		.fetch_optional(self.pool())
568		.await
569		.map(|result| {
570			result
571				.map(|row| {
572					let is_canon: i32 = row.get(0);
573					BlockIndexedStatus {
574						indexed: true,
575						canon: is_canon != 0,
576					}
577				})
578				.unwrap_or_default()
579		})
580		.unwrap_or_default()
581	}
582
583	/// Sets the provided block as canon.
584	pub async fn set_block_as_canon(&self, block_hash: H256) -> Result<SqliteQueryResult, Error> {
585		sqlx::query("UPDATE blocks SET is_canon = 1 WHERE substrate_block_hash = ?")
586			.bind(block_hash.as_bytes())
587			.execute(self.pool())
588			.await
589	}
590
591	/// Retrieves the first missing canonical block number in decreasing order that hasn't been indexed yet.
592	/// If no unindexed block exists or the table or the rows do not exist, then the function
593	/// returns `None`.
594	pub async fn get_first_missing_canon_block(&self) -> Option<u32> {
595		match sqlx::query(
596			"SELECT b1.block_number-1
597			FROM blocks as b1
598			WHERE b1.block_number > 0 AND b1.is_canon=1 AND NOT EXISTS (
599				SELECT 1 FROM blocks AS b2
600				WHERE b2.block_number = b1.block_number-1
601				AND b1.is_canon=1
602				AND b2.is_canon=1
603			)
604			ORDER BY block_number LIMIT 1",
605		)
606		.fetch_optional(self.pool())
607		.await
608		{
609			Ok(result) => {
610				if let Some(row) = result {
611					let block_number: u32 = row.get(0);
612					return Some(block_number);
613				}
614			}
615			Err(err) => {
616				log::debug!(target: "frontier-sql", "Failed retrieving missing block {err:?}");
617			}
618		}
619
620		None
621	}
622
623	/// Retrieves the first pending canonical block hash in decreasing order that hasn't had
624	// its logs indexed yet. If no unindexed block exists or the table or the rows do not exist,
625	/// then the function returns `None`.
626	pub async fn get_first_pending_canon_block(&self) -> Option<H256> {
627		match sqlx::query(
628			"SELECT s.substrate_block_hash FROM sync_status AS s
629			INNER JOIN blocks as b
630			ON s.substrate_block_hash = b.substrate_block_hash
631			WHERE b.is_canon = 1 AND s.status = 0
632			ORDER BY b.block_number LIMIT 1",
633		)
634		.fetch_optional(self.pool())
635		.await
636		{
637			Ok(result) => {
638				if let Some(row) = result {
639					let block_hash_bytes: Vec<u8> = row.get(0);
640					let block_hash = H256::from_slice(&block_hash_bytes[..]);
641					return Some(block_hash);
642				}
643			}
644			Err(err) => {
645				log::debug!(target: "frontier-sql", "Failed retrieving missing block {err:?}");
646			}
647		}
648
649		None
650	}
651
652	/// Retrieve the block hash for the last indexed canon block.
653	pub async fn last_indexed_canon_block(&self) -> Result<H256, Error> {
654		let row = sqlx::query(
655			"SELECT b.substrate_block_hash FROM blocks AS b
656			INNER JOIN sync_status AS s
657			ON s.substrate_block_hash = b.substrate_block_hash
658			WHERE b.is_canon=1 AND s.status = 1
659			ORDER BY b.id DESC LIMIT 1",
660		)
661		.fetch_one(self.pool())
662		.await?;
663		Ok(H256::from_slice(
664			&row.try_get::<Vec<u8>, _>(0).unwrap_or_default()[..],
665		))
666	}
667
668	/// Create the Sqlite database if it does not already exist.
669	async fn create_database_if_not_exists(pool: &SqlitePool) -> Result<SqliteQueryResult, Error> {
670		sqlx::query(
671			"BEGIN;
672			CREATE TABLE IF NOT EXISTS logs (
673				id INTEGER PRIMARY KEY,
674				address BLOB NOT NULL,
675				topic_1 BLOB,
676				topic_2 BLOB,
677				topic_3 BLOB,
678				topic_4 BLOB,
679				log_index INTEGER NOT NULL,
680				transaction_index INTEGER NOT NULL,
681				substrate_block_hash BLOB NOT NULL,
682				UNIQUE (
683					log_index,
684					transaction_index,
685					substrate_block_hash
686				)
687			);
688			CREATE TABLE IF NOT EXISTS sync_status (
689				id INTEGER PRIMARY KEY,
690				substrate_block_hash BLOB NOT NULL,
691				status INTEGER DEFAULT 0 NOT NULL,
692				UNIQUE (
693					substrate_block_hash
694				)
695			);
696			CREATE TABLE IF NOT EXISTS blocks (
697				id INTEGER PRIMARY KEY,
698				block_number INTEGER NOT NULL,
699				ethereum_block_hash BLOB NOT NULL,
700				substrate_block_hash BLOB NOT NULL,
701				ethereum_storage_schema BLOB NOT NULL,
702				is_canon INTEGER NOT NULL,
703				UNIQUE (
704					ethereum_block_hash,
705					substrate_block_hash
706				)
707			);
708			CREATE TABLE IF NOT EXISTS transactions (
709				id INTEGER PRIMARY KEY,
710				ethereum_transaction_hash BLOB NOT NULL,
711				substrate_block_hash BLOB NOT NULL,
712				ethereum_block_hash BLOB NOT NULL,
713				ethereum_transaction_index INTEGER NOT NULL,
714				UNIQUE (
715					ethereum_transaction_hash,
716					substrate_block_hash
717				)
718			);
719			COMMIT;",
720		)
721		.execute(pool)
722		.await
723	}
724
725	/// Create the Sqlite database indices if it does not already exist.
726	async fn create_indexes_if_not_exist(pool: &SqlitePool) -> Result<SqliteQueryResult, Error> {
727		sqlx::query(
728			"BEGIN;
729			CREATE INDEX IF NOT EXISTS logs_main_idx ON logs (
730				address,
731				topic_1,
732				topic_2,
733				topic_3,
734				topic_4
735			);
736			CREATE INDEX IF NOT EXISTS logs_substrate_index ON logs (
737				substrate_block_hash
738			);
739			CREATE INDEX IF NOT EXISTS blocks_number_index ON blocks (
740				block_number
741			);
742			CREATE INDEX IF NOT EXISTS blocks_substrate_index ON blocks (
743				substrate_block_hash
744			);
745			CREATE INDEX IF NOT EXISTS eth_block_hash_idx ON blocks (
746				ethereum_block_hash
747			);
748			CREATE INDEX IF NOT EXISTS eth_tx_hash_idx ON transactions (
749				ethereum_transaction_hash
750			);
751			CREATE INDEX IF NOT EXISTS eth_tx_hash_2_idx ON transactions (
752				ethereum_block_hash,
753				ethereum_transaction_index
754			);
755			COMMIT;",
756		)
757		.execute(pool)
758		.await
759	}
760}
761
762#[async_trait::async_trait]
763impl<Block: BlockT<Hash = H256>> fc_api::Backend<Block> for Backend<Block> {
764	async fn block_hash(
765		&self,
766		ethereum_block_hash: &H256,
767	) -> Result<Option<Vec<Block::Hash>>, String> {
768		let ethereum_block_hash = ethereum_block_hash.as_bytes();
769		let res =
770			sqlx::query("SELECT substrate_block_hash FROM blocks WHERE ethereum_block_hash = ?")
771				.bind(ethereum_block_hash)
772				.fetch_all(&self.pool)
773				.await
774				.ok()
775				.map(|rows| {
776					rows.iter()
777						.map(|row| {
778							H256::from_slice(&row.try_get::<Vec<u8>, _>(0).unwrap_or_default()[..])
779						})
780						.collect()
781				});
782		Ok(res)
783	}
784
785	async fn transaction_metadata(
786		&self,
787		ethereum_transaction_hash: &H256,
788	) -> Result<Vec<TransactionMetadata<Block>>, String> {
789		let ethereum_transaction_hash = ethereum_transaction_hash.as_bytes();
790		let out = sqlx::query(
791			"SELECT
792				substrate_block_hash, ethereum_block_hash, ethereum_transaction_index
793			FROM transactions WHERE ethereum_transaction_hash = ?",
794		)
795		.bind(ethereum_transaction_hash)
796		.fetch_all(&self.pool)
797		.await
798		.unwrap_or_default()
799		.iter()
800		.map(|row| {
801			let substrate_block_hash =
802				H256::from_slice(&row.try_get::<Vec<u8>, _>(0).unwrap_or_default()[..]);
803			let ethereum_block_hash =
804				H256::from_slice(&row.try_get::<Vec<u8>, _>(1).unwrap_or_default()[..]);
805			let ethereum_transaction_index = row.try_get::<i32, _>(2).unwrap_or_default() as u32;
806			TransactionMetadata {
807				substrate_block_hash,
808				ethereum_block_hash,
809				ethereum_index: ethereum_transaction_index,
810			}
811		})
812		.collect();
813
814		Ok(out)
815	}
816
	/// Returns this backend as the log indexer; the same type implements both traits.
	fn log_indexer(&self) -> &dyn fc_api::LogIndexerBackend<Block> {
		self
	}
820
821	async fn first_block_hash(&self) -> Result<Block::Hash, String> {
822		// Retrieves the block hash for the earliest indexed block, maybe it's not canon.
823		sqlx::query("SELECT substrate_block_hash FROM blocks ORDER BY block_number ASC LIMIT 1")
824			.fetch_one(self.pool())
825			.await
826			.map(|row| H256::from_slice(&row.get::<Vec<u8>, _>(0)[..]))
827			.map_err(|e| format!("Failed to fetch oldest block hash: {e}"))
828	}
829
830	async fn latest_block_hash(&self) -> Result<Block::Hash, String> {
831		// Retrieves the block hash for the latest indexed block, maybe it's not canon.
832		sqlx::query("SELECT substrate_block_hash FROM blocks ORDER BY block_number DESC LIMIT 1")
833			.fetch_one(self.pool())
834			.await
835			.map(|row| H256::from_slice(&row.get::<Vec<u8>, _>(0)[..]))
836			.map_err(|e| format!("Failed to fetch best hash: {e}"))
837	}
838}
839
840#[async_trait::async_trait]
841impl<Block: BlockT<Hash = H256>> fc_api::LogIndexerBackend<Block> for Backend<Block> {
	/// Always `true`: this backend keeps a log index in its `logs` table.
	fn is_indexed(&self) -> bool {
		true
	}
845
846	async fn filter_logs(
847		&self,
848		from_block: u64,
849		to_block: u64,
850		addresses: Vec<H160>,
851		topics: Vec<Vec<H256>>,
852	) -> Result<Vec<FilteredLog<Block>>, String> {
853		let mut unique_topics: [HashSet<H256>; 4] = [
854			HashSet::new(),
855			HashSet::new(),
856			HashSet::new(),
857			HashSet::new(),
858		];
859		for (topic_index, topic_options) in topics.into_iter().enumerate() {
860			unique_topics[topic_index].extend(topic_options);
861		}
862
863		let log_key = format!("{from_block}-{to_block}-{addresses:?}-{unique_topics:?}");
864		let mut qb = QueryBuilder::new("");
865		let query = build_query(&mut qb, from_block, to_block, addresses, unique_topics);
866		let sql = query.sql();
867
868		let mut conn = self
869			.pool()
870			.acquire()
871			.await
872			.map_err(|err| format!("failed acquiring sqlite connection: {err}"))?;
873		let log_key2 = log_key.clone();
874		conn.lock_handle()
875			.await
876			.map_err(|err| format!("{err:?}"))?
877			.set_progress_handler(self.num_ops_timeout, move || {
878				log::debug!(target: "frontier-sql", "Sqlite progress_handler triggered for {log_key2}");
879				false
880			});
881		log::debug!(target: "frontier-sql", "Query: {sql:?} - {log_key}");
882
883		let mut out: Vec<FilteredLog<Block>> = vec![];
884		let mut rows = query.fetch(&mut *conn);
885		let maybe_err = loop {
886			match rows.try_next().await {
887				Ok(Some(row)) => {
888					// Substrate block hash
889					let substrate_block_hash =
890						H256::from_slice(&row.try_get::<Vec<u8>, _>(0).unwrap_or_default()[..]);
891					// Ethereum block hash
892					let ethereum_block_hash =
893						H256::from_slice(&row.try_get::<Vec<u8>, _>(1).unwrap_or_default()[..]);
894					// Block number
895					let block_number = row.try_get::<i32, _>(2).unwrap_or_default() as u32;
896					// Ethereum storage schema
897					let ethereum_storage_schema: EthereumStorageSchema =
898						Decode::decode(&mut &row.try_get::<Vec<u8>, _>(3).unwrap_or_default()[..])
899							.map_err(|_| {
900								"Cannot decode EthereumStorageSchema for block".to_string()
901							})?;
902					// Transaction index
903					let transaction_index = row.try_get::<i32, _>(4).unwrap_or_default() as u32;
904					// Log index
905					let log_index = row.try_get::<i32, _>(5).unwrap_or_default() as u32;
906					out.push(FilteredLog {
907						substrate_block_hash,
908						ethereum_block_hash,
909						block_number,
910						ethereum_storage_schema,
911						transaction_index,
912						log_index,
913					});
914				}
915				Ok(None) => break None, // no more rows
916				Err(err) => break Some(err),
917			};
918		};
919		drop(rows);
920		conn.lock_handle()
921			.await
922			.map_err(|err| format!("{err:?}"))?
923			.remove_progress_handler();
924
925		if let Some(err) = maybe_err {
926			log::error!(target: "frontier-sql", "Failed to query sql db: {err:?} - {log_key}");
927			return Err("Failed to query sql db with statement".to_string());
928		}
929
930		log::info!(target: "frontier-sql", "FILTER remove handler - {log_key}");
931		Ok(out)
932	}
933}
934
935/// Build a SQL query to retrieve a list of logs given certain constraints.
936fn build_query<'a>(
937	qb: &'a mut QueryBuilder<Sqlite>,
938	from_block: u64,
939	to_block: u64,
940	addresses: Vec<H160>,
941	topics: [HashSet<H256>; 4],
942) -> Query<'a, Sqlite, SqliteArguments<'a>> {
943	qb.push(
944		"
945SELECT
946	l.substrate_block_hash,
947	b.ethereum_block_hash,
948	b.block_number,
949	b.ethereum_storage_schema,
950	l.transaction_index,
951	l.log_index
952FROM logs AS l
953INNER JOIN blocks AS b
954ON (b.block_number BETWEEN ",
955	);
956	qb.separated(" AND ")
957		.push_bind(from_block as i64)
958		.push_bind(to_block as i64)
959		.push_unseparated(")");
960	qb.push(" AND b.substrate_block_hash = l.substrate_block_hash")
961		.push(" AND b.is_canon = 1")
962		.push("\nWHERE 1");
963
964	if !addresses.is_empty() {
965		qb.push(" AND l.address IN (");
966		let mut qb_addr = qb.separated(", ");
967		addresses.iter().for_each(|addr| {
968			qb_addr.push_bind(addr.as_bytes().to_owned());
969		});
970		qb_addr.push_unseparated(")");
971	}
972
973	for (i, topic_options) in topics.iter().enumerate() {
974		match topic_options.len().cmp(&1) {
975			Ordering::Greater => {
976				qb.push(format!(" AND l.topic_{} IN (", i + 1));
977				let mut qb_topic = qb.separated(", ");
978				topic_options.iter().for_each(|t| {
979					qb_topic.push_bind(t.as_bytes().to_owned());
980				});
981				qb_topic.push_unseparated(")");
982			}
983			Ordering::Equal => {
984				qb.push(format!(" AND l.topic_{} = ", i + 1)).push_bind(
985					topic_options
986						.iter()
987						.next()
988						.expect("length is 1, must exist; qed")
989						.as_bytes()
990						.to_owned(),
991				);
992			}
993			Ordering::Less => {}
994		}
995	}
996
997	qb.push(
998		"
999ORDER BY b.block_number ASC, l.transaction_index ASC, l.log_index ASC
1000LIMIT 10001",
1001	);
1002
1003	qb.build()
1004}
1005
1006#[cfg(test)]
1007mod test {
1008	use super::*;
1009
1010	use std::path::Path;
1011
1012	use maplit::hashset;
1013	use scale_codec::Encode;
1014	use sqlx::{sqlite::SqliteRow, QueryBuilder, Row, SqlitePool};
1015	use tempfile::tempdir;
1016	// Substrate
1017	use sp_core::{H160, H256};
1018	use sp_runtime::{
1019		generic::{Block, Header},
1020		traits::BlakeTwo256,
1021	};
1022	use substrate_test_runtime_client::{
1023		DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt,
1024	};
1025	// Frontier
1026	use fc_api::Backend as BackendT;
1027	use fc_storage::SchemaV3StorageOverride;
1028	use fp_storage::{EthereumStorageSchema, PALLET_ETHEREUM_SCHEMA};
1029
	/// Concrete block type the test backend and the expected `FilteredLog`s are instantiated
	/// with.
	type OpaqueBlock =
		Block<Header<u64, BlakeTwo256>, substrate_test_runtime_client::runtime::Extrinsic>;
1032
	/// One `filter_logs` scenario: the query arguments plus the logs expected back.
	struct TestFilter {
		/// Lower block number bound handed to `filter_logs`.
		pub from_block: u64,
		/// Upper block number bound handed to `filter_logs`.
		pub to_block: u64,
		/// Addresses handed to `filter_logs`; left empty by tests that do not filter by address.
		pub addresses: Vec<H160>,
		/// Topic options handed to `filter_logs`, one inner list per topic position.
		pub topics: Vec<Vec<H256>>,
		/// Logs the test compares the returned logs against, in order.
		pub expected_result: Vec<FilteredLog<OpaqueBlock>>,
	}
1040
	/// Test fixture describing one seeded log together with the block it belongs to.
	///
	/// Inside this module it shadows the `Log` type pulled in by `use super::*`.
	#[derive(Debug, Clone)]
	struct Log {
		/// Number of the block the log belongs to.
		block_number: u32,
		/// Address stored in the `address` column.
		address: H160,
		/// Values stored in the `topic_1`..`topic_4` columns, in that order.
		topics: [H256; 4],
		/// Substrate hash of the block the log belongs to.
		substrate_block_hash: H256,
		/// Ethereum hash of the block the log belongs to.
		ethereum_block_hash: H256,
		/// Value stored in the `transaction_index` column.
		transaction_index: u32,
		/// Value stored in the `log_index` column.
		log_index: u32,
	}
1051
	/// Everything `prepare` creates: the backend under test plus the values it was seeded with.
	///
	/// Log fields are named `log_{BLOCK}_{TOPICS}_{LOG_INDEX}_{TX_INDEX}_{ADDRESS}`.
	// Tests destructure only the fields they need and some fields (e.g. `ethereum_hash_*`) are
	// not read by any test in this module — presumably why the lint is silenced.
	#[allow(unused)]
	struct TestData {
		/// Indexer backend connected to the seeded SQLite database.
		backend: Backend<OpaqueBlock>,
		/// Address of the logs seeded for block 1.
		alice: H160,
		/// Address of the logs seeded for blocks 2 and 3.
		bob: H160,
		// The four distinct topic values the seeded logs are built from.
		topics_a: H256,
		topics_b: H256,
		topics_c: H256,
		topics_d: H256,
		// Substrate hashes of blocks 1, 2 and 3.
		substrate_hash_1: H256,
		substrate_hash_2: H256,
		substrate_hash_3: H256,
		// Ethereum hashes of blocks 1, 2 and 3.
		ethereum_hash_1: H256,
		ethereum_hash_2: H256,
		ethereum_hash_3: H256,
		// Logs seeded for block 1.
		log_1_abcd_0_0_alice: Log,
		log_1_dcba_1_0_alice: Log,
		log_1_badc_2_0_alice: Log,
		// Logs seeded for block 2.
		log_2_abcd_0_0_bob: Log,
		log_2_dcba_1_0_bob: Log,
		log_2_badc_2_0_bob: Log,
		// Logs seeded for block 3.
		log_3_abcd_0_0_bob: Log,
		log_3_dcba_1_0_bob: Log,
		log_3_badc_2_0_bob: Log,
	}
1077
1078	impl From<Log> for FilteredLog<OpaqueBlock> {
1079		fn from(value: Log) -> Self {
1080			Self {
1081				substrate_block_hash: value.substrate_block_hash,
1082				ethereum_block_hash: value.ethereum_block_hash,
1083				block_number: value.block_number,
1084				ethereum_storage_schema: EthereumStorageSchema::V3,
1085				transaction_index: value.transaction_index,
1086				log_index: value.log_index,
1087			}
1088		}
1089	}
1090
1091	async fn prepare() -> TestData {
1092		let tmp = tempdir().expect("create a temporary directory");
1093		// Initialize storage with schema V3
1094		let builder = TestClientBuilder::new().add_extra_storage(
1095			PALLET_ETHEREUM_SCHEMA.to_vec(),
1096			Encode::encode(&EthereumStorageSchema::V3),
1097		);
1098		// Client
1099		let (client, _) = builder
1100			.build_with_native_executor::<substrate_test_runtime_client::runtime::RuntimeApi, _>(
1101				None,
1102			);
1103		let client = Arc::new(client);
1104		// Overrides
1105		let storage_override = Arc::new(SchemaV3StorageOverride::new(client.clone()));
1106		// Indexer backend
1107		let indexer_backend = Backend::new(
1108			BackendConfig::Sqlite(SqliteBackendConfig {
1109				path: Path::new("sqlite:///")
1110					.join(tmp.path())
1111					.join("test.db3")
1112					.to_str()
1113					.unwrap(),
1114				create_if_missing: true,
1115				cache_size: 20480,
1116				thread_count: 4,
1117			}),
1118			1,
1119			None,
1120			storage_override.clone(),
1121		)
1122		.await
1123		.expect("indexer pool to be created");
1124
1125		// Prepare test db data
1126		// Addresses
1127		let alice = H160::repeat_byte(0x01);
1128		let bob = H160::repeat_byte(0x02);
1129		// Topics
1130		let topics_a = H256::repeat_byte(0x01);
1131		let topics_b = H256::repeat_byte(0x02);
1132		let topics_c = H256::repeat_byte(0x03);
1133		let topics_d = H256::repeat_byte(0x04);
1134		// Substrate block hashes
1135		let substrate_hash_1 = H256::repeat_byte(0x05);
1136		let substrate_hash_2 = H256::repeat_byte(0x06);
1137		let substrate_hash_3 = H256::repeat_byte(0x07);
1138		// Ethereum block hashes
1139		let ethereum_hash_1 = H256::repeat_byte(0x08);
1140		let ethereum_hash_2 = H256::repeat_byte(0x09);
1141		let ethereum_hash_3 = H256::repeat_byte(0x0a);
1142		// Ethereum storage schema
1143		let ethereum_storage_schema = EthereumStorageSchema::V3;
1144
1145		let block_entries = vec![
1146			// Block 1
1147			(
1148				1i32,
1149				ethereum_hash_1,
1150				substrate_hash_1,
1151				ethereum_storage_schema,
1152			),
1153			// Block 2
1154			(
1155				2i32,
1156				ethereum_hash_2,
1157				substrate_hash_2,
1158				ethereum_storage_schema,
1159			),
1160			// Block 3
1161			(
1162				3i32,
1163				ethereum_hash_3,
1164				substrate_hash_3,
1165				ethereum_storage_schema,
1166			),
1167		];
1168		let mut builder = QueryBuilder::new(
1169			"INSERT INTO blocks(
1170				block_number,
1171				ethereum_block_hash,
1172				substrate_block_hash,
1173				ethereum_storage_schema,
1174				is_canon
1175			)",
1176		);
1177		builder.push_values(block_entries, |mut b, entry| {
1178			let block_number = entry.0;
1179			let ethereum_block_hash = entry.1.as_bytes().to_owned();
1180			let substrate_block_hash = entry.2.as_bytes().to_owned();
1181			let ethereum_storage_schema = entry.3.encode();
1182
1183			b.push_bind(block_number);
1184			b.push_bind(ethereum_block_hash);
1185			b.push_bind(substrate_block_hash);
1186			b.push_bind(ethereum_storage_schema);
1187			b.push_bind(1i32);
1188		});
1189		let query = builder.build();
1190		let _ = query
1191			.execute(indexer_backend.pool())
1192			.await
1193			.expect("insert should succeed");
1194
1195		// log_{BLOCK}_{TOPICS}_{LOG_INDEX}_{TX_INDEX}
1196		let log_1_abcd_0_0_alice = Log {
1197			block_number: 1,
1198			address: alice,
1199			topics: [topics_a, topics_b, topics_c, topics_d],
1200			log_index: 0,
1201			transaction_index: 0,
1202			substrate_block_hash: substrate_hash_1,
1203			ethereum_block_hash: ethereum_hash_1,
1204		};
1205		let log_1_dcba_1_0_alice = Log {
1206			block_number: 1,
1207			address: alice,
1208			topics: [topics_d, topics_c, topics_b, topics_a],
1209			log_index: 1,
1210			transaction_index: 0,
1211			substrate_block_hash: substrate_hash_1,
1212			ethereum_block_hash: ethereum_hash_1,
1213		};
1214		let log_1_badc_2_0_alice = Log {
1215			block_number: 1,
1216			address: alice,
1217			topics: [topics_b, topics_a, topics_d, topics_c],
1218			log_index: 2,
1219			transaction_index: 0,
1220			substrate_block_hash: substrate_hash_1,
1221			ethereum_block_hash: ethereum_hash_1,
1222		};
1223		let log_2_abcd_0_0_bob = Log {
1224			block_number: 2,
1225			address: bob,
1226			topics: [topics_a, topics_b, topics_c, topics_d],
1227			log_index: 0,
1228			transaction_index: 0,
1229			substrate_block_hash: substrate_hash_2,
1230			ethereum_block_hash: ethereum_hash_2,
1231		};
1232		let log_2_dcba_1_0_bob = Log {
1233			block_number: 2,
1234			address: bob,
1235			topics: [topics_d, topics_c, topics_b, topics_a],
1236			log_index: 1,
1237			transaction_index: 0,
1238			substrate_block_hash: substrate_hash_2,
1239			ethereum_block_hash: ethereum_hash_2,
1240		};
1241		let log_2_badc_2_0_bob = Log {
1242			block_number: 2,
1243			address: bob,
1244			topics: [topics_b, topics_a, topics_d, topics_c],
1245			log_index: 2,
1246			transaction_index: 0,
1247			substrate_block_hash: substrate_hash_2,
1248			ethereum_block_hash: ethereum_hash_2,
1249		};
1250
1251		let log_3_abcd_0_0_bob = Log {
1252			block_number: 3,
1253			address: bob,
1254			topics: [topics_a, topics_b, topics_c, topics_d],
1255			log_index: 0,
1256			transaction_index: 0,
1257			substrate_block_hash: substrate_hash_3,
1258			ethereum_block_hash: ethereum_hash_3,
1259		};
1260		let log_3_dcba_1_0_bob = Log {
1261			block_number: 3,
1262			address: bob,
1263			topics: [topics_d, topics_c, topics_b, topics_a],
1264			log_index: 1,
1265			transaction_index: 0,
1266			substrate_block_hash: substrate_hash_3,
1267			ethereum_block_hash: ethereum_hash_3,
1268		};
1269		let log_3_badc_2_0_bob = Log {
1270			block_number: 3,
1271			address: bob,
1272			topics: [topics_b, topics_a, topics_d, topics_c],
1273			log_index: 2,
1274			transaction_index: 0,
1275			substrate_block_hash: substrate_hash_3,
1276			ethereum_block_hash: ethereum_hash_3,
1277		};
1278
1279		let log_entries = vec![
1280			// Block 1
1281			log_1_abcd_0_0_alice.clone(),
1282			log_1_dcba_1_0_alice.clone(),
1283			log_1_badc_2_0_alice.clone(),
1284			// Block 2
1285			log_2_abcd_0_0_bob.clone(),
1286			log_2_dcba_1_0_bob.clone(),
1287			log_2_badc_2_0_bob.clone(),
1288			// Block 3
1289			log_3_abcd_0_0_bob.clone(),
1290			log_3_dcba_1_0_bob.clone(),
1291			log_3_badc_2_0_bob.clone(),
1292		];
1293
1294		let mut builder: QueryBuilder<sqlx::Sqlite> = QueryBuilder::new(
1295			"INSERT INTO logs(
1296				address,
1297				topic_1,
1298				topic_2,
1299				topic_3,
1300				topic_4,
1301				log_index,
1302				transaction_index,
1303				substrate_block_hash
1304			)",
1305		);
1306		builder.push_values(log_entries, |mut b, entry| {
1307			let address = entry.address.as_bytes().to_owned();
1308			let topic_1 = entry.topics[0].as_bytes().to_owned();
1309			let topic_2 = entry.topics[1].as_bytes().to_owned();
1310			let topic_3 = entry.topics[2].as_bytes().to_owned();
1311			let topic_4 = entry.topics[3].as_bytes().to_owned();
1312			let log_index = entry.log_index;
1313			let transaction_index = entry.transaction_index;
1314			let substrate_block_hash = entry.substrate_block_hash.as_bytes().to_owned();
1315
1316			b.push_bind(address);
1317			b.push_bind(topic_1);
1318			b.push_bind(topic_2);
1319			b.push_bind(topic_3);
1320			b.push_bind(topic_4);
1321			b.push_bind(log_index);
1322			b.push_bind(transaction_index);
1323			b.push_bind(substrate_block_hash);
1324		});
1325		let query = builder.build();
1326		let _ = query.execute(indexer_backend.pool()).await;
1327
1328		TestData {
1329			alice,
1330			bob,
1331			topics_a,
1332			topics_b,
1333			topics_c,
1334			topics_d,
1335			substrate_hash_1,
1336			substrate_hash_2,
1337			substrate_hash_3,
1338			ethereum_hash_1,
1339			ethereum_hash_2,
1340			ethereum_hash_3,
1341			backend: indexer_backend,
1342			log_1_abcd_0_0_alice,
1343			log_1_dcba_1_0_alice,
1344			log_1_badc_2_0_alice,
1345			log_2_abcd_0_0_bob,
1346			log_2_dcba_1_0_bob,
1347			log_2_badc_2_0_bob,
1348			log_3_abcd_0_0_bob,
1349			log_3_dcba_1_0_bob,
1350			log_3_badc_2_0_bob,
1351		}
1352	}
1353
1354	async fn run_test_case(
1355		backend: Backend<OpaqueBlock>,
1356		test_case: &TestFilter,
1357	) -> Result<Vec<FilteredLog<OpaqueBlock>>, String> {
1358		backend
1359			.log_indexer()
1360			.filter_logs(
1361				test_case.from_block,
1362				test_case.to_block,
1363				test_case.addresses.clone(),
1364				test_case.topics.clone(),
1365			)
1366			.await
1367	}
1368
1369	async fn assert_blocks_canon(pool: &SqlitePool, expected: Vec<(H256, u32)>) {
1370		let actual: Vec<(H256, u32)> =
1371			sqlx::query("SELECT substrate_block_hash, is_canon FROM blocks")
1372				.map(|row: SqliteRow| (H256::from_slice(&row.get::<Vec<u8>, _>(0)[..]), row.get(1)))
1373				.fetch_all(pool)
1374				.await
1375				.expect("sql query must succeed");
1376		assert_eq!(expected, actual);
1377	}
1378
1379	#[tokio::test]
1380	async fn genesis_works() {
1381		let TestData { backend, .. } = prepare().await;
1382		let filter = TestFilter {
1383			from_block: 0,
1384			to_block: 0,
1385			addresses: vec![],
1386			topics: vec![],
1387			expected_result: vec![],
1388		};
1389		let result = run_test_case(backend, &filter).await.expect("must succeed");
1390		assert_eq!(result, filter.expected_result);
1391	}
1392
1393	#[tokio::test]
1394	async fn unsanitized_input_works() {
1395		let TestData { backend, .. } = prepare().await;
1396		let filter = TestFilter {
1397			from_block: 0,
1398			to_block: 0,
1399			addresses: vec![],
1400			topics: vec![vec![], vec![], vec![], vec![]],
1401			expected_result: vec![],
1402		};
1403		let result = run_test_case(backend, &filter).await.expect("must succeed");
1404		assert_eq!(result, filter.expected_result);
1405	}
1406
1407	#[tokio::test]
1408	async fn test_malformed_topic_cleans_invalid_options() {
1409		let TestData {
1410			backend,
1411			topics_a,
1412			topics_b,
1413			topics_d,
1414			log_1_badc_2_0_alice,
1415			..
1416		} = prepare().await;
1417
1418		// [(a,null,b), (a, null), (d,null), null] -> [(a,b), a, d]
1419		let filter = TestFilter {
1420			from_block: 0,
1421			to_block: 1,
1422			addresses: vec![],
1423			topics: vec![vec![topics_a, topics_b], vec![topics_a], vec![topics_d]],
1424			expected_result: vec![log_1_badc_2_0_alice.into()],
1425		};
1426		let result = run_test_case(backend, &filter).await.expect("must succeed");
1427		assert_eq!(result, filter.expected_result);
1428	}
1429
1430	#[tokio::test]
1431	async fn block_range_works() {
1432		let TestData {
1433			backend,
1434			log_1_abcd_0_0_alice,
1435			log_1_dcba_1_0_alice,
1436			log_1_badc_2_0_alice,
1437			log_2_abcd_0_0_bob,
1438			log_2_dcba_1_0_bob,
1439			log_2_badc_2_0_bob,
1440			..
1441		} = prepare().await;
1442
1443		let filter = TestFilter {
1444			from_block: 0,
1445			to_block: 2,
1446			addresses: vec![],
1447			topics: vec![],
1448			expected_result: vec![
1449				log_1_abcd_0_0_alice.into(),
1450				log_1_dcba_1_0_alice.into(),
1451				log_1_badc_2_0_alice.into(),
1452				log_2_abcd_0_0_bob.into(),
1453				log_2_dcba_1_0_bob.into(),
1454				log_2_badc_2_0_bob.into(),
1455			],
1456		};
1457		let result = run_test_case(backend, &filter).await.expect("must succeed");
1458		assert_eq!(result, filter.expected_result);
1459	}
1460
1461	#[tokio::test]
1462	async fn address_filter_works() {
1463		let TestData {
1464			backend,
1465			alice,
1466			log_1_abcd_0_0_alice,
1467			log_1_dcba_1_0_alice,
1468			log_1_badc_2_0_alice,
1469			..
1470		} = prepare().await;
1471		let filter = TestFilter {
1472			from_block: 0,
1473			to_block: 3,
1474			addresses: vec![alice],
1475			topics: vec![],
1476			expected_result: vec![
1477				log_1_abcd_0_0_alice.into(),
1478				log_1_dcba_1_0_alice.into(),
1479				log_1_badc_2_0_alice.into(),
1480			],
1481		};
1482		let result = run_test_case(backend, &filter).await.expect("must succeed");
1483		assert_eq!(result, filter.expected_result);
1484	}
1485
1486	#[tokio::test]
1487	async fn topic_filter_works() {
1488		let TestData {
1489			backend,
1490			topics_d,
1491			log_1_dcba_1_0_alice,
1492			log_2_dcba_1_0_bob,
1493			log_3_dcba_1_0_bob,
1494			..
1495		} = prepare().await;
1496		let filter = TestFilter {
1497			from_block: 0,
1498			to_block: 3,
1499			addresses: vec![],
1500			topics: vec![vec![topics_d]],
1501			expected_result: vec![
1502				log_1_dcba_1_0_alice.into(),
1503				log_2_dcba_1_0_bob.into(),
1504				log_3_dcba_1_0_bob.into(),
1505			],
1506		};
1507		let result = run_test_case(backend, &filter).await.expect("must succeed");
1508		assert_eq!(result, filter.expected_result);
1509	}
1510
1511	#[tokio::test]
1512	async fn test_filters_address_and_topic() {
1513		let TestData {
1514			backend,
1515			bob,
1516			topics_b,
1517			log_2_badc_2_0_bob,
1518			log_3_badc_2_0_bob,
1519			..
1520		} = prepare().await;
1521		let filter = TestFilter {
1522			from_block: 0,
1523			to_block: 3,
1524			addresses: vec![bob],
1525			topics: vec![vec![topics_b]],
1526			expected_result: vec![log_2_badc_2_0_bob.into(), log_3_badc_2_0_bob.into()],
1527		};
1528		let result = run_test_case(backend, &filter).await.expect("must succeed");
1529		assert_eq!(result, filter.expected_result);
1530	}
1531
1532	#[tokio::test]
1533	async fn test_filters_multi_address_and_topic() {
1534		let TestData {
1535			backend,
1536			alice,
1537			bob,
1538			topics_b,
1539			log_1_badc_2_0_alice,
1540			log_2_badc_2_0_bob,
1541			log_3_badc_2_0_bob,
1542			..
1543		} = prepare().await;
1544		let filter = TestFilter {
1545			from_block: 0,
1546			to_block: 3,
1547			addresses: vec![alice, bob],
1548			topics: vec![vec![topics_b]],
1549			expected_result: vec![
1550				log_1_badc_2_0_alice.into(),
1551				log_2_badc_2_0_bob.into(),
1552				log_3_badc_2_0_bob.into(),
1553			],
1554		};
1555		let result = run_test_case(backend, &filter).await.expect("must succeed");
1556		assert_eq!(result, filter.expected_result);
1557	}
1558
1559	#[tokio::test]
1560	async fn test_filters_multi_address_and_multi_topic() {
1561		let TestData {
1562			backend,
1563			alice,
1564			bob,
1565			topics_a,
1566			topics_b,
1567			log_1_abcd_0_0_alice,
1568			log_2_abcd_0_0_bob,
1569			log_3_abcd_0_0_bob,
1570			..
1571		} = prepare().await;
1572		let filter = TestFilter {
1573			from_block: 0,
1574			to_block: 3,
1575			addresses: vec![alice, bob],
1576			topics: vec![vec![topics_a], vec![topics_b]],
1577			expected_result: vec![
1578				log_1_abcd_0_0_alice.into(),
1579				log_2_abcd_0_0_bob.into(),
1580				log_3_abcd_0_0_bob.into(),
1581			],
1582		};
1583		let result = run_test_case(backend, &filter).await.expect("must succeed");
1584		assert_eq!(result, filter.expected_result);
1585	}
1586
1587	#[tokio::test]
1588	async fn filter_with_topic_wildcards_works() {
1589		let TestData {
1590			backend,
1591			alice,
1592			bob,
1593			topics_d,
1594			topics_b,
1595			log_1_dcba_1_0_alice,
1596			log_2_dcba_1_0_bob,
1597			log_3_dcba_1_0_bob,
1598			..
1599		} = prepare().await;
1600		let filter = TestFilter {
1601			from_block: 0,
1602			to_block: 3,
1603			addresses: vec![alice, bob],
1604			topics: vec![vec![topics_d], vec![], vec![topics_b]],
1605			expected_result: vec![
1606				log_1_dcba_1_0_alice.into(),
1607				log_2_dcba_1_0_bob.into(),
1608				log_3_dcba_1_0_bob.into(),
1609			],
1610		};
1611		let result = run_test_case(backend, &filter).await.expect("must succeed");
1612		assert_eq!(result, filter.expected_result);
1613	}
1614
1615	#[tokio::test]
1616	async fn trailing_wildcard_is_useless_but_works() {
1617		let TestData {
1618			alice,
1619			backend,
1620			topics_b,
1621			log_1_dcba_1_0_alice,
1622			..
1623		} = prepare().await;
1624		let filter = TestFilter {
1625			from_block: 0,
1626			to_block: 1,
1627			addresses: vec![alice],
1628			topics: vec![vec![], vec![], vec![topics_b]],
1629			expected_result: vec![log_1_dcba_1_0_alice.into()],
1630		};
1631		let result = run_test_case(backend, &filter).await.expect("must succeed");
1632		assert_eq!(result, filter.expected_result);
1633	}
1634
1635	#[tokio::test]
1636	async fn filter_with_multi_topic_options_works() {
1637		let TestData {
1638			backend,
1639			topics_a,
1640			topics_d,
1641			log_1_abcd_0_0_alice,
1642			log_1_dcba_1_0_alice,
1643			log_2_abcd_0_0_bob,
1644			log_2_dcba_1_0_bob,
1645			log_3_abcd_0_0_bob,
1646			log_3_dcba_1_0_bob,
1647			..
1648		} = prepare().await;
1649		let filter = TestFilter {
1650			from_block: 0,
1651			to_block: 3,
1652			addresses: vec![],
1653			topics: vec![vec![topics_a, topics_d]],
1654			expected_result: vec![
1655				log_1_abcd_0_0_alice.into(),
1656				log_1_dcba_1_0_alice.into(),
1657				log_2_abcd_0_0_bob.into(),
1658				log_2_dcba_1_0_bob.into(),
1659				log_3_abcd_0_0_bob.into(),
1660				log_3_dcba_1_0_bob.into(),
1661			],
1662		};
1663		let result = run_test_case(backend, &filter).await.expect("must succeed");
1664		assert_eq!(result, filter.expected_result);
1665	}
1666
1667	#[tokio::test]
1668	async fn filter_with_multi_topic_options_and_wildcards_works() {
1669		let TestData {
1670			backend,
1671			bob,
1672			topics_a,
1673			topics_b,
1674			topics_c,
1675			topics_d,
1676			log_2_dcba_1_0_bob,
1677			log_2_badc_2_0_bob,
1678			log_3_dcba_1_0_bob,
1679			log_3_badc_2_0_bob,
1680			..
1681		} = prepare().await;
1682		let filter = TestFilter {
1683			from_block: 0,
1684			to_block: 3,
1685			addresses: vec![bob],
1686			// Product on input [null,null,(b,d),(a,c)].
1687			topics: vec![
1688				vec![],
1689				vec![],
1690				vec![topics_b, topics_d],
1691				vec![topics_a, topics_c],
1692			],
1693			expected_result: vec![
1694				log_2_dcba_1_0_bob.into(),
1695				log_2_badc_2_0_bob.into(),
1696				log_3_dcba_1_0_bob.into(),
1697				log_3_badc_2_0_bob.into(),
1698			],
1699		};
1700		let result = run_test_case(backend, &filter).await.expect("must succeed");
1701		assert_eq!(result, filter.expected_result);
1702	}
1703
1704	#[tokio::test]
1705	async fn test_canonicalize_sets_canon_flag_for_redacted_and_enacted_blocks_correctly() {
1706		let TestData {
1707			backend,
1708			substrate_hash_1,
1709			substrate_hash_2,
1710			substrate_hash_3,
1711			..
1712		} = prepare().await;
1713
1714		// set block #1 to non canon
1715		sqlx::query("UPDATE blocks SET is_canon = 0 WHERE substrate_block_hash = ?")
1716			.bind(substrate_hash_1.as_bytes())
1717			.execute(backend.pool())
1718			.await
1719			.expect("sql query must succeed");
1720		assert_blocks_canon(
1721			backend.pool(),
1722			vec![
1723				(substrate_hash_1, 0),
1724				(substrate_hash_2, 1),
1725				(substrate_hash_3, 1),
1726			],
1727		)
1728		.await;
1729
1730		backend
1731			.canonicalize(&[substrate_hash_2], &[substrate_hash_1])
1732			.await
1733			.expect("must succeed");
1734
1735		assert_blocks_canon(
1736			backend.pool(),
1737			vec![
1738				(substrate_hash_1, 1),
1739				(substrate_hash_2, 0),
1740				(substrate_hash_3, 1),
1741			],
1742		)
1743		.await;
1744	}
1745
1746	#[test]
1747	fn test_query_should_be_generated_correctly() {
1748		use sqlx::Execute;
1749
1750		let from_block: u64 = 100;
1751		let to_block: u64 = 500;
1752		let addresses: Vec<H160> = vec![
1753			H160::repeat_byte(0x01),
1754			H160::repeat_byte(0x02),
1755			H160::repeat_byte(0x03),
1756		];
1757		let topics = [
1758			hashset![
1759				H256::repeat_byte(0x01),
1760				H256::repeat_byte(0x02),
1761				H256::repeat_byte(0x03),
1762			],
1763			hashset![H256::repeat_byte(0x04), H256::repeat_byte(0x05),],
1764			hashset![],
1765			hashset![H256::repeat_byte(0x06)],
1766		];
1767
1768		let expected_query_sql = "
1769SELECT
1770	l.substrate_block_hash,
1771	b.ethereum_block_hash,
1772	b.block_number,
1773	b.ethereum_storage_schema,
1774	l.transaction_index,
1775	l.log_index
1776FROM logs AS l
1777INNER JOIN blocks AS b
1778ON (b.block_number BETWEEN ? AND ?) AND b.substrate_block_hash = l.substrate_block_hash AND b.is_canon = 1
1779WHERE 1 AND l.address IN (?, ?, ?) AND l.topic_1 IN (?, ?, ?) AND l.topic_2 IN (?, ?) AND l.topic_4 = ?
1780ORDER BY b.block_number ASC, l.transaction_index ASC, l.log_index ASC
1781LIMIT 10001";
1782
1783		let mut qb = QueryBuilder::new("");
1784		let actual_query_sql = build_query(&mut qb, from_block, to_block, addresses, topics).sql();
1785		assert_eq!(expected_query_sql, actual_query_sql);
1786	}
1787}