fc_db/sql/
mod.rs

1// This file is part of Frontier.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use std::{cmp::Ordering, collections::HashSet, num::NonZeroU32, str::FromStr, sync::Arc};
20
21use futures::TryStreamExt;
22use scale_codec::{Decode, Encode};
23use sqlx::{
24	query::Query,
25	sqlite::{
26		SqliteArguments, SqliteConnectOptions, SqlitePool, SqlitePoolOptions, SqliteQueryResult,
27	},
28	ConnectOptions, Error, Execute, QueryBuilder, Row, Sqlite,
29};
30// Substrate
31use sc_client_api::backend::{Backend as BackendT, StorageProvider};
32use sp_api::{ApiExt, ProvideRuntimeApi};
33use sp_blockchain::HeaderBackend;
34use sp_core::{H160, H256};
35use sp_runtime::{
36	generic::BlockId,
37	traits::{Block as BlockT, Header as HeaderT, UniqueSaturatedInto, Zero},
38};
39// Frontier
40use fc_api::{FilteredLog, TransactionMetadata};
41use fc_storage::{StorageOverride, StorageQuerier};
42use fp_consensus::{FindLogError, Hashes, Log as ConsensusLog, PostLog, PreLog};
43use fp_rpc::EthereumRuntimeRPCApi;
44use fp_storage::EthereumStorageSchema;
45
/// Maximum number of topics allowed to be filtered upon.
///
/// `filter_logs` rejects any topic combination that has more entries than this.
const MAX_TOPIC_COUNT: u16 = 4;
48
/// Represents a log item in the shape it is written to the `logs` table.
///
/// Addresses, topics and hashes are kept as raw byte vectors.
#[derive(Debug, Eq, PartialEq)]
pub struct Log {
	/// Raw bytes of the log's `address`.
	pub address: Vec<u8>,
	/// Raw bytes of the first topic, `None` when the log has no topics.
	pub topic_1: Option<Vec<u8>>,
	/// Raw bytes of the second topic, if any.
	pub topic_2: Option<Vec<u8>>,
	/// Raw bytes of the third topic, if any.
	pub topic_3: Option<Vec<u8>>,
	/// Raw bytes of the fourth topic, if any.
	pub topic_4: Option<Vec<u8>>,
	/// Position of the log inside its receipt.
	pub log_index: i32,
	/// Position of the receipt (transaction) inside the block.
	pub transaction_index: i32,
	/// Raw bytes of the substrate block hash the log belongs to.
	pub substrate_block_hash: Vec<u8>,
}
61
/// Represents the block metadata gathered from the substrate backend before it is
/// written to the `blocks` and `transactions` tables.
#[derive(Eq, PartialEq)]
struct BlockMetadata {
	/// Hash of the substrate block the metadata was read from.
	pub substrate_block_hash: H256,
	/// Substrate header number, saturated to `u32` and then cast to `i32`.
	pub block_number: i32,
	/// Ethereum block and transaction hashes resolved from the consensus digest.
	pub post_hashes: Hashes,
	/// Storage schema in use at this block.
	pub schema: EthereumStorageSchema,
	/// `1` when the client reports this hash as the canonical one at its height, `0` otherwise.
	pub is_canon: i32,
}
71
/// Represents the Sqlite connection options that are
/// used to establish a database connection.
#[derive(Debug)]
pub struct SqliteBackendConfig<'a> {
	/// Connection string parsed with `SqliteConnectOptions::from_str`.
	pub path: &'a str,
	/// Whether the database file is created when it does not exist yet.
	pub create_if_missing: bool,
	/// Value applied to the Sqlite `threads` pragma.
	pub thread_count: u32,
	/// Magnitude applied, negated, to the Sqlite `cache_size` pragma.
	/// Per the Sqlite documentation a negative `cache_size` is expressed in KiB.
	pub cache_size: u64,
}
81
/// Represents the indexed status of a block and if it's canon or not.
///
/// The `Default` value (`indexed: false`, `canon: false`) is what lookups report
/// when the block is unknown or the query fails.
#[derive(Debug, Default)]
pub struct BlockIndexedStatus {
	/// `true` when the block has both a `sync_status` row and a `blocks` row.
	pub indexed: bool,
	/// `true` when the stored `is_canon` flag of the block is non-zero.
	pub canon: bool,
}
88
/// Represents the backend configurations.
///
/// Sqlite is the only engine currently supported.
#[derive(Debug)]
pub enum BackendConfig<'a> {
	/// Configuration for a Sqlite database.
	Sqlite(SqliteBackendConfig<'a>),
}
94
/// Sqlite-backed storage for the block, transaction and log indexes.
///
/// Cloning is cheap with respect to the storage override (an `Arc`); the pool is
/// cloned through `SqlitePool`'s own `Clone` implementation.
#[derive(Clone)]
pub struct Backend<Block> {
	/// The Sqlite connection.
	pool: SqlitePool,
	/// The additional overrides for the logs handler.
	storage_override: Arc<dyn StorageOverride<Block>>,

