use std::{cmp::Ordering, collections::HashSet, num::NonZeroU32, str::FromStr, sync::Arc};

use futures::TryStreamExt;
use scale_codec::{Decode, Encode};
use sqlx::{
    query::Query,
    sqlite::{
        SqliteArguments, SqliteConnectOptions, SqlitePool, SqlitePoolOptions, SqliteQueryResult,
    },
    ConnectOptions, Error, Execute, QueryBuilder, Row, Sqlite,
};
use sc_client_api::backend::{Backend as BackendT, StorageProvider};
use sp_api::{ApiExt, ProvideRuntimeApi};
use sp_blockchain::HeaderBackend;
use sp_core::{H160, H256};
use sp_runtime::{
    generic::BlockId,
    traits::{Block as BlockT, Header as HeaderT, UniqueSaturatedInto, Zero},
};
use fc_api::{FilteredLog, TransactionMetadata};
use fc_storage::{StorageOverride, StorageQuerier};
use fp_consensus::{FindLogError, Hashes, Log as ConsensusLog, PostLog, PreLog};
use fp_rpc::EthereumRuntimeRPCApi;
use fp_storage::EthereumStorageSchema;

/// Represents a log item.
#[derive(Debug, Eq, PartialEq)]
pub struct Log {
    pub address: Vec<u8>,
    pub topic_1: Option<Vec<u8>>,
    pub topic_2: Option<Vec<u8>>,
    pub topic_3: Option<Vec<u8>>,
    pub topic_4: Option<Vec<u8>>,
    pub log_index: i32,
    pub transaction_index: i32,
    pub substrate_block_hash: Vec<u8>,
}

/// The metadata of an indexed block.
#[derive(Eq, PartialEq)]
struct BlockMetadata {
    pub substrate_block_hash: H256,
    pub block_number: i32,
    pub post_hashes: Hashes,
    pub schema: EthereumStorageSchema,
    pub is_canon: i32,
}

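/// The Sqlite backend configuration: database path plus the connection
/// pragmas and sizing knobs applied in [`Backend::new`].
///
/// A construction sketch (the path and sizes are illustrative, not
/// recommended defaults):
///
/// ```ignore
/// let config = SqliteBackendConfig {
///     path: "sqlite:///tmp/frontier/frontier.db3",
///     create_if_missing: true,
///     thread_count: 4,
///     cache_size: 20480, // KiB, see `Backend::connect_options`
/// };
/// ```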
#[derive(Debug)]
pub struct SqliteBackendConfig<'a> {
    pub path: &'a str,
    pub create_if_missing: bool,
    pub thread_count: u32,
    pub cache_size: u64,
}

/// Whether a block has been indexed, and whether it is canonical.
#[derive(Debug, Default)]
pub struct BlockIndexedStatus {
    pub indexed: bool,
    pub canon: bool,
}

/// Represents the backend configurations.
#[derive(Debug)]
pub enum BackendConfig<'a> {
    Sqlite(SqliteBackendConfig<'a>),
}

#[derive(Clone)]
pub struct Backend<Block> {
    /// The Sqlite connection pool.
    pool: SqlitePool,
    /// The storage override used to read Ethereum data out of Substrate storage.
    storage_override: Arc<dyn StorageOverride<Block>>,
    /// The number of allowed operations for the Sqlite filter call.
    /// A value of `0` disables the timeout.
    num_ops_timeout: i32,
}

impl<Block> Backend<Block>
where
    Block: BlockT<Hash = H256>,
{
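    /// Creates a new instance of the SQL backend.
    ///
    /// A minimal wiring sketch (hypothetical values; assumes a
    /// `storage_override` implementing `StorageOverride<Block>` is already
    /// available):
    ///
    /// ```ignore
    /// let backend = Backend::new(
    ///     BackendConfig::Sqlite(SqliteBackendConfig {
    ///         path: "sqlite:///tmp/frontier/frontier.db3",
    ///         create_if_missing: true,
    ///         thread_count: 4,
    ///         cache_size: 20480,
    ///     }),
    ///     10,   // connection pool size
    ///     None, // no operation-count timeout for filter queries
    ///     storage_override.clone(),
    /// )
    /// .await?;
    /// ```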
    pub async fn new(
        config: BackendConfig<'_>,
        pool_size: u32,
        num_ops_timeout: Option<NonZeroU32>,
        storage_override: Arc<dyn StorageOverride<Block>>,
    ) -> Result<Self, Error> {
        let any_pool = SqlitePoolOptions::new()
            .max_connections(pool_size)
            .connect_lazy_with(Self::connect_options(&config)?.disable_statement_logging());
        let _ = Self::create_database_if_not_exists(&any_pool).await?;
        let _ = Self::create_indexes_if_not_exist(&any_pool).await?;
        Ok(Self {
            pool: any_pool,
            storage_override,
            num_ops_timeout: num_ops_timeout
                .map(|n| n.get())
                .unwrap_or(0)
                .try_into()
                .unwrap_or(i32::MAX),
        })
    }

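    /// Build the Sqlite connection options from the backend configuration.
    ///
    /// Note that `PRAGMA cache_size` is given a negative value, which Sqlite
    /// interprets as a size in KiB rather than a page count, and the database
    /// is opened in WAL journal mode with `synchronous = NORMAL`.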
    fn connect_options(config: &BackendConfig) -> Result<SqliteConnectOptions, Error> {
        match config {
            BackendConfig::Sqlite(config) => {
                log::info!(target: "frontier-sql", "📑 Connection configuration: {config:?}");
                let config = sqlx::sqlite::SqliteConnectOptions::from_str(config.path)?
                    .create_if_missing(config.create_if_missing)
                    // Handle database lock timeouts.
                    .busy_timeout(std::time::Duration::from_secs(8))
                    // A negative value means the cache size is expressed in KiB.
                    .pragma("cache_size", format!("-{}", config.cache_size))
                    .pragma("analysis_limit", "1000")
                    .pragma("threads", config.thread_count.to_string())
                    .pragma("temp_store", "memory")
                    .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
                    .synchronous(sqlx::sqlite::SqliteSynchronous::Normal);
                Ok(config)
            }
        }
    }

    /// Get the underlying Sqlite pool.
    pub fn pool(&self) -> &SqlitePool {
        &self.pool
    }

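    /// Canonicalize the indexed blocks: every hash in `retracted` is demoted
    /// (`is_canon = 0`) and every hash in `enacted` is promoted
    /// (`is_canon = 1`), within a single database transaction.
    ///
    /// A usage sketch for a hypothetical reorg notification:
    ///
    /// ```ignore
    /// // `retracted`/`enacted` are illustrative `&[H256]` slices.
    /// backend.canonicalize(&retracted, &enacted).await?;
    /// ```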
    pub async fn canonicalize(&self, retracted: &[H256], enacted: &[H256]) -> Result<(), Error> {
        let mut tx = self.pool().begin().await?;

        // Retracted blocks are no longer canon.
        let mut builder: QueryBuilder<Sqlite> =
            QueryBuilder::new("UPDATE blocks SET is_canon = 0 WHERE substrate_block_hash IN (");
        let mut retracted_hashes = builder.separated(", ");
        for hash in retracted.iter() {
            let hash = hash.as_bytes();
            retracted_hashes.push_bind(hash);
        }
        retracted_hashes.push_unseparated(")");
        let query = builder.build();
        query.execute(&mut *tx).await?;

        // Enacted blocks become canon.
        let mut builder: QueryBuilder<Sqlite> =
            QueryBuilder::new("UPDATE blocks SET is_canon = 1 WHERE substrate_block_hash IN (");
        let mut enacted_hashes = builder.separated(", ");
        for hash in enacted.iter() {
            let hash = hash.as_bytes();
            enacted_hashes.push_bind(hash);
        }
        enacted_hashes.push_unseparated(")");
        let query = builder.build();
        query.execute(&mut *tx).await?;

        tx.commit().await
    }

    /// Index the genesis block if the runtime exposes `EthereumRuntimeRPCApi`
    /// at genesis, returning the genesis Substrate hash when its header is
    /// available.
    pub async fn insert_genesis_block_metadata<Client, BE>(
        &self,
        client: Arc<Client>,
    ) -> Result<Option<H256>, Error>
    where
        Client: StorageProvider<Block, BE> + HeaderBackend<Block> + 'static,
        Client: ProvideRuntimeApi<Block>,
        Client::Api: EthereumRuntimeRPCApi<Block>,
        BE: BackendT<Block> + 'static,
    {
        let id = BlockId::Number(Zero::zero());
        let substrate_genesis_hash = client
            .expect_block_hash_from_id(&id)
            .map_err(|_| Error::Protocol("Cannot resolve genesis hash".to_string()))?;
        let maybe_substrate_hash: Option<H256> = if let Ok(Some(_)) =
            client.header(substrate_genesis_hash)
        {
            let has_api = client
                .runtime_api()
                .has_api_with::<dyn EthereumRuntimeRPCApi<Block>, _>(
                    substrate_genesis_hash,
                    |version| version >= 1,
                )
                .expect("runtime api reachable");

            log::debug!(target: "frontier-sql", "Index genesis block, has_api={has_api}, hash={substrate_genesis_hash:?}");

            if has_api {
                // The chain has frontier support from genesis.
                // Read the Ethereum genesis block from the runtime and store its metadata.
                let ethereum_block = client
                    .runtime_api()
                    .current_block(substrate_genesis_hash)
                    .expect("runtime api reachable")
                    .expect("ethereum genesis block");

                let schema = StorageQuerier::new(client)
                    .storage_schema(substrate_genesis_hash)
                    .unwrap_or(EthereumStorageSchema::V3)
                    .encode();
                let ethereum_block_hash = ethereum_block.header.hash().as_bytes().to_owned();
                let substrate_block_hash = substrate_genesis_hash.as_bytes();
                let block_number = 0i32;
                let is_canon = 1i32;

                let _ = sqlx::query(
                    "INSERT OR IGNORE INTO blocks(
                        ethereum_block_hash,
                        substrate_block_hash,
                        block_number,
                        ethereum_storage_schema,
                        is_canon)
                    VALUES (?, ?, ?, ?, ?)",
                )
                .bind(ethereum_block_hash)
                .bind(substrate_block_hash)
                .bind(block_number)
                .bind(schema)
                .bind(is_canon)
                .execute(self.pool())
                .await?;
            }
            Some(substrate_genesis_hash)
        } else {
            None
        };
        Ok(maybe_substrate_hash)
    }

    /// Retrieve the block metadata for a given Substrate hash by inspecting
    /// the header's consensus digest.
    fn insert_block_metadata_inner<Client, BE>(
        client: Arc<Client>,
        hash: H256,
        storage_override: &dyn StorageOverride<Block>,
    ) -> Result<BlockMetadata, Error>
    where
        Client: StorageProvider<Block, BE> + HeaderBackend<Block> + 'static,
        BE: BackendT<Block> + 'static,
    {
        log::trace!(target: "frontier-sql", "🛠️ [Metadata] Retrieving digest data for block {hash:?}");
        if let Ok(Some(header)) = client.header(hash) {
            match fp_consensus::find_log(header.digest()) {
                Ok(log) => {
                    let schema = StorageQuerier::new(client.clone())
                        .storage_schema(hash)
                        .unwrap_or(EthereumStorageSchema::V3);
                    let log_hashes = match log {
                        ConsensusLog::Post(PostLog::Hashes(post_hashes)) => post_hashes,
                        ConsensusLog::Post(PostLog::Block(block)) => Hashes::from_block(block),
                        ConsensusLog::Post(PostLog::BlockHash(expect_eth_block_hash)) => {
                            let ethereum_block = storage_override.current_block(hash);
                            match ethereum_block {
                                Some(block) => {
                                    let got_eth_block_hash = block.header.hash();
                                    if got_eth_block_hash != expect_eth_block_hash {
                                        return Err(Error::Protocol(format!(
                                            "Ethereum block hash mismatch: \
                                            frontier consensus digest ({expect_eth_block_hash:?}), \
                                            db state ({got_eth_block_hash:?})"
                                        )));
                                    } else {
                                        Hashes::from_block(block)
                                    }
                                }
                                None => {
                                    return Err(Error::Protocol(format!(
                                        "Missing ethereum block for hash mismatch {expect_eth_block_hash:?}"
                                    )))
                                }
                            }
                        }
                        ConsensusLog::Pre(PreLog::Block(block)) => Hashes::from_block(block),
                    };

                    let header_number = *header.number();
                    let block_number =
                        UniqueSaturatedInto::<u32>::unique_saturated_into(header_number) as i32;
                    let is_canon = match client.hash(header_number) {
                        Ok(Some(inner_hash)) => (inner_hash == hash) as i32,
                        Ok(None) => {
                            log::debug!(target: "frontier-sql", "[Metadata] Missing header for block #{block_number} ({hash:?})");
                            0
                        }
                        Err(err) => {
                            log::debug!(
                                target: "frontier-sql",
                                "[Metadata] Failed to retrieve header for block #{block_number} ({hash:?}): {err:?}",
                            );
                            0
                        }
                    };

                    log::trace!(
                        target: "frontier-sql",
                        "[Metadata] Prepared block metadata for #{block_number} ({hash:?}) canon={is_canon}",
                    );
                    Ok(BlockMetadata {
                        substrate_block_hash: hash,
                        block_number,
                        post_hashes: log_hashes,
                        schema,
                        is_canon,
                    })
                }
                Err(FindLogError::NotFound) => Err(Error::Protocol(format!(
                    "[Metadata] No logs found for hash {hash:?}",
                ))),
                Err(FindLogError::MultipleLogs) => Err(Error::Protocol(format!(
                    "[Metadata] Multiple logs found for hash {hash:?}",
                ))),
            }
        } else {
            Err(Error::Protocol(format!(
                "[Metadata] Failed retrieving header for hash {hash:?}"
            )))
        }
    }

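    /// Insert the block metadata (block and transaction hashes) for a newly
    /// imported block, and register the block as pending in `sync_status`.
    /// Digest inspection runs on a blocking thread via
    /// `tokio::task::spawn_blocking`, since it reads client state synchronously.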
    pub async fn insert_block_metadata<Client, BE>(
        &self,
        client: Arc<Client>,
        hash: H256,
    ) -> Result<(), Error>
    where
        Client: StorageProvider<Block, BE> + HeaderBackend<Block> + 'static,
        BE: BackendT<Block> + 'static,
    {
        // Spawn a blocking task to get the block metadata from the substrate backend.
        let storage_override = self.storage_override.clone();
        let metadata = tokio::task::spawn_blocking(move || {
            Self::insert_block_metadata_inner(client.clone(), hash, &*storage_override)
        })
        .await
        .map_err(|_| Error::Protocol("tokio blocking metadata task failed".to_string()))??;

        let mut tx = self.pool().begin().await?;

        log::debug!(
            target: "frontier-sql",
            "🛠️ [Metadata] Starting execution of statements on db transaction"
        );
        let post_hashes = metadata.post_hashes;
        let ethereum_block_hash = post_hashes.block_hash.as_bytes();
        let substrate_block_hash = metadata.substrate_block_hash.as_bytes();
        let schema = metadata.schema.encode();
        let block_number = metadata.block_number;
        let is_canon = metadata.is_canon;

        let _ = sqlx::query(
            "INSERT OR IGNORE INTO blocks(
                ethereum_block_hash,
                substrate_block_hash,
                block_number,
                ethereum_storage_schema,
                is_canon)
            VALUES (?, ?, ?, ?, ?)",
        )
        .bind(ethereum_block_hash)
        .bind(substrate_block_hash)
        .bind(block_number)
        .bind(schema)
        .bind(is_canon)
        .execute(&mut *tx)
        .await?;
        for (i, &transaction_hash) in post_hashes.transaction_hashes.iter().enumerate() {
            let ethereum_transaction_hash = transaction_hash.as_bytes();
            let ethereum_transaction_index = i as i32;
            log::trace!(
                target: "frontier-sql",
                "[Metadata] Inserting TX for block #{block_number} - {transaction_hash:?} index {ethereum_transaction_index}",
            );
            let _ = sqlx::query(
                "INSERT OR IGNORE INTO transactions(
                    ethereum_transaction_hash,
                    substrate_block_hash,
                    ethereum_block_hash,
                    ethereum_transaction_index)
                VALUES (?, ?, ?, ?)",
            )
            .bind(ethereum_transaction_hash)
            .bind(substrate_block_hash)
            .bind(ethereum_block_hash)
            .bind(ethereum_transaction_index)
            .execute(&mut *tx)
            .await?;
        }

        sqlx::query("INSERT INTO sync_status(substrate_block_hash) VALUES (?)")
            .bind(hash.as_bytes())
            .execute(&mut *tx)
            .await?;

        log::debug!(target: "frontier-sql", "[Metadata] Ready to commit");
        tx.commit().await
    }

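    /// Index the logs of a block whose metadata has already been inserted,
    /// committing them in a single transaction. The `sync_status` row acts as
    /// a gate: the `UPDATE ... RETURNING` statement matches only while the
    /// block is still pending (`status = 0`), so logs are indexed at most once.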
    pub async fn index_block_logs(&self, block_hash: Block::Hash) {
        let pool = self.pool().clone();
        let storage_override = self.storage_override.clone();
        let _ = async {
            // Flip `status` in a single UPDATE ... RETURNING statement so that a
            // block whose logs were already indexed is not processed twice.
            let mut tx = pool.begin().await?;
            match sqlx::query(
                "UPDATE sync_status
                SET status = 1
                WHERE substrate_block_hash IN
                    (SELECT substrate_block_hash
                    FROM sync_status
                    WHERE status = 0 AND substrate_block_hash = ?) RETURNING substrate_block_hash",
            )
            .bind(block_hash.as_bytes())
            .fetch_one(&mut *tx)
            .await
            {
                Ok(_) => {
                    // Spawn a blocking task to read the logs from the substrate backend.
                    let logs = tokio::task::spawn_blocking(move || {
                        Self::get_logs(storage_override, block_hash)
                    })
                    .await
                    .map_err(|_| Error::Protocol("tokio blocking task failed".to_string()))?;

                    for log in logs {
                        let _ = sqlx::query(
                            "INSERT OR IGNORE INTO logs(
                                address,
                                topic_1,
                                topic_2,
                                topic_3,
                                topic_4,
                                log_index,
                                transaction_index,
                                substrate_block_hash)
                            VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
                        )
                        .bind(log.address)
                        .bind(log.topic_1)
                        .bind(log.topic_2)
                        .bind(log.topic_3)
                        .bind(log.topic_4)
                        .bind(log.log_index)
                        .bind(log.transaction_index)
                        .bind(log.substrate_block_hash)
                        .execute(&mut *tx)
                        .await?;
                    }
                    Ok(tx.commit().await?)
                }
                Err(e) => Err(e),
            }
        }
        .await
        .map_err(|e| {
            log::error!(target: "frontier-sql", "{e}");
        });
        // https://www.sqlite.org/pragma.html#pragma_optimize
        let _ = sqlx::query("PRAGMA optimize").execute(&pool).await;
        log::debug!(target: "frontier-sql", "Batch committed");
    }

    /// Extract the logs of a block from its receipts, read via the storage override.
    fn get_logs(
        storage_override: Arc<dyn StorageOverride<Block>>,
        substrate_block_hash: H256,
    ) -> Vec<Log> {
        let mut logs: Vec<Log> = vec![];
        let mut transaction_count: usize = 0;
        let mut log_count: usize = 0;
        let receipts = storage_override
            .current_receipts(substrate_block_hash)
            .unwrap_or_default();

        transaction_count += receipts.len();
        for (transaction_index, receipt) in receipts.iter().enumerate() {
            let receipt_logs = match receipt {
                ethereum::ReceiptV4::Legacy(d)
                | ethereum::ReceiptV4::EIP2930(d)
                | ethereum::ReceiptV4::EIP1559(d)
                | ethereum::ReceiptV4::EIP7702(d) => &d.logs,
            };
            let transaction_index = transaction_index as i32;
            log_count += receipt_logs.len();
            for (log_index, log) in receipt_logs.iter().enumerate() {
                #[allow(clippy::get_first)]
                logs.push(Log {
                    address: log.address.as_bytes().to_owned(),
                    topic_1: log.topics.get(0).map(|l| l.as_bytes().to_owned()),
                    topic_2: log.topics.get(1).map(|l| l.as_bytes().to_owned()),
                    topic_3: log.topics.get(2).map(|l| l.as_bytes().to_owned()),
                    topic_4: log.topics.get(3).map(|l| l.as_bytes().to_owned()),
                    log_index: log_index as i32,
                    transaction_index,
                    substrate_block_hash: substrate_block_hash.as_bytes().to_owned(),
                });
            }
        }
        log::debug!(
            target: "frontier-sql",
            "Ready to commit {log_count} logs from {transaction_count} transactions"
        );
        logs
    }

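    /// Check whether a block identified by its Substrate hash has been
    /// indexed, i.e. has a row in `sync_status`.
    ///
    /// A usage sketch (hypothetical caller):
    ///
    /// ```ignore
    /// if !backend.is_block_indexed(block_hash).await {
    ///     backend.insert_block_metadata(client.clone(), block_hash).await?;
    ///     backend.index_block_logs(block_hash).await;
    /// }
    /// ```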
    pub async fn is_block_indexed(&self, block_hash: Block::Hash) -> bool {
        sqlx::query("SELECT substrate_block_hash FROM sync_status WHERE substrate_block_hash = ?")
            .bind(block_hash.as_bytes().to_owned())
            .fetch_optional(self.pool())
            .await
            .map(|r| r.is_some())
            .unwrap_or(false)
    }

    /// Retrieve the block indexing status, including whether the block is canon.
    pub async fn block_indexed_and_canon_status(
        &self,
        block_hash: Block::Hash,
    ) -> BlockIndexedStatus {
        sqlx::query(
            "SELECT b.is_canon FROM sync_status AS s
            INNER JOIN blocks AS b
            ON s.substrate_block_hash = b.substrate_block_hash
            WHERE s.substrate_block_hash = ?",
        )
        .bind(block_hash.as_bytes().to_owned())
        .fetch_optional(self.pool())
        .await
        .map(|result| {
            result
                .map(|row| {
                    let is_canon: i32 = row.get(0);
                    BlockIndexedStatus {
                        indexed: true,
                        canon: is_canon != 0,
                    }
                })
                .unwrap_or_default()
        })
        .unwrap_or_default()
    }

    /// Mark the block as canon in the database.
    pub async fn set_block_as_canon(&self, block_hash: H256) -> Result<SqliteQueryResult, Error> {
        sqlx::query("UPDATE blocks SET is_canon = 1 WHERE substrate_block_hash = ?")
            .bind(block_hash.as_bytes())
            .execute(self.pool())
            .await
    }

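    /// Retrieve the first gap in the canonical index: for the lowest
    /// canonical block whose predecessor is not itself indexed as canon,
    /// return the predecessor's block number. Returns `None` when there is
    /// no gap or the query fails.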
    pub async fn get_first_missing_canon_block(&self) -> Option<u32> {
        match sqlx::query(
            "SELECT b1.block_number-1
            FROM blocks as b1
            WHERE b1.block_number > 0 AND b1.is_canon=1 AND NOT EXISTS (
                SELECT 1 FROM blocks AS b2
                WHERE b2.block_number = b1.block_number-1
                AND b1.is_canon=1
                AND b2.is_canon=1
            )
            ORDER BY block_number LIMIT 1",
        )
        .fetch_optional(self.pool())
        .await
        {
            Ok(result) => {
                if let Some(row) = result {
                    let block_number: u32 = row.get(0);
                    return Some(block_number);
                }
            }
            Err(err) => {
                log::debug!(target: "frontier-sql", "Failed retrieving missing block {err:?}");
            }
        }

        None
    }

    /// Retrieve the hash of the first canonical block that is still pending
    /// in `sync_status`, i.e. whose logs have not been indexed yet.
    pub async fn get_first_pending_canon_block(&self) -> Option<H256> {
        match sqlx::query(
            "SELECT s.substrate_block_hash FROM sync_status AS s
            INNER JOIN blocks as b
            ON s.substrate_block_hash = b.substrate_block_hash
            WHERE b.is_canon = 1 AND s.status = 0
            ORDER BY b.block_number LIMIT 1",
        )
        .fetch_optional(self.pool())
        .await
        {
            Ok(result) => {
                if let Some(row) = result {
                    let block_hash_bytes: Vec<u8> = row.get(0);
                    let block_hash = H256::from_slice(&block_hash_bytes[..]);
                    return Some(block_hash);
                }
            }
            Err(err) => {
                log::debug!(target: "frontier-sql", "Failed retrieving pending block {err:?}");
            }
        }

        None
    }

    /// Retrieve the Substrate hash of the most recently indexed canonical block.
    pub async fn last_indexed_canon_block(&self) -> Result<H256, Error> {
        let row = sqlx::query(
            "SELECT b.substrate_block_hash FROM blocks AS b
            INNER JOIN sync_status AS s
            ON s.substrate_block_hash = b.substrate_block_hash
            WHERE b.is_canon=1 AND s.status = 1
            ORDER BY b.id DESC LIMIT 1",
        )
        .fetch_one(self.pool())
        .await?;
        Ok(H256::from_slice(
            &row.try_get::<Vec<u8>, _>(0).unwrap_or_default()[..],
        ))
    }

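    /// Create the database schema if it does not already exist: the `logs`,
    /// `sync_status`, `blocks` and `transactions` tables. Each table carries
    /// a UNIQUE constraint, so re-indexing the same data with
    /// `INSERT OR IGNORE` is idempotent.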
    async fn create_database_if_not_exists(pool: &SqlitePool) -> Result<SqliteQueryResult, Error> {
        sqlx::query(
            "BEGIN;
            CREATE TABLE IF NOT EXISTS logs (
                id INTEGER PRIMARY KEY,
                address BLOB NOT NULL,
                topic_1 BLOB,
                topic_2 BLOB,
                topic_3 BLOB,
                topic_4 BLOB,
                log_index INTEGER NOT NULL,
                transaction_index INTEGER NOT NULL,
                substrate_block_hash BLOB NOT NULL,
                UNIQUE (
                    log_index,
                    transaction_index,
                    substrate_block_hash
                )
            );
            CREATE TABLE IF NOT EXISTS sync_status (
                id INTEGER PRIMARY KEY,
                substrate_block_hash BLOB NOT NULL,
                status INTEGER DEFAULT 0 NOT NULL,
                UNIQUE (
                    substrate_block_hash
                )
            );
            CREATE TABLE IF NOT EXISTS blocks (
                id INTEGER PRIMARY KEY,
                block_number INTEGER NOT NULL,
                ethereum_block_hash BLOB NOT NULL,
                substrate_block_hash BLOB NOT NULL,
                ethereum_storage_schema BLOB NOT NULL,
                is_canon INTEGER NOT NULL,
                UNIQUE (
                    ethereum_block_hash,
                    substrate_block_hash
                )
            );
            CREATE TABLE IF NOT EXISTS transactions (
                id INTEGER PRIMARY KEY,
                ethereum_transaction_hash BLOB NOT NULL,
                substrate_block_hash BLOB NOT NULL,
                ethereum_block_hash BLOB NOT NULL,
                ethereum_transaction_index INTEGER NOT NULL,
                UNIQUE (
                    ethereum_transaction_hash,
                    substrate_block_hash
                )
            );
            COMMIT;",
        )
        .execute(pool)
        .await
    }

    /// Create the database indices if they do not already exist.
    async fn create_indexes_if_not_exist(pool: &SqlitePool) -> Result<SqliteQueryResult, Error> {
        sqlx::query(
            "BEGIN;
            CREATE INDEX IF NOT EXISTS logs_main_idx ON logs (
                address,
                topic_1,
                topic_2,
                topic_3,
                topic_4
            );
            CREATE INDEX IF NOT EXISTS logs_substrate_index ON logs (
                substrate_block_hash
            );
            CREATE INDEX IF NOT EXISTS blocks_number_index ON blocks (
                block_number
            );
            CREATE INDEX IF NOT EXISTS blocks_substrate_index ON blocks (
                substrate_block_hash
            );
            CREATE INDEX IF NOT EXISTS eth_block_hash_idx ON blocks (
                ethereum_block_hash
            );
            CREATE INDEX IF NOT EXISTS eth_tx_hash_idx ON transactions (
                ethereum_transaction_hash
            );
            CREATE INDEX IF NOT EXISTS eth_tx_hash_2_idx ON transactions (
                ethereum_block_hash,
                ethereum_transaction_index
            );
            COMMIT;",
        )
        .execute(pool)
        .await
    }
}

#[async_trait::async_trait]
impl<Block: BlockT<Hash = H256>> fc_api::Backend<Block> for Backend<Block> {
    async fn block_hash(
        &self,
        ethereum_block_hash: &H256,
    ) -> Result<Option<Vec<Block::Hash>>, String> {
        let ethereum_block_hash = ethereum_block_hash.as_bytes();
        let res =
            sqlx::query("SELECT substrate_block_hash FROM blocks WHERE ethereum_block_hash = ?")
                .bind(ethereum_block_hash)
                .fetch_all(&self.pool)
                .await
                .ok()
                .map(|rows| {
                    rows.iter()
                        .map(|row| {
                            H256::from_slice(&row.try_get::<Vec<u8>, _>(0).unwrap_or_default()[..])
                        })
                        .collect()
                });
        Ok(res)
    }

    async fn transaction_metadata(
        &self,
        ethereum_transaction_hash: &H256,
    ) -> Result<Vec<TransactionMetadata<Block>>, String> {
        let ethereum_transaction_hash = ethereum_transaction_hash.as_bytes();
        let out = sqlx::query(
            "SELECT
                substrate_block_hash, ethereum_block_hash, ethereum_transaction_index
            FROM transactions WHERE ethereum_transaction_hash = ?",
        )
        .bind(ethereum_transaction_hash)
        .fetch_all(&self.pool)
        .await
        .unwrap_or_default()
        .iter()
        .map(|row| {
            let substrate_block_hash =
                H256::from_slice(&row.try_get::<Vec<u8>, _>(0).unwrap_or_default()[..]);
            let ethereum_block_hash =
                H256::from_slice(&row.try_get::<Vec<u8>, _>(1).unwrap_or_default()[..]);
            let ethereum_transaction_index = row.try_get::<i32, _>(2).unwrap_or_default() as u32;
            TransactionMetadata {
                substrate_block_hash,
                ethereum_block_hash,
                ethereum_index: ethereum_transaction_index,
            }
        })
        .collect();

        Ok(out)
    }

    fn log_indexer(&self) -> &dyn fc_api::LogIndexerBackend<Block> {
        self
    }

    async fn first_block_hash(&self) -> Result<Block::Hash, String> {
        sqlx::query("SELECT substrate_block_hash FROM blocks ORDER BY block_number ASC LIMIT 1")
            .fetch_one(self.pool())
            .await
            .map(|row| H256::from_slice(&row.get::<Vec<u8>, _>(0)[..]))
            .map_err(|e| format!("Failed to fetch oldest block hash: {e}"))
    }

    async fn latest_block_hash(&self) -> Result<Block::Hash, String> {
        sqlx::query("SELECT substrate_block_hash FROM blocks ORDER BY block_number DESC LIMIT 1")
            .fetch_one(self.pool())
            .await
            .map(|row| H256::from_slice(&row.get::<Vec<u8>, _>(0)[..]))
            .map_err(|e| format!("Failed to fetch best hash: {e}"))
    }
}

#[async_trait::async_trait]
impl<Block: BlockT<Hash = H256>> fc_api::LogIndexerBackend<Block> for Backend<Block> {
    fn is_indexed(&self) -> bool {
        true
    }

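    /// A usage sketch of the filter call (values are illustrative):
    ///
    /// ```ignore
    /// // All logs emitted by `address` between blocks 100 and 200 whose first
    /// // topic is `topic0`. An empty inner vec acts as a wildcard for that
    /// // topic position.
    /// let logs = backend
    ///     .log_indexer()
    ///     .filter_logs(100, 200, vec![address], vec![vec![topic0]])
    ///     .await?;
    /// ```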
    async fn filter_logs(
        &self,
        from_block: u64,
        to_block: u64,
        addresses: Vec<H160>,
        topics: Vec<Vec<H256>>,
    ) -> Result<Vec<FilteredLog<Block>>, String> {
        let mut unique_topics: [HashSet<H256>; 4] = [
            HashSet::new(),
            HashSet::new(),
            HashSet::new(),
            HashSet::new(),
        ];
        for (topic_index, topic_options) in topics.into_iter().enumerate() {
            // Guard against out-of-bounds indexing when more than four topic
            // positions are supplied.
            if topic_index >= unique_topics.len() {
                return Err("Invalid topic input. Maximum length is 4.".to_string());
            }
            unique_topics[topic_index].extend(topic_options);
        }

        let log_key = format!("{from_block}-{to_block}-{addresses:?}-{unique_topics:?}");
        let mut qb = QueryBuilder::new("");
        let query = build_query(&mut qb, from_block, to_block, addresses, unique_topics);
        let sql = query.sql();

        let mut conn = self
            .pool()
            .acquire()
            .await
            .map_err(|err| format!("failed acquiring sqlite connection: {err}"))?;
        let log_key2 = log_key.clone();
        conn.lock_handle()
            .await
            .map_err(|err| format!("{err:?}"))?
            .set_progress_handler(self.num_ops_timeout, move || {
                log::debug!(target: "frontier-sql", "Sqlite progress_handler triggered for {log_key2}");
                false
            });
        log::debug!(target: "frontier-sql", "Query: {sql:?} - {log_key}");

        let mut out: Vec<FilteredLog<Block>> = vec![];
        let mut rows = query.fetch(&mut *conn);
        let maybe_err = loop {
            match rows.try_next().await {
                Ok(Some(row)) => {
                    // Substrate block hash
                    let substrate_block_hash =
                        H256::from_slice(&row.try_get::<Vec<u8>, _>(0).unwrap_or_default()[..]);
                    // Ethereum block hash
                    let ethereum_block_hash =
                        H256::from_slice(&row.try_get::<Vec<u8>, _>(1).unwrap_or_default()[..]);
                    // Block number
                    let block_number = row.try_get::<i32, _>(2).unwrap_or_default() as u32;
                    // Ethereum storage schema
                    let ethereum_storage_schema: EthereumStorageSchema =
                        Decode::decode(&mut &row.try_get::<Vec<u8>, _>(3).unwrap_or_default()[..])
                            .map_err(|_| {
                                "Cannot decode EthereumStorageSchema for block".to_string()
                            })?;
                    // Transaction index
                    let transaction_index = row.try_get::<i32, _>(4).unwrap_or_default() as u32;
                    // Log index
                    let log_index = row.try_get::<i32, _>(5).unwrap_or_default() as u32;
                    out.push(FilteredLog {
                        substrate_block_hash,
                        ethereum_block_hash,
                        block_number,
                        ethereum_storage_schema,
                        transaction_index,
                        log_index,
                    });
                }
                Ok(None) => break None,
                Err(err) => break Some(err),
            };
        };
        drop(rows);
        conn.lock_handle()
            .await
            .map_err(|err| format!("{err:?}"))?
            .remove_progress_handler();

        if let Some(err) = maybe_err {
            log::error!(target: "frontier-sql", "Failed to query sql db: {err:?} - {log_key}");
            return Err("Failed to query sql db with statement".to_string());
        }

        log::info!(target: "frontier-sql", "FILTER remove handler - {log_key}");
        Ok(out)
    }
}

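/// Build a SQL query to retrieve a list of logs given certain constraints.
///
/// For the inputs exercised by `test_query_should_be_generated_correctly`
/// below (a block range, three addresses and four topic sets), the generated
/// statement has this shape:
///
/// ```text
/// SELECT ... FROM logs AS l
/// INNER JOIN blocks AS b
/// ON (b.block_number BETWEEN ? AND ?) AND b.substrate_block_hash = l.substrate_block_hash AND b.is_canon = 1
/// WHERE 1 AND l.address IN (?, ?, ?) AND l.topic_1 IN (?, ?, ?) AND l.topic_2 IN (?, ?) AND l.topic_4 = ?
/// ORDER BY b.block_number ASC, l.transaction_index ASC, l.log_index ASC
/// LIMIT 10001
/// ```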
fn build_query<'a>(
    qb: &'a mut QueryBuilder<Sqlite>,
    from_block: u64,
    to_block: u64,
    addresses: Vec<H160>,
    topics: [HashSet<H256>; 4],
) -> Query<'a, Sqlite, SqliteArguments<'a>> {
    qb.push(
        "
SELECT
    l.substrate_block_hash,
    b.ethereum_block_hash,
    b.block_number,
    b.ethereum_storage_schema,
    l.transaction_index,
    l.log_index
FROM logs AS l
INNER JOIN blocks AS b
ON (b.block_number BETWEEN ",
    );
    qb.separated(" AND ")
        .push_bind(from_block as i64)
        .push_bind(to_block as i64)
        .push_unseparated(")");
    qb.push(" AND b.substrate_block_hash = l.substrate_block_hash")
        .push(" AND b.is_canon = 1")
        .push("\nWHERE 1");

    if !addresses.is_empty() {
        qb.push(" AND l.address IN (");
        let mut qb_addr = qb.separated(", ");
        addresses.iter().for_each(|addr| {
            qb_addr.push_bind(addr.as_bytes().to_owned());
        });
        qb_addr.push_unseparated(")");
    }

    for (i, topic_options) in topics.iter().enumerate() {
        match topic_options.len().cmp(&1) {
            Ordering::Greater => {
                qb.push(format!(" AND l.topic_{} IN (", i + 1));
                let mut qb_topic = qb.separated(", ");
                topic_options.iter().for_each(|t| {
                    qb_topic.push_bind(t.as_bytes().to_owned());
                });
                qb_topic.push_unseparated(")");
            }
            Ordering::Equal => {
                qb.push(format!(" AND l.topic_{} = ", i + 1)).push_bind(
                    topic_options
                        .iter()
                        .next()
                        .expect("length is 1, must exist; qed")
                        .as_bytes()
                        .to_owned(),
                );
            }
            Ordering::Less => {}
        }
    }

    qb.push(
        "
ORDER BY b.block_number ASC, l.transaction_index ASC, l.log_index ASC
LIMIT 10001",
    );

    qb.build()
}

#[cfg(test)]
mod test {
    use super::*;

    use std::path::Path;

    use maplit::hashset;
    use scale_codec::Encode;
    use sqlx::{sqlite::SqliteRow, QueryBuilder, Row, SqlitePool};
    use tempfile::tempdir;
    use sp_core::{H160, H256};
    use sp_runtime::{
        generic::{Block, Header},
        traits::BlakeTwo256,
    };
    use substrate_test_runtime_client::{
        DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt,
    };
    use fc_api::Backend as BackendT;
    use fc_storage::SchemaV3StorageOverride;
    use fp_storage::{EthereumStorageSchema, PALLET_ETHEREUM_SCHEMA};

    type OpaqueBlock =
        Block<Header<u64, BlakeTwo256>, substrate_test_runtime_client::runtime::Extrinsic>;

    struct TestFilter {
        pub from_block: u64,
        pub to_block: u64,
        pub addresses: Vec<H160>,
        pub topics: Vec<Vec<H256>>,
        pub expected_result: Vec<FilteredLog<OpaqueBlock>>,
    }

    #[derive(Debug, Clone)]
    struct Log {
        block_number: u32,
        address: H160,
        topics: [H256; 4],
        substrate_block_hash: H256,
        ethereum_block_hash: H256,
        transaction_index: u32,
        log_index: u32,
    }

    #[allow(unused)]
    struct TestData {
        backend: Backend<OpaqueBlock>,
        alice: H160,
        bob: H160,
        topics_a: H256,
        topics_b: H256,
        topics_c: H256,
        topics_d: H256,
        substrate_hash_1: H256,
        substrate_hash_2: H256,
        substrate_hash_3: H256,
        ethereum_hash_1: H256,
        ethereum_hash_2: H256,
        ethereum_hash_3: H256,
        log_1_abcd_0_0_alice: Log,
        log_1_dcba_1_0_alice: Log,
        log_1_badc_2_0_alice: Log,
        log_2_abcd_0_0_bob: Log,
        log_2_dcba_1_0_bob: Log,
        log_2_badc_2_0_bob: Log,
        log_3_abcd_0_0_bob: Log,
        log_3_dcba_1_0_bob: Log,
        log_3_badc_2_0_bob: Log,
    }

    impl From<Log> for FilteredLog<OpaqueBlock> {
        fn from(value: Log) -> Self {
            Self {
                substrate_block_hash: value.substrate_block_hash,
                ethereum_block_hash: value.ethereum_block_hash,
                block_number: value.block_number,
                ethereum_storage_schema: EthereumStorageSchema::V3,
                transaction_index: value.transaction_index,
                log_index: value.log_index,
            }
        }
    }

    async fn prepare() -> TestData {
        let tmp = tempdir().expect("create a temporary directory");
        // Initialize storage with schema V3.
        let builder = TestClientBuilder::new().add_extra_storage(
            PALLET_ETHEREUM_SCHEMA.to_vec(),
            Encode::encode(&EthereumStorageSchema::V3),
        );
        // Client.
        let (client, _) = builder
            .build_with_native_executor::<substrate_test_runtime_client::runtime::RuntimeApi, _>(
                None,
            );
        let client = Arc::new(client);
        // Storage override.
        let storage_override = Arc::new(SchemaV3StorageOverride::new(client.clone()));
        // Indexer backend.
        let indexer_backend = Backend::new(
            BackendConfig::Sqlite(SqliteBackendConfig {
                path: Path::new("sqlite:///")
                    .join(tmp.path())
                    .join("test.db3")
                    .to_str()
                    .unwrap(),
                create_if_missing: true,
                cache_size: 20480,
                thread_count: 4,
            }),
            1,
            None,
            storage_override.clone(),
        )
        .await
        .expect("indexer pool to be created");

        // Prepare the test db data.
        // Addresses.
        let alice = H160::repeat_byte(0x01);
        let bob = H160::repeat_byte(0x02);
        // Topics.
        let topics_a = H256::repeat_byte(0x01);
        let topics_b = H256::repeat_byte(0x02);
        let topics_c = H256::repeat_byte(0x03);
        let topics_d = H256::repeat_byte(0x04);
        // Substrate block hashes.
        let substrate_hash_1 = H256::repeat_byte(0x05);
        let substrate_hash_2 = H256::repeat_byte(0x06);
        let substrate_hash_3 = H256::repeat_byte(0x07);
        // Ethereum block hashes.
        let ethereum_hash_1 = H256::repeat_byte(0x08);
        let ethereum_hash_2 = H256::repeat_byte(0x09);
        let ethereum_hash_3 = H256::repeat_byte(0x0a);
        // Storage schema.
        let ethereum_storage_schema = EthereumStorageSchema::V3;

        let block_entries = vec![
            // Block 1.
            (
                1i32,
                ethereum_hash_1,
                substrate_hash_1,
                ethereum_storage_schema,
            ),
            // Block 2.
            (
                2i32,
                ethereum_hash_2,
                substrate_hash_2,
                ethereum_storage_schema,
            ),
            // Block 3.
            (
                3i32,
                ethereum_hash_3,
                substrate_hash_3,
                ethereum_storage_schema,
            ),
        ];
        let mut builder = QueryBuilder::new(
            "INSERT INTO blocks(
                block_number,
                ethereum_block_hash,
                substrate_block_hash,
                ethereum_storage_schema,
                is_canon
            )",
        );
        builder.push_values(block_entries, |mut b, entry| {
            let block_number = entry.0;
            let ethereum_block_hash = entry.1.as_bytes().to_owned();
            let substrate_block_hash = entry.2.as_bytes().to_owned();
            let ethereum_storage_schema = entry.3.encode();

            b.push_bind(block_number);
            b.push_bind(ethereum_block_hash);
            b.push_bind(substrate_block_hash);
            b.push_bind(ethereum_storage_schema);
            b.push_bind(1i32);
        });
        let query = builder.build();
        let _ = query
            .execute(indexer_backend.pool())
            .await
            .expect("insert should succeed");

        // Log samples.
        let log_1_abcd_0_0_alice = Log {
            block_number: 1,
            address: alice,
            topics: [topics_a, topics_b, topics_c, topics_d],
            log_index: 0,
            transaction_index: 0,
            substrate_block_hash: substrate_hash_1,
            ethereum_block_hash: ethereum_hash_1,
        };
        let log_1_dcba_1_0_alice = Log {
            block_number: 1,
            address: alice,
            topics: [topics_d, topics_c, topics_b, topics_a],
            log_index: 1,
            transaction_index: 0,
            substrate_block_hash: substrate_hash_1,
            ethereum_block_hash: ethereum_hash_1,
        };
        let log_1_badc_2_0_alice = Log {
            block_number: 1,
            address: alice,
            topics: [topics_b, topics_a, topics_d, topics_c],
            log_index: 2,
            transaction_index: 0,
            substrate_block_hash: substrate_hash_1,
            ethereum_block_hash: ethereum_hash_1,
        };
        let log_2_abcd_0_0_bob = Log {
            block_number: 2,
            address: bob,
            topics: [topics_a, topics_b, topics_c, topics_d],
            log_index: 0,
            transaction_index: 0,
            substrate_block_hash: substrate_hash_2,
            ethereum_block_hash: ethereum_hash_2,
        };
        let log_2_dcba_1_0_bob = Log {
            block_number: 2,
            address: bob,
            topics: [topics_d, topics_c, topics_b, topics_a],
            log_index: 1,
            transaction_index: 0,
            substrate_block_hash: substrate_hash_2,
            ethereum_block_hash: ethereum_hash_2,
        };
        let log_2_badc_2_0_bob = Log {
            block_number: 2,
            address: bob,
            topics: [topics_b, topics_a, topics_d, topics_c],
            log_index: 2,
            transaction_index: 0,
            substrate_block_hash: substrate_hash_2,
            ethereum_block_hash: ethereum_hash_2,
        };

        let log_3_abcd_0_0_bob = Log {
            block_number: 3,
            address: bob,
            topics: [topics_a, topics_b, topics_c, topics_d],
            log_index: 0,
            transaction_index: 0,
            substrate_block_hash: substrate_hash_3,
            ethereum_block_hash: ethereum_hash_3,
        };
        let log_3_dcba_1_0_bob = Log {
            block_number: 3,
            address: bob,
            topics: [topics_d, topics_c, topics_b, topics_a],
            log_index: 1,
            transaction_index: 0,
            substrate_block_hash: substrate_hash_3,
            ethereum_block_hash: ethereum_hash_3,
        };
        let log_3_badc_2_0_bob = Log {
            block_number: 3,
            address: bob,
            topics: [topics_b, topics_a, topics_d, topics_c],
            log_index: 2,
            transaction_index: 0,
            substrate_block_hash: substrate_hash_3,
            ethereum_block_hash: ethereum_hash_3,
        };

        let log_entries = vec![
            // Block 1.
            log_1_abcd_0_0_alice.clone(),
            log_1_dcba_1_0_alice.clone(),
            log_1_badc_2_0_alice.clone(),
            // Block 2.
            log_2_abcd_0_0_bob.clone(),
            log_2_dcba_1_0_bob.clone(),
            log_2_badc_2_0_bob.clone(),
            // Block 3.
            log_3_abcd_0_0_bob.clone(),
            log_3_dcba_1_0_bob.clone(),
            log_3_badc_2_0_bob.clone(),
        ];

        let mut builder: QueryBuilder<sqlx::Sqlite> = QueryBuilder::new(
            "INSERT INTO logs(
                address,
                topic_1,
                topic_2,
                topic_3,
                topic_4,
                log_index,
                transaction_index,
                substrate_block_hash
            )",
        );
        builder.push_values(log_entries, |mut b, entry| {
            let address = entry.address.as_bytes().to_owned();
            let topic_1 = entry.topics[0].as_bytes().to_owned();
            let topic_2 = entry.topics[1].as_bytes().to_owned();
            let topic_3 = entry.topics[2].as_bytes().to_owned();
            let topic_4 = entry.topics[3].as_bytes().to_owned();
            let log_index = entry.log_index;
            let transaction_index = entry.transaction_index;
            let substrate_block_hash = entry.substrate_block_hash.as_bytes().to_owned();

            b.push_bind(address);
            b.push_bind(topic_1);
            b.push_bind(topic_2);
            b.push_bind(topic_3);
            b.push_bind(topic_4);
            b.push_bind(log_index);
            b.push_bind(transaction_index);
            b.push_bind(substrate_block_hash);
        });
        let query = builder.build();
        let _ = query.execute(indexer_backend.pool()).await;

        TestData {
            alice,
            bob,
            topics_a,
            topics_b,
            topics_c,
            topics_d,
            substrate_hash_1,
            substrate_hash_2,
            substrate_hash_3,
            ethereum_hash_1,
            ethereum_hash_2,
            ethereum_hash_3,
            backend: indexer_backend,
            log_1_abcd_0_0_alice,
            log_1_dcba_1_0_alice,
            log_1_badc_2_0_alice,
            log_2_abcd_0_0_bob,
            log_2_dcba_1_0_bob,
            log_2_badc_2_0_bob,
            log_3_abcd_0_0_bob,
            log_3_dcba_1_0_bob,
            log_3_badc_2_0_bob,
        }
    }

    async fn run_test_case(
        backend: Backend<OpaqueBlock>,
        test_case: &TestFilter,
    ) -> Result<Vec<FilteredLog<OpaqueBlock>>, String> {
        backend
            .log_indexer()
            .filter_logs(
                test_case.from_block,
                test_case.to_block,
                test_case.addresses.clone(),
                test_case.topics.clone(),
            )
            .await
    }

    async fn assert_blocks_canon(pool: &SqlitePool, expected: Vec<(H256, u32)>) {
        let actual: Vec<(H256, u32)> =
            sqlx::query("SELECT substrate_block_hash, is_canon FROM blocks")
                .map(|row: SqliteRow| {
                    (
                        H256::from_slice(&row.get::<Vec<u8>, _>(0)[..]),
                        row.get(1),
                    )
                })
                .fetch_all(pool)
                .await
                .expect("sql query must succeed");
        assert_eq!(expected, actual);
    }

    #[tokio::test]
    async fn genesis_works() {
        let TestData { backend, .. } = prepare().await;
        let filter = TestFilter {
            from_block: 0,
            to_block: 0,
            addresses: vec![],
            topics: vec![],
            expected_result: vec![],
        };
        let result = run_test_case(backend, &filter).await.expect("must succeed");
        assert_eq!(result, filter.expected_result);
    }

    #[tokio::test]
    async fn unsanitized_input_works() {
        let TestData { backend, .. } = prepare().await;
        let filter = TestFilter {
            from_block: 0,
            to_block: 0,
            addresses: vec![],
            topics: vec![vec![], vec![], vec![], vec![]],
            expected_result: vec![],
        };
        let result = run_test_case(backend, &filter).await.expect("must succeed");
        assert_eq!(result, filter.expected_result);
    }

    #[tokio::test]
    async fn test_malformed_topic_cleans_invalid_options() {
        let TestData {
            backend,
            topics_a,
            topics_b,
            topics_d,
            log_1_badc_2_0_alice,
            ..
        } = prepare().await;

        let filter = TestFilter {
            from_block: 0,
            to_block: 1,
            addresses: vec![],
            topics: vec![vec![topics_a, topics_b], vec![topics_a], vec![topics_d]],
            expected_result: vec![log_1_badc_2_0_alice.into()],
        };
        let result = run_test_case(backend, &filter).await.expect("must succeed");
        assert_eq!(result, filter.expected_result);
    }

    #[tokio::test]
    async fn block_range_works() {
        let TestData {
            backend,
            log_1_abcd_0_0_alice,
            log_1_dcba_1_0_alice,
            log_1_badc_2_0_alice,
            log_2_abcd_0_0_bob,
            log_2_dcba_1_0_bob,
            log_2_badc_2_0_bob,
            ..
        } = prepare().await;

        let filter = TestFilter {
            from_block: 0,
            to_block: 2,
            addresses: vec![],
            topics: vec![],
            expected_result: vec![
                log_1_abcd_0_0_alice.into(),
                log_1_dcba_1_0_alice.into(),
                log_1_badc_2_0_alice.into(),
                log_2_abcd_0_0_bob.into(),
                log_2_dcba_1_0_bob.into(),
                log_2_badc_2_0_bob.into(),
            ],
        };
        let result = run_test_case(backend, &filter).await.expect("must succeed");
        assert_eq!(result, filter.expected_result);
    }

    #[tokio::test]
    async fn address_filter_works() {
        let TestData {
            backend,
            alice,
            log_1_abcd_0_0_alice,
            log_1_dcba_1_0_alice,
            log_1_badc_2_0_alice,
            ..
        } = prepare().await;
        let filter = TestFilter {
            from_block: 0,
            to_block: 3,
            addresses: vec![alice],
            topics: vec![],
            expected_result: vec![
                log_1_abcd_0_0_alice.into(),
                log_1_dcba_1_0_alice.into(),
                log_1_badc_2_0_alice.into(),
            ],
        };
        let result = run_test_case(backend, &filter).await.expect("must succeed");
        assert_eq!(result, filter.expected_result);
    }

    #[tokio::test]
    async fn topic_filter_works() {
        let TestData {
            backend,
            topics_d,
            log_1_dcba_1_0_alice,
            log_2_dcba_1_0_bob,
            log_3_dcba_1_0_bob,
            ..
        } = prepare().await;
        let filter = TestFilter {
            from_block: 0,
            to_block: 3,
            addresses: vec![],
            topics: vec![vec![topics_d]],
            expected_result: vec![
                log_1_dcba_1_0_alice.into(),
                log_2_dcba_1_0_bob.into(),
                log_3_dcba_1_0_bob.into(),
            ],
        };
        let result = run_test_case(backend, &filter).await.expect("must succeed");
        assert_eq!(result, filter.expected_result);
    }

    #[tokio::test]
    async fn test_filters_address_and_topic() {
        let TestData {
            backend,
            bob,
            topics_b,
            log_2_badc_2_0_bob,
            log_3_badc_2_0_bob,
            ..
        } = prepare().await;
        let filter = TestFilter {
            from_block: 0,
            to_block: 3,
            addresses: vec![bob],
            topics: vec![vec![topics_b]],
            expected_result: vec![log_2_badc_2_0_bob.into(), log_3_badc_2_0_bob.into()],
        };
        let result = run_test_case(backend, &filter).await.expect("must succeed");
        assert_eq!(result, filter.expected_result);
    }

    #[tokio::test]
    async fn test_filters_multi_address_and_topic() {
        let TestData {
            backend,
            alice,
            bob,
            topics_b,
            log_1_badc_2_0_alice,
            log_2_badc_2_0_bob,
            log_3_badc_2_0_bob,
            ..
        } = prepare().await;
        let filter = TestFilter {
            from_block: 0,
            to_block: 3,
            addresses: vec![alice, bob],
            topics: vec![vec![topics_b]],
            expected_result: vec![
                log_1_badc_2_0_alice.into(),
                log_2_badc_2_0_bob.into(),
                log_3_badc_2_0_bob.into(),
            ],
        };
        let result = run_test_case(backend, &filter).await.expect("must succeed");
        assert_eq!(result, filter.expected_result);
    }

    #[tokio::test]
    async fn test_filters_multi_address_and_multi_topic() {
        let TestData {
            backend,
            alice,
            bob,
            topics_a,
            topics_b,
            log_1_abcd_0_0_alice,
            log_2_abcd_0_0_bob,
            log_3_abcd_0_0_bob,
            ..
        } = prepare().await;
        let filter = TestFilter {
            from_block: 0,
            to_block: 3,
            addresses: vec![alice, bob],
            topics: vec![vec![topics_a], vec![topics_b]],
            expected_result: vec![
                log_1_abcd_0_0_alice.into(),
                log_2_abcd_0_0_bob.into(),
                log_3_abcd_0_0_bob.into(),
            ],
        };
        let result = run_test_case(backend, &filter).await.expect("must succeed");
        assert_eq!(result, filter.expected_result);
    }

    #[tokio::test]
    async fn filter_with_topic_wildcards_works() {
        let TestData {
            backend,
            alice,
            bob,
            topics_d,
            topics_b,
            log_1_dcba_1_0_alice,
            log_2_dcba_1_0_bob,
            log_3_dcba_1_0_bob,
            ..
        } = prepare().await;
        let filter = TestFilter {
            from_block: 0,
            to_block: 3,
            addresses: vec![alice, bob],
            topics: vec![vec![topics_d], vec![], vec![topics_b]],
            expected_result: vec![
                log_1_dcba_1_0_alice.into(),
                log_2_dcba_1_0_bob.into(),
                log_3_dcba_1_0_bob.into(),
            ],
        };
        let result = run_test_case(backend, &filter).await.expect("must succeed");
        assert_eq!(result, filter.expected_result);
    }

    #[tokio::test]
    async fn trailing_wildcard_is_useless_but_works() {
        let TestData {
            alice,
            backend,
            topics_b,
            log_1_dcba_1_0_alice,
            ..
        } = prepare().await;
        let filter = TestFilter {
            from_block: 0,
            to_block: 1,
            addresses: vec![alice],
            topics: vec![vec![], vec![], vec![topics_b]],
            expected_result: vec![log_1_dcba_1_0_alice.into()],
        };
        let result = run_test_case(backend, &filter).await.expect("must succeed");
        assert_eq!(result, filter.expected_result);
    }

    #[tokio::test]
    async fn filter_with_multi_topic_options_works() {
        let TestData {
            backend,
            topics_a,
            topics_d,
            log_1_abcd_0_0_alice,
            log_1_dcba_1_0_alice,
            log_2_abcd_0_0_bob,
            log_2_dcba_1_0_bob,
            log_3_abcd_0_0_bob,
            log_3_dcba_1_0_bob,
            ..
        } = prepare().await;
        let filter = TestFilter {
            from_block: 0,
            to_block: 3,
            addresses: vec![],
            topics: vec![vec![topics_a, topics_d]],
            expected_result: vec![
                log_1_abcd_0_0_alice.into(),
                log_1_dcba_1_0_alice.into(),
                log_2_abcd_0_0_bob.into(),
                log_2_dcba_1_0_bob.into(),
                log_3_abcd_0_0_bob.into(),
                log_3_dcba_1_0_bob.into(),
            ],
        };
        let result = run_test_case(backend, &filter).await.expect("must succeed");
        assert_eq!(result, filter.expected_result);
    }

    #[tokio::test]
    async fn filter_with_multi_topic_options_and_wildcards_works() {
        let TestData {
            backend,
            bob,
            topics_a,
            topics_b,
            topics_c,
            topics_d,
            log_2_dcba_1_0_bob,
            log_2_badc_2_0_bob,
            log_3_dcba_1_0_bob,
            log_3_badc_2_0_bob,
            ..
        } = prepare().await;
        let filter = TestFilter {
            from_block: 0,
            to_block: 3,
            addresses: vec![bob],
            topics: vec![
                vec![],
                vec![],
                vec![topics_b, topics_d],
                vec![topics_a, topics_c],
            ],
            expected_result: vec![
                log_2_dcba_1_0_bob.into(),
                log_2_badc_2_0_bob.into(),
                log_3_dcba_1_0_bob.into(),
                log_3_badc_2_0_bob.into(),
            ],
        };
        let result = run_test_case(backend, &filter).await.expect("must succeed");
        assert_eq!(result, filter.expected_result);
    }

    #[tokio::test]
    async fn test_canonicalize_sets_canon_flag_for_retracted_and_enacted_blocks_correctly() {
        let TestData {
            backend,
            substrate_hash_1,
            substrate_hash_2,
            substrate_hash_3,
            ..
        } = prepare().await;

        // Mark block #1 as non-canon.
        sqlx::query("UPDATE blocks SET is_canon = 0 WHERE substrate_block_hash = ?")
            .bind(substrate_hash_1.as_bytes())
            .execute(backend.pool())
            .await
            .expect("sql query must succeed");
        assert_blocks_canon(
            backend.pool(),
            vec![
                (substrate_hash_1, 0),
                (substrate_hash_2, 1),
                (substrate_hash_3, 1),
            ],
        )
        .await;

        backend
            .canonicalize(&[substrate_hash_2], &[substrate_hash_1])
            .await
            .expect("must succeed");

        assert_blocks_canon(
            backend.pool(),
            vec![
                (substrate_hash_1, 1),
                (substrate_hash_2, 0),
                (substrate_hash_3, 1),
            ],
        )
        .await;
    }

    #[test]
    fn test_query_should_be_generated_correctly() {
        use sqlx::Execute;

        let from_block: u64 = 100;
        let to_block: u64 = 500;
        let addresses: Vec<H160> = vec![
            H160::repeat_byte(0x01),
            H160::repeat_byte(0x02),
            H160::repeat_byte(0x03),
        ];
        let topics = [
            hashset![
                H256::repeat_byte(0x01),
                H256::repeat_byte(0x02),
                H256::repeat_byte(0x03),
            ],
            hashset![H256::repeat_byte(0x04), H256::repeat_byte(0x05),],
            hashset![],
            hashset![H256::repeat_byte(0x06)],
        ];

        let expected_query_sql = "
SELECT
    l.substrate_block_hash,
    b.ethereum_block_hash,
    b.block_number,
    b.ethereum_storage_schema,
    l.transaction_index,
    l.log_index
FROM logs AS l
INNER JOIN blocks AS b
ON (b.block_number BETWEEN ? AND ?) AND b.substrate_block_hash = l.substrate_block_hash AND b.is_canon = 1
WHERE 1 AND l.address IN (?, ?, ?) AND l.topic_1 IN (?, ?, ?) AND l.topic_2 IN (?, ?) AND l.topic_4 = ?
ORDER BY b.block_number ASC, l.transaction_index ASC, l.log_index ASC
LIMIT 10001";

        let mut qb = QueryBuilder::new("");
        let actual_query_sql = build_query(&mut qb, from_block, to_block, addresses, topics).sql();
        assert_eq!(expected_query_sql, actual_query_sql);
    }
}