	/// The number of allowed operations for the Sqlite filter call.
	/// A value of `0` disables the timeout.
	num_ops_timeout: i32,
}
106
107impl<Block> Backend<Block>
108where
109	Block: BlockT<Hash = H256>,
110{
111	/// Creates a new instance of the SQL backend.
112	pub async fn new(
113		config: BackendConfig<'_>,
114		pool_size: u32,
115		num_ops_timeout: Option<NonZeroU32>,
116		storage_override: Arc<dyn StorageOverride<Block>>,
117	) -> Result<Self, Error> {
118		let any_pool = SqlitePoolOptions::new()
119			.max_connections(pool_size)
120			.connect_lazy_with(Self::connect_options(&config)?.disable_statement_logging());
121		let _ = Self::create_database_if_not_exists(&any_pool).await?;
122		let _ = Self::create_indexes_if_not_exist(&any_pool).await?;
123		Ok(Self {
124			pool: any_pool,
125			storage_override,
126			num_ops_timeout: num_ops_timeout
127				.map(|n| n.get())
128				.unwrap_or(0)
129				.try_into()
130				.unwrap_or(i32::MAX),
131		})
132	}
133
134	fn connect_options(config: &BackendConfig) -> Result<SqliteConnectOptions, Error> {
135		match config {
136			BackendConfig::Sqlite(config) => {
137				log::info!(target: "frontier-sql", "📑 Connection configuration: {config:?}");
138				let config = sqlx::sqlite::SqliteConnectOptions::from_str(config.path)?
139					.create_if_missing(config.create_if_missing)
140					// https://www.sqlite.org/pragma.html#pragma_busy_timeout
141					.busy_timeout(std::time::Duration::from_secs(8))
142					// 200MB, https://www.sqlite.org/pragma.html#pragma_cache_size
143					.pragma("cache_size", format!("-{}", config.cache_size))
144					// https://www.sqlite.org/pragma.html#pragma_analysis_limit
145					.pragma("analysis_limit", "1000")
146					// https://www.sqlite.org/pragma.html#pragma_threads
147					.pragma("threads", config.thread_count.to_string())
148					// https://www.sqlite.org/pragma.html#pragma_threads
149					.pragma("temp_store", "memory")
150					// https://www.sqlite.org/wal.html
151					.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
152					// https://www.sqlite.org/pragma.html#pragma_synchronous
153					.synchronous(sqlx::sqlite::SqliteSynchronous::Normal);
154				Ok(config)
155			}
156		}
157	}
158
	/// Get the underlying Sqlite pool.
	///
	/// Returns a shared reference; callers that need an owned handle can clone it.
	pub fn pool(&self) -> &SqlitePool {
		&self.pool
	}
163
164	/// Canonicalize the indexed blocks, marking/demarking them as canon based on the
165	/// provided `retracted` and `enacted` values.
166	pub async fn canonicalize(&self, retracted: &[H256], enacted: &[H256]) -> Result<(), Error> {
167		let mut tx = self.pool().begin().await?;
168
169		// Retracted
170		let mut builder: QueryBuilder<Sqlite> =
171			QueryBuilder::new("UPDATE blocks SET is_canon = 0 WHERE substrate_block_hash IN (");
172		let mut retracted_hashes = builder.separated(", ");
173		for hash in retracted.iter() {
174			let hash = hash.as_bytes();
175			retracted_hashes.push_bind(hash);
176		}
177		retracted_hashes.push_unseparated(")");
178		let query = builder.build();
179		query.execute(&mut *tx).await?;
180
181		// Enacted
182		let mut builder: QueryBuilder<Sqlite> =
183			QueryBuilder::new("UPDATE blocks SET is_canon = 1 WHERE substrate_block_hash IN (");
184		let mut enacted_hashes = builder.separated(", ");
185		for hash in enacted.iter() {
186			let hash = hash.as_bytes();
187			enacted_hashes.push_bind(hash);
188		}
189		enacted_hashes.push_unseparated(")");
190		let query = builder.build();
191		query.execute(&mut *tx).await?;
192
193		tx.commit().await
194	}
195
196	/// Index the block metadata for the genesis block.
197	pub async fn insert_genesis_block_metadata<Client, BE>(
198		&self,
199		client: Arc<Client>,
200	) -> Result<Option<H256>, Error>
201	where
202		Client: StorageProvider<Block, BE> + HeaderBackend<Block> + 'static,
203		Client: ProvideRuntimeApi<Block>,
204		Client::Api: EthereumRuntimeRPCApi<Block>,
205		BE: BackendT<Block> + 'static,
206	{
207		let id = BlockId::Number(Zero::zero());
208		let substrate_genesis_hash = client
209			.expect_block_hash_from_id(&id)
210			.map_err(|_| Error::Protocol("Cannot resolve genesis hash".to_string()))?;
211		let maybe_substrate_hash: Option<H256> = if let Ok(Some(_)) =
212			client.header(substrate_genesis_hash)
213		{
214			let has_api = client
215				.runtime_api()
216				.has_api_with::<dyn EthereumRuntimeRPCApi<Block>, _>(
217					substrate_genesis_hash,
218					|version| version >= 1,
219				)
220				.expect("runtime api reachable");
221
222			log::debug!(target: "frontier-sql", "Index genesis block, has_api={has_api}, hash={substrate_genesis_hash:?}");
223
224			if has_api {
225				// The chain has frontier support from genesis.
226				// Read from the runtime and store the block metadata.
227				let ethereum_block = client
228					.runtime_api()
229					.current_block(substrate_genesis_hash)
230					.expect("runtime api reachable")
231					.expect("ethereum genesis block");
232
233				let schema = StorageQuerier::new(client)
234					.storage_schema(substrate_genesis_hash)
235					.unwrap_or(EthereumStorageSchema::V3)
236					.encode();
237				let ethereum_block_hash = ethereum_block.header.hash().as_bytes().to_owned();
238				let substrate_block_hash = substrate_genesis_hash.as_bytes();
239				let block_number = 0i32;
240				let is_canon = 1i32;
241
242				let _ = sqlx::query(
243					"INSERT OR IGNORE INTO blocks(
244						ethereum_block_hash,
245						substrate_block_hash,
246						block_number,
247						ethereum_storage_schema,
248						is_canon)
249					VALUES (?, ?, ?, ?, ?)",
250				)
251				.bind(ethereum_block_hash)
252				.bind(substrate_block_hash)
253				.bind(block_number)
254				.bind(schema)
255				.bind(is_canon)
256				.execute(self.pool())
257				.await?;
258			}
259			Some(substrate_genesis_hash)
260		} else {
261			None
262		};
263		Ok(maybe_substrate_hash)
264	}
265
266	fn insert_block_metadata_inner<Client, BE>(
267		client: Arc<Client>,
268		hash: H256,
269		storage_override: &dyn StorageOverride<Block>,
270	) -> Result<BlockMetadata, Error>
271	where
272		Client: StorageProvider<Block, BE> + HeaderBackend<Block> + 'static,
273		BE: BackendT<Block> + 'static,
274	{
275		log::trace!(target: "frontier-sql", "🛠️  [Metadata] Retrieving digest data for block {hash:?}");
276		if let Ok(Some(header)) = client.header(hash) {
277			match fp_consensus::find_log(header.digest()) {
278				Ok(log) => {
279					let schema = StorageQuerier::new(client.clone())
280						.storage_schema(hash)
281						.unwrap_or(EthereumStorageSchema::V3);
282					let log_hashes = match log {
283						ConsensusLog::Post(PostLog::Hashes(post_hashes)) => post_hashes,
284						ConsensusLog::Post(PostLog::Block(block)) => Hashes::from_block(block),
285						ConsensusLog::Post(PostLog::BlockHash(expect_eth_block_hash)) => {
286							let ethereum_block = storage_override.current_block(hash);
287							match ethereum_block {
288								Some(block) => {
289									let got_eth_block_hash = block.header.hash();
290									if got_eth_block_hash != expect_eth_block_hash {
291										return Err(Error::Protocol(format!(
292											"Ethereum block hash mismatch: \
293											frontier consensus digest ({expect_eth_block_hash:?}), \
294											db state ({got_eth_block_hash:?})"
295										)));
296									} else {
297										Hashes::from_block(block)
298									}
299								}
300								None => {
301									return Err(Error::Protocol(format!(
302										"Missing ethereum block for hash mismatch {expect_eth_block_hash:?}"
303									)))
304								}
305							}
306						}
307						ConsensusLog::Pre(PreLog::Block(block)) => Hashes::from_block(block),
308					};
309
310					let header_number = *header.number();
311					let block_number =
312						UniqueSaturatedInto::<u32>::unique_saturated_into(header_number) as i32;
313					let is_canon = match client.hash(header_number) {
314						Ok(Some(inner_hash)) => (inner_hash == hash) as i32,
315						Ok(None) => {
316							log::debug!(target: "frontier-sql", "[Metadata] Missing header for block #{block_number} ({hash:?})");
317							0
318						}
319						Err(err) => {
320							log::debug!(
321								target: "frontier-sql",
322								"[Metadata] Failed to retrieve header for block #{block_number} ({hash:?}): {err:?}",
323							);
324							0
325						}
326					};
327
328					log::trace!(
329						target: "frontier-sql",
330						"[Metadata] Prepared block metadata for #{block_number} ({hash:?}) canon={is_canon}",
331					);
332					Ok(BlockMetadata {
333						substrate_block_hash: hash,
334						block_number,
335						post_hashes: log_hashes,
336						schema,
337						is_canon,
338					})
339				}
340				Err(FindLogError::NotFound) => Err(Error::Protocol(format!(
341					"[Metadata] No logs found for hash {hash:?}",
342				))),
343				Err(FindLogError::MultipleLogs) => Err(Error::Protocol(format!(
344					"[Metadata] Multiple logs found for hash {hash:?}",
345				))),
346			}
347		} else {
348			Err(Error::Protocol(format!(
349				"[Metadata] Failed retrieving header for hash {hash:?}"
350			)))
351		}
352	}
353
354	/// Insert the block metadata for the provided block hashes.
355	pub async fn insert_block_metadata<Client, BE>(
356		&self,
357		client: Arc<Client>,
358		hash: H256,
359	) -> Result<(), Error>
360	where
361		Client: StorageProvider<Block, BE> + HeaderBackend<Block> + 'static,
362		BE: BackendT<Block> + 'static,
363	{
364		// Spawn a blocking task to get block metadata from substrate backend.
365		let storage_override = self.storage_override.clone();
366		let metadata = tokio::task::spawn_blocking(move || {
367			Self::insert_block_metadata_inner(client.clone(), hash, &*storage_override)
368		})
369		.await
370		.map_err(|_| Error::Protocol("tokio blocking metadata task failed".to_string()))??;
371
372		let mut tx = self.pool().begin().await?;
373
374		log::debug!(
375			target: "frontier-sql",
376			"🛠️  [Metadata] Starting execution of statements on db transaction"
377		);
378		let post_hashes = metadata.post_hashes;
379		let ethereum_block_hash = post_hashes.block_hash.as_bytes();
380		let substrate_block_hash = metadata.substrate_block_hash.as_bytes();
381		let schema = metadata.schema.encode();
382		let block_number = metadata.block_number;
383		let is_canon = metadata.is_canon;
384
385		let _ = sqlx::query(
386			"INSERT OR IGNORE INTO blocks(
387					ethereum_block_hash,
388					substrate_block_hash,
389					block_number,
390					ethereum_storage_schema,
391					is_canon)
392				VALUES (?, ?, ?, ?, ?)",
393		)
394		.bind(ethereum_block_hash)
395		.bind(substrate_block_hash)
396		.bind(block_number)
397		.bind(schema)
398		.bind(is_canon)
399		.execute(&mut *tx)
400		.await?;
401		for (i, &transaction_hash) in post_hashes.transaction_hashes.iter().enumerate() {
402			let ethereum_transaction_hash = transaction_hash.as_bytes();
403			let ethereum_transaction_index = i as i32;
404			log::trace!(
405				target: "frontier-sql",
406				"[Metadata] Inserting TX for block #{block_number} - {transaction_hash:?} index {ethereum_transaction_index}",
407			);
408			let _ = sqlx::query(
409				"INSERT OR IGNORE INTO transactions(
410						ethereum_transaction_hash,
411						substrate_block_hash,
412						ethereum_block_hash,
413						ethereum_transaction_index)
414					VALUES (?, ?, ?, ?)",
415			)
416			.bind(ethereum_transaction_hash)
417			.bind(substrate_block_hash)
418			.bind(ethereum_block_hash)
419			.bind(ethereum_transaction_index)
420			.execute(&mut *tx)
421			.await?;
422		}
423
424		sqlx::query("INSERT INTO sync_status(substrate_block_hash) VALUES (?)")
425			.bind(hash.as_bytes())
426			.execute(&mut *tx)
427			.await?;
428
429		log::debug!(target: "frontier-sql", "[Metadata] Ready to commit");
430		tx.commit().await
431	}
432
433	/// Index the logs for the newly indexed blocks upto a `max_pending_blocks` value.
434	pub async fn index_block_logs(&self, block_hash: Block::Hash) {
435		let pool = self.pool().clone();
436		let storage_override = self.storage_override.clone();
437		let _ = async {
438			// The overarching db transaction for the task.
439			// Due to the async nature of this task, the same work is likely to happen
440			// more than once. For example when a new batch is scheduled when the previous one
441			// didn't finished yet and the new batch happens to select the same substrate
442			// block hashes for the update.
443			// That is expected, we are exchanging extra work for *acid*ity.
444			// There is no case of unique constrain violation or race condition as already
445			// existing entries are ignored.
446			let mut tx = pool.begin().await?;
447			// Update statement returning the substrate block hashes for this batch.
448			match sqlx::query(
449				"UPDATE sync_status
450			SET status = 1
451			WHERE substrate_block_hash IN
452				(SELECT substrate_block_hash
453				FROM sync_status
454				WHERE status = 0 AND substrate_block_hash = ?) RETURNING substrate_block_hash",
455			)
456			.bind(block_hash.as_bytes())
457			.fetch_one(&mut *tx)
458			.await
459			{
460				Ok(_) => {
461					// Spawn a blocking task to get log data from substrate backend.
462					let logs = tokio::task::spawn_blocking(move || {
463						Self::get_logs(storage_override, block_hash)
464					})
465					.await
466					.map_err(|_| Error::Protocol("tokio blocking task failed".to_string()))?;
467
468					for log in logs {
469						let _ = sqlx::query(
470							"INSERT OR IGNORE INTO logs(
471						address,
472						topic_1,
473						topic_2,
474						topic_3,
475						topic_4,
476						log_index,
477						transaction_index,
478						substrate_block_hash)
479					VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
480						)
481						.bind(log.address)
482						.bind(log.topic_1)
483						.bind(log.topic_2)
484						.bind(log.topic_3)
485						.bind(log.topic_4)
486						.bind(log.log_index)
487						.bind(log.transaction_index)
488						.bind(log.substrate_block_hash)
489						.execute(&mut *tx)
490						.await?;
491					}
492					Ok(tx.commit().await?)
493				}
494				Err(e) => Err(e),
495			}
496		}
497		.await
498		.map_err(|e| {
499			log::error!(target: "frontier-sql", "{e}");
500		});
501		// https://www.sqlite.org/pragma.html#pragma_optimize
502		let _ = sqlx::query("PRAGMA optimize").execute(&pool).await;
503		log::debug!(target: "frontier-sql", "Batch committed");
504	}
505
506	fn get_logs(
507		storage_override: Arc<dyn StorageOverride<Block>>,
508		substrate_block_hash: H256,
509	) -> Vec<Log> {
510		let mut logs: Vec<Log> = vec![];
511		let mut transaction_count: usize = 0;
512		let mut log_count: usize = 0;
513		let receipts = storage_override
514			.current_receipts(substrate_block_hash)
515			.unwrap_or_default();
516
517		transaction_count += receipts.len();
518		for (transaction_index, receipt) in receipts.iter().enumerate() {
519			let receipt_logs = match receipt {
520				ethereum::ReceiptV4::Legacy(d)
521				| ethereum::ReceiptV4::EIP2930(d)
522				| ethereum::ReceiptV4::EIP1559(d)
523				| ethereum::ReceiptV4::EIP7702(d) => &d.logs,
524			};
525			let transaction_index = transaction_index as i32;
526			log_count += receipt_logs.len();
527			for (log_index, log) in receipt_logs.iter().enumerate() {
528				#[allow(clippy::get_first)]
529				logs.push(Log {
530					address: log.address.as_bytes().to_owned(),
531					topic_1: log.topics.get(0).map(|l| l.as_bytes().to_owned()),
532					topic_2: log.topics.get(1).map(|l| l.as_bytes().to_owned()),
533					topic_3: log.topics.get(2).map(|l| l.as_bytes().to_owned()),
534					topic_4: log.topics.get(3).map(|l| l.as_bytes().to_owned()),
535					log_index: log_index as i32,
536					transaction_index,
537					substrate_block_hash: substrate_block_hash.as_bytes().to_owned(),
538				});
539			}
540		}
541		log::debug!(
542			target: "frontier-sql",
543			"Ready to commit {log_count} logs from {transaction_count} transactions"
544		);
545		logs
546	}
547
548	/// Retrieves the status if a block has been already indexed.
549	pub async fn is_block_indexed(&self, block_hash: Block::Hash) -> bool {
550		sqlx::query("SELECT substrate_block_hash FROM sync_status WHERE substrate_block_hash = ?")
551			.bind(block_hash.as_bytes().to_owned())
552			.fetch_optional(self.pool())
553			.await
554			.map(|r| r.is_some())
555			.unwrap_or(false)
556	}
557
558	/// Retrieves the status if a block is indexed and if also marked as canon.
559	pub async fn block_indexed_and_canon_status(
560		&self,
561		block_hash: Block::Hash,
562	) -> BlockIndexedStatus {
563		sqlx::query(
564			"SELECT b.is_canon FROM sync_status AS s
565			INNER JOIN blocks AS b
566			ON s.substrate_block_hash = b.substrate_block_hash
567			WHERE s.substrate_block_hash = ?",
568		)
569		.bind(block_hash.as_bytes().to_owned())
570		.fetch_optional(self.pool())
571		.await
572		.map(|result| {
573			result
574				.map(|row| {
575					let is_canon: i32 = row.get(0);
576					BlockIndexedStatus {
577						indexed: true,
578						canon: is_canon != 0,
579					}
580				})
581				.unwrap_or_default()
582		})
583		.unwrap_or_default()
584	}
585
586	/// Sets the provided block as canon.
587	pub async fn set_block_as_canon(&self, block_hash: H256) -> Result<SqliteQueryResult, Error> {
588		sqlx::query("UPDATE blocks SET is_canon = 1 WHERE substrate_block_hash = ?")
589			.bind(block_hash.as_bytes())
590			.execute(self.pool())
591			.await
592	}
593
594	/// Retrieves the first missing canonical block number in decreasing order that hasn't been indexed yet.
595	/// If no unindexed block exists or the table or the rows do not exist, then the function
596	/// returns `None`.
597	pub async fn get_first_missing_canon_block(&self) -> Option<u32> {
598		match sqlx::query(
599			"SELECT b1.block_number-1
600			FROM blocks as b1
601			WHERE b1.block_number > 0 AND b1.is_canon=1 AND NOT EXISTS (
602				SELECT 1 FROM blocks AS b2
603				WHERE b2.block_number = b1.block_number-1
604				AND b1.is_canon=1
605				AND b2.is_canon=1
606			)
607			ORDER BY block_number LIMIT 1",
608		)
609		.fetch_optional(self.pool())
610		.await
611		{
612			Ok(result) => {
613				if let Some(row) = result {
614					let block_number: u32 = row.get(0);
615					return Some(block_number);
616				}
617			}
618			Err(err) => {
619				log::debug!(target: "frontier-sql", "Failed retrieving missing block {err:?}");
620			}
621		}
622
623		None
624	}
625
626	/// Retrieves the first pending canonical block hash in decreasing order that hasn't had
627	// its logs indexed yet. If no unindexed block exists or the table or the rows do not exist,
628	/// then the function returns `None`.
629	pub async fn get_first_pending_canon_block(&self) -> Option<H256> {
630		match sqlx::query(
631			"SELECT s.substrate_block_hash FROM sync_status AS s
632			INNER JOIN blocks as b
633			ON s.substrate_block_hash = b.substrate_block_hash
634			WHERE b.is_canon = 1 AND s.status = 0
635			ORDER BY b.block_number LIMIT 1",
636		)
637		.fetch_optional(self.pool())
638		.await
639		{
640			Ok(result) => {
641				if let Some(row) = result {
642					let block_hash_bytes: Vec<u8> = row.get(0);
643					let block_hash = H256::from_slice(&block_hash_bytes[..]);
644					return Some(block_hash);
645				}
646			}
647			Err(err) => {
648				log::debug!(target: "frontier-sql", "Failed retrieving missing block {err:?}");
649			}
650		}
651
652		None
653	}
654
655	/// Retrieve the block hash for the last indexed canon block.
656	pub async fn last_indexed_canon_block(&self) -> Result<H256, Error> {
657		let row = sqlx::query(
658			"SELECT b.substrate_block_hash FROM blocks AS b
659			INNER JOIN sync_status AS s
660			ON s.substrate_block_hash = b.substrate_block_hash
661			WHERE b.is_canon=1 AND s.status = 1
662			ORDER BY b.id DESC LIMIT 1",
663		)
664		.fetch_one(self.pool())
665		.await?;
666		Ok(H256::from_slice(
667			&row.try_get::<Vec<u8>, _>(0).unwrap_or_default()[..],
668		))
669	}
670
	/// Create the Sqlite tables if they do not already exist.
	///
	/// Issues one `BEGIN; ... COMMIT;` batch that creates the `logs`, `sync_status`,
	/// `blocks` and `transactions` tables. Each table declares a `UNIQUE` constraint,
	/// which is what the `INSERT OR IGNORE` statements in this module rely on.
	async fn create_database_if_not_exists(pool: &SqlitePool) -> Result<SqliteQueryResult, Error> {
		sqlx::query(
			"BEGIN;
			CREATE TABLE IF NOT EXISTS logs (
				id INTEGER PRIMARY KEY,
				address BLOB NOT NULL,
				topic_1 BLOB,
				topic_2 BLOB,
				topic_3 BLOB,
				topic_4 BLOB,
				log_index INTEGER NOT NULL,
				transaction_index INTEGER NOT NULL,
				substrate_block_hash BLOB NOT NULL,
				UNIQUE (
					log_index,
					transaction_index,
					substrate_block_hash
				)
			);
			CREATE TABLE IF NOT EXISTS sync_status (
				id INTEGER PRIMARY KEY,
				substrate_block_hash BLOB NOT NULL,
				status INTEGER DEFAULT 0 NOT NULL,
				UNIQUE (
					substrate_block_hash
				)
			);
			CREATE TABLE IF NOT EXISTS blocks (
				id INTEGER PRIMARY KEY,
				block_number INTEGER NOT NULL,
				ethereum_block_hash BLOB NOT NULL,
				substrate_block_hash BLOB NOT NULL,
				ethereum_storage_schema BLOB NOT NULL,
				is_canon INTEGER NOT NULL,
				UNIQUE (
					ethereum_block_hash,
					substrate_block_hash
				)
			);
			CREATE TABLE IF NOT EXISTS transactions (
				id INTEGER PRIMARY KEY,
				ethereum_transaction_hash BLOB NOT NULL,
				substrate_block_hash BLOB NOT NULL,
				ethereum_block_hash BLOB NOT NULL,
				ethereum_transaction_index INTEGER NOT NULL,
				UNIQUE (
					ethereum_transaction_hash,
					substrate_block_hash
				)
			);
			COMMIT;",
		)
		.execute(pool)
		.await
	}
727
	/// Create the Sqlite database indices if they do not already exist.
	///
	/// Issues one `BEGIN; ... COMMIT;` batch that creates the indexes on the `logs`,
	/// `blocks` and `transactions` tables, so those tables must exist beforehand.
	async fn create_indexes_if_not_exist(pool: &SqlitePool) -> Result<SqliteQueryResult, Error> {
		sqlx::query(
			"BEGIN;
			CREATE INDEX IF NOT EXISTS logs_main_idx ON logs (
				address,
				topic_1,
				topic_2,
				topic_3,
				topic_4
			);
			CREATE INDEX IF NOT EXISTS logs_substrate_index ON logs (
				substrate_block_hash
			);
			CREATE INDEX IF NOT EXISTS blocks_number_index ON blocks (
				block_number
			);
			CREATE INDEX IF NOT EXISTS blocks_substrate_index ON blocks (
				substrate_block_hash
			);
			CREATE INDEX IF NOT EXISTS eth_block_hash_idx ON blocks (
				ethereum_block_hash
			);
			CREATE INDEX IF NOT EXISTS eth_tx_hash_idx ON transactions (
				ethereum_transaction_hash
			);
			CREATE INDEX IF NOT EXISTS eth_tx_hash_2_idx ON transactions (
				ethereum_block_hash,
				ethereum_transaction_index
			);
			COMMIT;",
		)
		.execute(pool)
		.await
	}
763}
764
765#[async_trait::async_trait]
766impl<Block: BlockT<Hash = H256>> fc_api::Backend<Block> for Backend<Block> {
767	async fn block_hash(
768		&self,
769		ethereum_block_hash: &H256,
770	) -> Result<Option<Vec<Block::Hash>>, String> {
771		let ethereum_block_hash = ethereum_block_hash.as_bytes();
772		let res =
773			sqlx::query("SELECT substrate_block_hash FROM blocks WHERE ethereum_block_hash = ?")
774				.bind(ethereum_block_hash)
775				.fetch_all(&self.pool)
776				.await
777				.ok()
778				.map(|rows| {
779					rows.iter()
780						.map(|row| {
781							H256::from_slice(&row.try_get::<Vec<u8>, _>(0).unwrap_or_default()[..])
782						})
783						.collect()
784				});
785		Ok(res)
786	}
787
788	async fn transaction_metadata(
789		&self,
790		ethereum_transaction_hash: &H256,
791	) -> Result<Vec<TransactionMetadata<Block>>, String> {
792		let ethereum_transaction_hash = ethereum_transaction_hash.as_bytes();
793		let out = sqlx::query(
794			"SELECT
795				substrate_block_hash, ethereum_block_hash, ethereum_transaction_index
796			FROM transactions WHERE ethereum_transaction_hash = ?",
797		)
798		.bind(ethereum_transaction_hash)
799		.fetch_all(&self.pool)
800		.await
801		.unwrap_or_default()
802		.iter()
803		.map(|row| {
804			let substrate_block_hash =
805				H256::from_slice(&row.try_get::<Vec<u8>, _>(0).unwrap_or_default()[..]);
806			let ethereum_block_hash =
807				H256::from_slice(&row.try_get::<Vec<u8>, _>(1).unwrap_or_default()[..]);
808			let ethereum_transaction_index = row.try_get::<i32, _>(2).unwrap_or_default() as u32;
809			TransactionMetadata {
810				substrate_block_hash,
811				ethereum_block_hash,
812				ethereum_index: ethereum_transaction_index,
813			}
814		})
815		.collect();
816
817		Ok(out)
818	}
819
	/// Returns the log indexer, which is this backend itself.
	fn log_indexer(&self) -> &dyn fc_api::LogIndexerBackend<Block> {
		self
	}
823
824	async fn first_block_hash(&self) -> Result<Block::Hash, String> {
825		// Retrieves the block hash for the earliest indexed block, maybe it's not canon.
826		sqlx::query("SELECT substrate_block_hash FROM blocks ORDER BY block_number ASC LIMIT 1")
827			.fetch_one(self.pool())
828			.await
829			.map(|row| H256::from_slice(&row.get::<Vec<u8>, _>(0)[..]))
830			.map_err(|e| format!("Failed to fetch oldest block hash: {e}"))
831	}
832
833	async fn latest_block_hash(&self) -> Result<Block::Hash, String> {
834		// Retrieves the block hash for the latest indexed block, maybe it's not canon.
835		sqlx::query("SELECT substrate_block_hash FROM blocks ORDER BY block_number DESC LIMIT 1")
836			.fetch_one(self.pool())
837			.await
838			.map(|row| H256::from_slice(&row.get::<Vec<u8>, _>(0)[..]))
839			.map_err(|e| format!("Failed to fetch best hash: {e}"))
840	}
841}
842
843#[async_trait::async_trait]
844impl<Block: BlockT<Hash = H256>> fc_api::LogIndexerBackend<Block> for Backend<Block> {
	/// Always `true` for the SQL backend.
	fn is_indexed(&self) -> bool {
		true
	}
848
849	async fn filter_logs(
850		&self,
851		from_block: u64,
852		to_block: u64,
853		addresses: Vec<H160>,
854		topics: Vec<Vec<Option<H256>>>,
855	) -> Result<Vec<FilteredLog<Block>>, String> {
856		let mut unique_topics: [HashSet<H256>; 4] = [
857			HashSet::new(),
858			HashSet::new(),
859			HashSet::new(),
860			HashSet::new(),
861		];
862		for topic_combination in topics.into_iter() {
863			for (topic_index, topic) in topic_combination.into_iter().enumerate() {
864				if topic_index == MAX_TOPIC_COUNT as usize {
865					return Err("Invalid topic input. Maximum length is 4.".to_string());
866				}
867
868				if let Some(topic) = topic {
869					unique_topics[topic_index].insert(topic);
870				}
871			}
872		}
873
874		let log_key = format!("{from_block}-{to_block}-{addresses:?}-{unique_topics:?}");
875		let mut qb = QueryBuilder::new("");
876		let query = build_query(&mut qb, from_block, to_block, addresses, unique_topics);
877		let sql = query.sql();
878
879		let mut conn = self
880			.pool()
881			.acquire()
882			.await
883			.map_err(|err| format!("failed acquiring sqlite connection: {err}"))?;
884		let log_key2 = log_key.clone();
885		conn.lock_handle()
886			.await
887			.map_err(|err| format!("{err:?}"))?
888			.set_progress_handler(self.num_ops_timeout, move || {
889				log::debug!(target: "frontier-sql", "Sqlite progress_handler triggered for {log_key2}");
890				false
891			});
892		log::debug!(target: "frontier-sql", "Query: {sql:?} - {log_key}");
893
894		let mut out: Vec<FilteredLog<Block>> = vec![];
895		let mut rows = query.fetch(&mut *conn);
896		let maybe_err = loop {
897			match rows.try_next().await {
898				Ok(Some(row)) => {
899					// Substrate block hash
900					let substrate_block_hash =
901						H256::from_slice(&row.try_get::<Vec<u8>, _>(0).unwrap_or_default()[..]);
902					// Ethereum block hash
903					let ethereum_block_hash =
904						H256::from_slice(&row.try_get::<Vec<u8>, _>(1).unwrap_or_default()[..]);
905					// Block number
906					let block_number = row.try_get::<i32, _>(2).unwrap_or_default() as u32;
907					// Ethereum storage schema
908					let ethereum_storage_schema: EthereumStorageSchema =
909						Decode::decode(&mut &row.try_get::<Vec<u8>, _>(3).unwrap_or_default()[..])
910							.map_err(|_| {
911								"Cannot decode EthereumStorageSchema for block".to_string()
912							})?;
913					// Transaction index
914					let transaction_index = row.try_get::<i32, _>(4).unwrap_or_default() as u32;
915					// Log index
916					let log_index = row.try_get::<i32, _>(5).unwrap_or_default() as u32;
917					out.push(FilteredLog {
918						substrate_block_hash,
919						ethereum_block_hash,
920						block_number,
921						ethereum_storage_schema,
922						transaction_index,
923						log_index,
924					});
925				}
926				Ok(None) => break None, // no more rows
927				Err(err) => break Some(err),
928			};
929		};
930		drop(rows);
931		conn.lock_handle()
932			.await
933			.map_err(|err| format!("{err:?}"))?
934			.remove_progress_handler();
935
936		if let Some(err) = maybe_err {
937			log::error!(target: "frontier-sql", "Failed to query sql db: {err:?} - {log_key}");
938			return Err("Failed to query sql db with statement".to_string());
939		}
940
941		log::info!(target: "frontier-sql", "FILTER remove handler - {log_key}");
942		Ok(out)
943	}
944}
945
946/// Build a SQL query to retrieve a list of logs given certain constraints.
947fn build_query<'a>(
948	qb: &'a mut QueryBuilder<Sqlite>,
949	from_block: u64,
950	to_block: u64,
951	addresses: Vec<H160>,
952	topics: [HashSet<H256>; 4],
953) -> Query<'a, Sqlite, SqliteArguments<'a>> {
954	qb.push(
955		"
956SELECT
957	l.substrate_block_hash,
958	b.ethereum_block_hash,
959	b.block_number,
960	b.ethereum_storage_schema,
961	l.transaction_index,
962	l.log_index
963FROM logs AS l
964INNER JOIN blocks AS b
965ON (b.block_number BETWEEN ",
966	);
967	qb.separated(" AND ")
968		.push_bind(from_block as i64)
969		.push_bind(to_block as i64)
970		.push_unseparated(")");
971	qb.push(" AND b.substrate_block_hash = l.substrate_block_hash")
972		.push(" AND b.is_canon = 1")
973		.push("\nWHERE 1");
974
975	if !addresses.is_empty() {
976		qb.push(" AND l.address IN (");
977		let mut qb_addr = qb.separated(", ");
978		addresses.iter().for_each(|addr| {
979			qb_addr.push_bind(addr.as_bytes().to_owned());
980		});
981		qb_addr.push_unseparated(")");
982	}
983
984	for (i, topic_options) in topics.iter().enumerate() {
985		match topic_options.len().cmp(&1) {
986			Ordering::Greater => {
987				qb.push(format!(" AND l.topic_{} IN (", i + 1));
988				let mut qb_topic = qb.separated(", ");
989				topic_options.iter().for_each(|t| {
990					qb_topic.push_bind(t.as_bytes().to_owned());
991				});
992				qb_topic.push_unseparated(")");
993			}
994			Ordering::Equal => {
995				qb.push(format!(" AND l.topic_{} = ", i + 1)).push_bind(
996					topic_options
997						.iter()
998						.next()
999						.expect("length is 1, must exist; qed")
1000						.as_bytes()
1001						.to_owned(),
1002				);
1003			}
1004			Ordering::Less => {}
1005		}
1006	}
1007
1008	qb.push(
1009		"
1010ORDER BY b.block_number ASC, l.transaction_index ASC, l.log_index ASC
1011LIMIT 10001",
1012	);
1013
1014	qb.build()
1015}
1016
1017#[cfg(test)]
1018mod test {
1019	use super::*;
1020
1021	use std::path::Path;
1022
1023	use maplit::hashset;
1024	use scale_codec::Encode;
1025	use sqlx::{sqlite::SqliteRow, QueryBuilder, Row, SqlitePool};
1026	use tempfile::tempdir;
1027	// Substrate
1028	use sp_core::{H160, H256};
1029	use sp_runtime::{
1030		generic::{Block, Header},
1031		traits::BlakeTwo256,
1032	};
1033	use substrate_test_runtime_client::{
1034		DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt,
1035	};
1036	// Frontier
1037	use fc_api::Backend as BackendT;
1038	use fc_storage::SchemaV3StorageOverride;
1039	use fp_storage::{EthereumStorageSchema, PALLET_ETHEREUM_SCHEMA};
1040
	/// Block type used throughout these tests: a generic `Block` whose header
	/// uses `u64` block numbers and `BlakeTwo256` hashing, carrying the test
	/// runtime's extrinsic type.
	type OpaqueBlock =
		Block<Header<u64, BlakeTwo256>, substrate_test_runtime_client::runtime::Extrinsic>;
1043
	/// One log-filter test case: the arguments handed to `filter_logs` plus
	/// the logs the query is expected to return.
	struct TestFilter {
		/// First block number of the queried range.
		pub from_block: u64,
		/// Last block number of the queried range.
		pub to_block: u64,
		/// Addresses to match; the tests use an empty list for "any address".
		pub addresses: Vec<H160>,
		/// Topic filter as passed to `filter_logs`. In the tests below each inner
		/// vector is positional (index `i` targets topic `i + 1`, `None` is a
		/// wildcard) and several inner vectors act as alternatives.
		pub topics: Vec<Vec<Option<H256>>>,
		/// Logs the test expects back, in order.
		pub expected_result: Vec<FilteredLog<OpaqueBlock>>,
	}
1051
	/// Test-side description of a single log fixture. `prepare` inserts the
	/// address, topics, indices and substrate block hash into the `logs` table;
	/// the remaining fields are used to build the expected `FilteredLog`.
	#[derive(Debug, Clone)]
	struct Log {
		/// Number of the block containing the log.
		block_number: u32,
		/// Address stored in the `address` column.
		address: H160,
		/// Values stored in columns `topic_1` .. `topic_4`, in that order.
		topics: [H256; 4],
		/// Hash of the substrate block containing the log.
		substrate_block_hash: H256,
		/// Hash of the corresponding ethereum block.
		ethereum_block_hash: H256,
		/// Value stored in the `transaction_index` column.
		transaction_index: u32,
		/// Value stored in the `log_index` column.
		log_index: u32,
	}
1062
	/// Everything `prepare` sets up: the indexer backend plus the addresses,
	/// topics, block hashes and log fixtures that were written to its database.
	///
	/// Log fixtures are named `log_{BLOCK}_{TOPICS}_{LOG_INDEX}_{TX_INDEX}_{ADDRESS}`.
	// Each test destructures only the fields it needs, so not every field is
	// read everywhere; hence the `allow(unused)`.
	#[allow(unused)]
	struct TestData {
		backend: Backend<OpaqueBlock>,
		alice: H160,
		bob: H160,
		topics_a: H256,
		topics_b: H256,
		topics_c: H256,
		topics_d: H256,
		substrate_hash_1: H256,
		substrate_hash_2: H256,
		substrate_hash_3: H256,
		ethereum_hash_1: H256,
		ethereum_hash_2: H256,
		ethereum_hash_3: H256,
		log_1_abcd_0_0_alice: Log,
		log_1_dcba_1_0_alice: Log,
		log_1_badc_2_0_alice: Log,
		log_2_abcd_0_0_bob: Log,
		log_2_dcba_1_0_bob: Log,
		log_2_badc_2_0_bob: Log,
		log_3_abcd_0_0_bob: Log,
		log_3_dcba_1_0_bob: Log,
		log_3_badc_2_0_bob: Log,
	}
1088
1089	impl From<Log> for FilteredLog<OpaqueBlock> {
1090		fn from(value: Log) -> Self {
1091			Self {
1092				substrate_block_hash: value.substrate_block_hash,
1093				ethereum_block_hash: value.ethereum_block_hash,
1094				block_number: value.block_number,
1095				ethereum_storage_schema: EthereumStorageSchema::V3,
1096				transaction_index: value.transaction_index,
1097				log_index: value.log_index,
1098			}
1099		}
1100	}
1101
1102	async fn prepare() -> TestData {
1103		let tmp = tempdir().expect("create a temporary directory");
1104		// Initialize storage with schema V3
1105		let builder = TestClientBuilder::new().add_extra_storage(
1106			PALLET_ETHEREUM_SCHEMA.to_vec(),
1107			Encode::encode(&EthereumStorageSchema::V3),
1108		);
1109		// Client
1110		let (client, _) = builder
1111			.build_with_native_executor::<substrate_test_runtime_client::runtime::RuntimeApi, _>(
1112				None,
1113			);
1114		let client = Arc::new(client);
1115		// Overrides
1116		let storage_override = Arc::new(SchemaV3StorageOverride::new(client.clone()));
1117		// Indexer backend
1118		let indexer_backend = Backend::new(
1119			BackendConfig::Sqlite(SqliteBackendConfig {
1120				path: Path::new("sqlite:///")
1121					.join(tmp.path())
1122					.join("test.db3")
1123					.to_str()
1124					.unwrap(),
1125				create_if_missing: true,
1126				cache_size: 20480,
1127				thread_count: 4,
1128			}),
1129			1,
1130			None,
1131			storage_override.clone(),
1132		)
1133		.await
1134		.expect("indexer pool to be created");
1135
1136		// Prepare test db data
1137		// Addresses
1138		let alice = H160::repeat_byte(0x01);
1139		let bob = H160::repeat_byte(0x02);
1140		// Topics
1141		let topics_a = H256::repeat_byte(0x01);
1142		let topics_b = H256::repeat_byte(0x02);
1143		let topics_c = H256::repeat_byte(0x03);
1144		let topics_d = H256::repeat_byte(0x04);
1145		// Substrate block hashes
1146		let substrate_hash_1 = H256::repeat_byte(0x05);
1147		let substrate_hash_2 = H256::repeat_byte(0x06);
1148		let substrate_hash_3 = H256::repeat_byte(0x07);
1149		// Ethereum block hashes
1150		let ethereum_hash_1 = H256::repeat_byte(0x08);
1151		let ethereum_hash_2 = H256::repeat_byte(0x09);
1152		let ethereum_hash_3 = H256::repeat_byte(0x0a);
1153		// Ethereum storage schema
1154		let ethereum_storage_schema = EthereumStorageSchema::V3;
1155
1156		let block_entries = vec![
1157			// Block 1
1158			(
1159				1i32,
1160				ethereum_hash_1,
1161				substrate_hash_1,
1162				ethereum_storage_schema,
1163			),
1164			// Block 2
1165			(
1166				2i32,
1167				ethereum_hash_2,
1168				substrate_hash_2,
1169				ethereum_storage_schema,
1170			),
1171			// Block 3
1172			(
1173				3i32,
1174				ethereum_hash_3,
1175				substrate_hash_3,
1176				ethereum_storage_schema,
1177			),
1178		];
1179		let mut builder = QueryBuilder::new(
1180			"INSERT INTO blocks(
1181				block_number,
1182				ethereum_block_hash,
1183				substrate_block_hash,
1184				ethereum_storage_schema,
1185				is_canon
1186			)",
1187		);
1188		builder.push_values(block_entries, |mut b, entry| {
1189			let block_number = entry.0;
1190			let ethereum_block_hash = entry.1.as_bytes().to_owned();
1191			let substrate_block_hash = entry.2.as_bytes().to_owned();
1192			let ethereum_storage_schema = entry.3.encode();
1193
1194			b.push_bind(block_number);
1195			b.push_bind(ethereum_block_hash);
1196			b.push_bind(substrate_block_hash);
1197			b.push_bind(ethereum_storage_schema);
1198			b.push_bind(1i32);
1199		});
1200		let query = builder.build();
1201		let _ = query
1202			.execute(indexer_backend.pool())
1203			.await
1204			.expect("insert should succeed");
1205
1206		// log_{BLOCK}_{TOPICS}_{LOG_INDEX}_{TX_INDEX}
1207		let log_1_abcd_0_0_alice = Log {
1208			block_number: 1,
1209			address: alice,
1210			topics: [topics_a, topics_b, topics_c, topics_d],
1211			log_index: 0,
1212			transaction_index: 0,
1213			substrate_block_hash: substrate_hash_1,
1214			ethereum_block_hash: ethereum_hash_1,
1215		};
1216		let log_1_dcba_1_0_alice = Log {
1217			block_number: 1,
1218			address: alice,
1219			topics: [topics_d, topics_c, topics_b, topics_a],
1220			log_index: 1,
1221			transaction_index: 0,
1222			substrate_block_hash: substrate_hash_1,
1223			ethereum_block_hash: ethereum_hash_1,
1224		};
1225		let log_1_badc_2_0_alice = Log {
1226			block_number: 1,
1227			address: alice,
1228			topics: [topics_b, topics_a, topics_d, topics_c],
1229			log_index: 2,
1230			transaction_index: 0,
1231			substrate_block_hash: substrate_hash_1,
1232			ethereum_block_hash: ethereum_hash_1,
1233		};
1234		let log_2_abcd_0_0_bob = Log {
1235			block_number: 2,
1236			address: bob,
1237			topics: [topics_a, topics_b, topics_c, topics_d],
1238			log_index: 0,
1239			transaction_index: 0,
1240			substrate_block_hash: substrate_hash_2,
1241			ethereum_block_hash: ethereum_hash_2,
1242		};
1243		let log_2_dcba_1_0_bob = Log {
1244			block_number: 2,
1245			address: bob,
1246			topics: [topics_d, topics_c, topics_b, topics_a],
1247			log_index: 1,
1248			transaction_index: 0,
1249			substrate_block_hash: substrate_hash_2,
1250			ethereum_block_hash: ethereum_hash_2,
1251		};
1252		let log_2_badc_2_0_bob = Log {
1253			block_number: 2,
1254			address: bob,
1255			topics: [topics_b, topics_a, topics_d, topics_c],
1256			log_index: 2,
1257			transaction_index: 0,
1258			substrate_block_hash: substrate_hash_2,
1259			ethereum_block_hash: ethereum_hash_2,
1260		};
1261
1262		let log_3_abcd_0_0_bob = Log {
1263			block_number: 3,
1264			address: bob,
1265			topics: [topics_a, topics_b, topics_c, topics_d],
1266			log_index: 0,
1267			transaction_index: 0,
1268			substrate_block_hash: substrate_hash_3,
1269			ethereum_block_hash: ethereum_hash_3,
1270		};
1271		let log_3_dcba_1_0_bob = Log {
1272			block_number: 3,
1273			address: bob,
1274			topics: [topics_d, topics_c, topics_b, topics_a],
1275			log_index: 1,
1276			transaction_index: 0,
1277			substrate_block_hash: substrate_hash_3,
1278			ethereum_block_hash: ethereum_hash_3,
1279		};
1280		let log_3_badc_2_0_bob = Log {
1281			block_number: 3,
1282			address: bob,
1283			topics: [topics_b, topics_a, topics_d, topics_c],
1284			log_index: 2,
1285			transaction_index: 0,
1286			substrate_block_hash: substrate_hash_3,
1287			ethereum_block_hash: ethereum_hash_3,
1288		};
1289
1290		let log_entries = vec![
1291			// Block 1
1292			log_1_abcd_0_0_alice.clone(),
1293			log_1_dcba_1_0_alice.clone(),
1294			log_1_badc_2_0_alice.clone(),
1295			// Block 2
1296			log_2_abcd_0_0_bob.clone(),
1297			log_2_dcba_1_0_bob.clone(),
1298			log_2_badc_2_0_bob.clone(),
1299			// Block 3
1300			log_3_abcd_0_0_bob.clone(),
1301			log_3_dcba_1_0_bob.clone(),
1302			log_3_badc_2_0_bob.clone(),
1303		];
1304
1305		let mut builder: QueryBuilder<sqlx::Sqlite> = QueryBuilder::new(
1306			"INSERT INTO logs(
1307				address,
1308				topic_1,
1309				topic_2,
1310				topic_3,
1311				topic_4,
1312				log_index,
1313				transaction_index,
1314				substrate_block_hash
1315			)",
1316		);
1317		builder.push_values(log_entries, |mut b, entry| {
1318			let address = entry.address.as_bytes().to_owned();
1319			let topic_1 = entry.topics[0].as_bytes().to_owned();
1320			let topic_2 = entry.topics[1].as_bytes().to_owned();
1321			let topic_3 = entry.topics[2].as_bytes().to_owned();
1322			let topic_4 = entry.topics[3].as_bytes().to_owned();
1323			let log_index = entry.log_index;
1324			let transaction_index = entry.transaction_index;
1325			let substrate_block_hash = entry.substrate_block_hash.as_bytes().to_owned();
1326
1327			b.push_bind(address);
1328			b.push_bind(topic_1);
1329			b.push_bind(topic_2);
1330			b.push_bind(topic_3);
1331			b.push_bind(topic_4);
1332			b.push_bind(log_index);
1333			b.push_bind(transaction_index);
1334			b.push_bind(substrate_block_hash);
1335		});
1336		let query = builder.build();
1337		let _ = query.execute(indexer_backend.pool()).await;
1338
1339		TestData {
1340			alice,
1341			bob,
1342			topics_a,
1343			topics_b,
1344			topics_c,
1345			topics_d,
1346			substrate_hash_1,
1347			substrate_hash_2,
1348			substrate_hash_3,
1349			ethereum_hash_1,
1350			ethereum_hash_2,
1351			ethereum_hash_3,
1352			backend: indexer_backend,
1353			log_1_abcd_0_0_alice,
1354			log_1_dcba_1_0_alice,
1355			log_1_badc_2_0_alice,
1356			log_2_abcd_0_0_bob,
1357			log_2_dcba_1_0_bob,
1358			log_2_badc_2_0_bob,
1359			log_3_abcd_0_0_bob,
1360			log_3_dcba_1_0_bob,
1361			log_3_badc_2_0_bob,
1362		}
1363	}
1364
1365	async fn run_test_case(
1366		backend: Backend<OpaqueBlock>,
1367		test_case: &TestFilter,
1368	) -> Result<Vec<FilteredLog<OpaqueBlock>>, String> {
1369		backend
1370			.log_indexer()
1371			.filter_logs(
1372				test_case.from_block,
1373				test_case.to_block,
1374				test_case.addresses.clone(),
1375				test_case.topics.clone(),
1376			)
1377			.await
1378	}
1379
1380	async fn assert_blocks_canon(pool: &SqlitePool, expected: Vec<(H256, u32)>) {
1381		let actual: Vec<(H256, u32)> =
1382			sqlx::query("SELECT substrate_block_hash, is_canon FROM blocks")
1383				.map(|row: SqliteRow| (H256::from_slice(&row.get::<Vec<u8>, _>(0)[..]), row.get(1)))
1384				.fetch_all(pool)
1385				.await
1386				.expect("sql query must succeed");
1387		assert_eq!(expected, actual);
1388	}
1389
1390	#[tokio::test]
1391	async fn genesis_works() {
1392		let TestData { backend, .. } = prepare().await;
1393		let filter = TestFilter {
1394			from_block: 0,
1395			to_block: 0,
1396			addresses: vec![],
1397			topics: vec![],
1398			expected_result: vec![],
1399		};
1400		let result = run_test_case(backend, &filter).await.expect("must succeed");
1401		assert_eq!(result, filter.expected_result);
1402	}
1403
1404	#[tokio::test]
1405	async fn unsanitized_input_works() {
1406		let TestData { backend, .. } = prepare().await;
1407		let filter = TestFilter {
1408			from_block: 0,
1409			to_block: 0,
1410			addresses: vec![],
1411			topics: vec![vec![None], vec![None, None, None]],
1412			expected_result: vec![],
1413		};
1414		let result = run_test_case(backend, &filter).await.expect("must succeed");
1415		assert_eq!(result, filter.expected_result);
1416	}
1417
1418	#[tokio::test]
1419	async fn invalid_topic_input_size_fails() {
1420		let TestData {
1421			backend, topics_a, ..
1422		} = prepare().await;
1423		let filter = TestFilter {
1424			from_block: 0,
1425			to_block: 0,
1426			addresses: vec![],
1427			topics: vec![
1428				vec![Some(topics_a), None, None, None, None],
1429				vec![Some(topics_a), None, None, None],
1430			],
1431			expected_result: vec![],
1432		};
1433		run_test_case(backend, &filter)
1434			.await
1435			.expect_err("Invalid topic input. Maximum length is 4.");
1436	}
1437
1438	#[tokio::test]
1439	async fn test_malformed_topic_cleans_invalid_options() {
1440		let TestData {
1441			backend,
1442			topics_a,
1443			topics_b,
1444			topics_d,
1445			log_1_badc_2_0_alice,
1446			..
1447		} = prepare().await;
1448
1449		// [(a,null,b), (a, null), (d,null), null] -> [(a,b), a, d]
1450		let filter = TestFilter {
1451			from_block: 0,
1452			to_block: 1,
1453			addresses: vec![],
1454			topics: vec![
1455				vec![Some(topics_a), None, Some(topics_d)],
1456				vec![None], // not considered
1457				vec![Some(topics_b), Some(topics_a), None],
1458				vec![None, None, None, None], // not considered
1459			],
1460			expected_result: vec![log_1_badc_2_0_alice.into()],
1461		};
1462		let result = run_test_case(backend, &filter).await.expect("must succeed");
1463		assert_eq!(result, filter.expected_result);
1464	}
1465
1466	#[tokio::test]
1467	async fn block_range_works() {
1468		let TestData {
1469			backend,
1470			log_1_abcd_0_0_alice,
1471			log_1_dcba_1_0_alice,
1472			log_1_badc_2_0_alice,
1473			log_2_abcd_0_0_bob,
1474			log_2_dcba_1_0_bob,
1475			log_2_badc_2_0_bob,
1476			..
1477		} = prepare().await;
1478
1479		let filter = TestFilter {
1480			from_block: 0,
1481			to_block: 2,
1482			addresses: vec![],
1483			topics: vec![],
1484			expected_result: vec![
1485				log_1_abcd_0_0_alice.into(),
1486				log_1_dcba_1_0_alice.into(),
1487				log_1_badc_2_0_alice.into(),
1488				log_2_abcd_0_0_bob.into(),
1489				log_2_dcba_1_0_bob.into(),
1490				log_2_badc_2_0_bob.into(),
1491			],
1492		};
1493		let result = run_test_case(backend, &filter).await.expect("must succeed");
1494		assert_eq!(result, filter.expected_result);
1495	}
1496
1497	#[tokio::test]
1498	async fn address_filter_works() {
1499		let TestData {
1500			backend,
1501			alice,
1502			log_1_abcd_0_0_alice,
1503			log_1_dcba_1_0_alice,
1504			log_1_badc_2_0_alice,
1505			..
1506		} = prepare().await;
1507		let filter = TestFilter {
1508			from_block: 0,
1509			to_block: 3,
1510			addresses: vec![alice],
1511			topics: vec![],
1512			expected_result: vec![
1513				log_1_abcd_0_0_alice.into(),
1514				log_1_dcba_1_0_alice.into(),
1515				log_1_badc_2_0_alice.into(),
1516			],
1517		};
1518		let result = run_test_case(backend, &filter).await.expect("must succeed");
1519		assert_eq!(result, filter.expected_result);
1520	}
1521
1522	#[tokio::test]
1523	async fn topic_filter_works() {
1524		let TestData {
1525			backend,
1526			topics_d,
1527			log_1_dcba_1_0_alice,
1528			log_2_dcba_1_0_bob,
1529			log_3_dcba_1_0_bob,
1530			..
1531		} = prepare().await;
1532		let filter = TestFilter {
1533			from_block: 0,
1534			to_block: 3,
1535			addresses: vec![],
1536			topics: vec![vec![Some(topics_d)]],
1537			expected_result: vec![
1538				log_1_dcba_1_0_alice.into(),
1539				log_2_dcba_1_0_bob.into(),
1540				log_3_dcba_1_0_bob.into(),
1541			],
1542		};
1543		let result = run_test_case(backend, &filter).await.expect("must succeed");
1544		assert_eq!(result, filter.expected_result);
1545	}
1546
1547	#[tokio::test]
1548	async fn test_filters_address_and_topic() {
1549		let TestData {
1550			backend,
1551			bob,
1552			topics_b,
1553			log_2_badc_2_0_bob,
1554			log_3_badc_2_0_bob,
1555			..
1556		} = prepare().await;
1557		let filter = TestFilter {
1558			from_block: 0,
1559			to_block: 3,
1560			addresses: vec![bob],
1561			topics: vec![vec![Some(topics_b)]],
1562			expected_result: vec![log_2_badc_2_0_bob.into(), log_3_badc_2_0_bob.into()],
1563		};
1564		let result = run_test_case(backend, &filter).await.expect("must succeed");
1565		assert_eq!(result, filter.expected_result);
1566	}
1567
1568	#[tokio::test]
1569	async fn test_filters_multi_address_and_topic() {
1570		let TestData {
1571			backend,
1572			alice,
1573			bob,
1574			topics_b,
1575			log_1_badc_2_0_alice,
1576			log_2_badc_2_0_bob,
1577			log_3_badc_2_0_bob,
1578			..
1579		} = prepare().await;
1580		let filter = TestFilter {
1581			from_block: 0,
1582			to_block: 3,
1583			addresses: vec![alice, bob],
1584			topics: vec![vec![Some(topics_b)]],
1585			expected_result: vec![
1586				log_1_badc_2_0_alice.into(),
1587				log_2_badc_2_0_bob.into(),
1588				log_3_badc_2_0_bob.into(),
1589			],
1590		};
1591		let result = run_test_case(backend, &filter).await.expect("must succeed");
1592		assert_eq!(result, filter.expected_result);
1593	}
1594
1595	#[tokio::test]
1596	async fn test_filters_multi_address_and_multi_topic() {
1597		let TestData {
1598			backend,
1599			alice,
1600			bob,
1601			topics_a,
1602			topics_b,
1603			log_1_abcd_0_0_alice,
1604			log_2_abcd_0_0_bob,
1605			log_3_abcd_0_0_bob,
1606			..
1607		} = prepare().await;
1608		let filter = TestFilter {
1609			from_block: 0,
1610			to_block: 3,
1611			addresses: vec![alice, bob],
1612			topics: vec![vec![Some(topics_a), Some(topics_b)]],
1613			expected_result: vec![
1614				log_1_abcd_0_0_alice.into(),
1615				log_2_abcd_0_0_bob.into(),
1616				log_3_abcd_0_0_bob.into(),
1617			],
1618		};
1619		let result = run_test_case(backend, &filter).await.expect("must succeed");
1620		assert_eq!(result, filter.expected_result);
1621	}
1622
1623	#[tokio::test]
1624	async fn filter_with_topic_wildcards_works() {
1625		let TestData {
1626			backend,
1627			alice,
1628			bob,
1629			topics_d,
1630			topics_b,
1631			log_1_dcba_1_0_alice,
1632			log_2_dcba_1_0_bob,
1633			log_3_dcba_1_0_bob,
1634			..
1635		} = prepare().await;
1636		let filter = TestFilter {
1637			from_block: 0,
1638			to_block: 3,
1639			addresses: vec![alice, bob],
1640			topics: vec![vec![Some(topics_d), None, Some(topics_b)]],
1641			expected_result: vec![
1642				log_1_dcba_1_0_alice.into(),
1643				log_2_dcba_1_0_bob.into(),
1644				log_3_dcba_1_0_bob.into(),
1645			],
1646		};
1647		let result = run_test_case(backend, &filter).await.expect("must succeed");
1648		assert_eq!(result, filter.expected_result);
1649	}
1650
1651	#[tokio::test]
1652	async fn trailing_wildcard_is_useless_but_works() {
1653		let TestData {
1654			alice,
1655			backend,
1656			topics_b,
1657			log_1_dcba_1_0_alice,
1658			..
1659		} = prepare().await;
1660		let filter = TestFilter {
1661			from_block: 0,
1662			to_block: 1,
1663			addresses: vec![alice],
1664			topics: vec![vec![None, None, Some(topics_b), None]],
1665			expected_result: vec![log_1_dcba_1_0_alice.into()],
1666		};
1667		let result = run_test_case(backend, &filter).await.expect("must succeed");
1668		assert_eq!(result, filter.expected_result);
1669	}
1670
1671	#[tokio::test]
1672	async fn filter_with_multi_topic_options_works() {
1673		let TestData {
1674			backend,
1675			topics_a,
1676			topics_d,
1677			log_1_abcd_0_0_alice,
1678			log_1_dcba_1_0_alice,
1679			log_2_abcd_0_0_bob,
1680			log_2_dcba_1_0_bob,
1681			log_3_abcd_0_0_bob,
1682			log_3_dcba_1_0_bob,
1683			..
1684		} = prepare().await;
1685		let filter = TestFilter {
1686			from_block: 0,
1687			to_block: 3,
1688			addresses: vec![],
1689			topics: vec![
1690				vec![Some(topics_a)],
1691				vec![Some(topics_d)],
1692				vec![Some(topics_d)], // duplicate, ignored
1693			],
1694			expected_result: vec![
1695				log_1_abcd_0_0_alice.into(),
1696				log_1_dcba_1_0_alice.into(),
1697				log_2_abcd_0_0_bob.into(),
1698				log_2_dcba_1_0_bob.into(),
1699				log_3_abcd_0_0_bob.into(),
1700				log_3_dcba_1_0_bob.into(),
1701			],
1702		};
1703		let result = run_test_case(backend, &filter).await.expect("must succeed");
1704		assert_eq!(result, filter.expected_result);
1705	}
1706
1707	#[tokio::test]
1708	async fn filter_with_multi_topic_options_and_wildcards_works() {
1709		let TestData {
1710			backend,
1711			bob,
1712			topics_a,
1713			topics_b,
1714			topics_c,
1715			topics_d,
1716			log_2_dcba_1_0_bob,
1717			log_2_badc_2_0_bob,
1718			log_3_dcba_1_0_bob,
1719			log_3_badc_2_0_bob,
1720			..
1721		} = prepare().await;
1722		let filter = TestFilter {
1723			from_block: 0,
1724			to_block: 3,
1725			addresses: vec![bob],
1726			// Product on input [null,null,(b,d),(a,c)].
1727			topics: vec![
1728				vec![None, None, Some(topics_b), Some(topics_a)],
1729				vec![None, None, Some(topics_b), Some(topics_c)],
1730				vec![None, None, Some(topics_d), Some(topics_a)],
1731				vec![None, None, Some(topics_d), Some(topics_c)],
1732			],
1733			expected_result: vec![
1734				log_2_dcba_1_0_bob.into(),
1735				log_2_badc_2_0_bob.into(),
1736				log_3_dcba_1_0_bob.into(),
1737				log_3_badc_2_0_bob.into(),
1738			],
1739		};
1740		let result = run_test_case(backend, &filter).await.expect("must succeed");
1741		assert_eq!(result, filter.expected_result);
1742	}
1743
1744	#[tokio::test]
1745	async fn test_canonicalize_sets_canon_flag_for_redacted_and_enacted_blocks_correctly() {
1746		let TestData {
1747			backend,
1748			substrate_hash_1,
1749			substrate_hash_2,
1750			substrate_hash_3,
1751			..
1752		} = prepare().await;
1753
1754		// set block #1 to non canon
1755		sqlx::query("UPDATE blocks SET is_canon = 0 WHERE substrate_block_hash = ?")
1756			.bind(substrate_hash_1.as_bytes())
1757			.execute(backend.pool())
1758			.await
1759			.expect("sql query must succeed");
1760		assert_blocks_canon(
1761			backend.pool(),
1762			vec![
1763				(substrate_hash_1, 0),
1764				(substrate_hash_2, 1),
1765				(substrate_hash_3, 1),
1766			],
1767		)
1768		.await;
1769
1770		backend
1771			.canonicalize(&[substrate_hash_2], &[substrate_hash_1])
1772			.await
1773			.expect("must succeed");
1774
1775		assert_blocks_canon(
1776			backend.pool(),
1777			vec![
1778				(substrate_hash_1, 1),
1779				(substrate_hash_2, 0),
1780				(substrate_hash_3, 1),
1781			],
1782		)
1783		.await;
1784	}
1785
1786	#[test]
1787	fn test_query_should_be_generated_correctly() {
1788		use sqlx::Execute;
1789
1790		let from_block: u64 = 100;
1791		let to_block: u64 = 500;
1792		let addresses: Vec<H160> = vec![
1793			H160::repeat_byte(0x01),
1794			H160::repeat_byte(0x02),
1795			H160::repeat_byte(0x03),
1796		];
1797		let topics = [
1798			hashset![
1799				H256::repeat_byte(0x01),
1800				H256::repeat_byte(0x02),
1801				H256::repeat_byte(0x03),
1802			],
1803			hashset![H256::repeat_byte(0x04), H256::repeat_byte(0x05),],
1804			hashset![],
1805			hashset![H256::repeat_byte(0x06)],
1806		];
1807
1808		let expected_query_sql = "
1809SELECT
1810	l.substrate_block_hash,
1811	b.ethereum_block_hash,
1812	b.block_number,
1813	b.ethereum_storage_schema,
1814	l.transaction_index,
1815	l.log_index
1816FROM logs AS l
1817INNER JOIN blocks AS b
1818ON (b.block_number BETWEEN ? AND ?) AND b.substrate_block_hash = l.substrate_block_hash AND b.is_canon = 1
1819WHERE 1 AND l.address IN (?, ?, ?) AND l.topic_1 IN (?, ?, ?) AND l.topic_2 IN (?, ?) AND l.topic_4 = ?
1820ORDER BY b.block_number ASC, l.transaction_index ASC, l.log_index ASC
1821LIMIT 10001";
1822
1823		let mut qb = QueryBuilder::new("");
1824		let actual_query_sql = build_query(&mut qb, from_block, to_block, addresses, topics).sql();
1825		assert_eq!(expected_query_sql, actual_query_sql);
1826	}
1827}