1use std::{cmp::Ordering, collections::HashSet, num::NonZeroU32, str::FromStr, sync::Arc};
20
21use futures::TryStreamExt;
22use scale_codec::{Decode, Encode};
23use sqlx::{
24 query::Query,
25 sqlite::{
26 SqliteArguments, SqliteConnectOptions, SqlitePool, SqlitePoolOptions, SqliteQueryResult,
27 },
28 ConnectOptions, Error, Execute, QueryBuilder, Row, Sqlite,
29};
30use sc_client_api::backend::{Backend as BackendT, StorageProvider};
32use sp_api::{ApiExt, ProvideRuntimeApi};
33use sp_blockchain::HeaderBackend;
34use sp_core::{H160, H256};
35use sp_runtime::{
36 generic::BlockId,
37 traits::{Block as BlockT, Header as HeaderT, UniqueSaturatedInto, Zero},
38};
39use fc_api::{FilteredLog, TransactionMetadata};
41use fc_storage::{StorageOverride, StorageQuerier};
42use fp_consensus::{FindLogError, Hashes, Log as ConsensusLog, PostLog, PreLog};
43use fp_rpc::EthereumRuntimeRPCApi;
44use fp_storage::EthereumStorageSchema;
45
/// Ethereum log entries carry at most four topics; filter inputs longer than
/// this are rejected by `filter_logs`.
const MAX_TOPIC_COUNT: u16 = 4;
48
/// Row representation of an Ethereum log as stored in the `logs` table.
/// Topics live in four separate nullable columns so each position can be
/// indexed and filtered independently.
#[derive(Debug, Eq, PartialEq)]
pub struct Log {
	// Emitting contract address, 20 raw bytes.
	pub address: Vec<u8>,
	pub topic_1: Option<Vec<u8>>,
	pub topic_2: Option<Vec<u8>>,
	pub topic_3: Option<Vec<u8>>,
	pub topic_4: Option<Vec<u8>>,
	// Position of the log within its receipt.
	pub log_index: i32,
	// Position of the transaction within the block.
	pub transaction_index: i32,
	// Hash of the containing substrate block, 32 raw bytes.
	pub substrate_block_hash: Vec<u8>,
}
61
/// Block metadata extracted from a substrate header's consensus digest,
/// ready to be written to the `blocks` table.
#[derive(Eq, PartialEq)]
struct BlockMetadata {
	pub substrate_block_hash: H256,
	pub block_number: i32,
	// Ethereum block/transaction hashes recovered from the consensus log.
	pub post_hashes: Hashes,
	pub schema: EthereumStorageSchema,
	// Stored as an SQLite INTEGER: 1 = canonical chain, 0 = not canonical.
	pub is_canon: i32,
}
71
/// Connection settings for the SQLite database backing the indexer.
#[derive(Debug)]
pub struct SqliteBackendConfig<'a> {
	// Database file path (or sqlite URI).
	pub path: &'a str,
	// Create the database file if it does not already exist.
	pub create_if_missing: bool,
	// Value for the SQLite `threads` pragma (auxiliary worker threads).
	pub thread_count: u32,
	// Value for the SQLite `cache_size` pragma; applied negated, which SQLite
	// interprets as a size in KiB rather than pages.
	pub cache_size: u64,
}
81
/// Whether a block has been indexed, and if so whether it is canonical.
/// `Default` yields (not indexed, not canon).
#[derive(Debug, Default)]
pub struct BlockIndexedStatus {
	// Block exists in the `sync_status` table.
	pub indexed: bool,
	// Block is marked `is_canon = 1` in the `blocks` table.
	pub canon: bool,
}
88
/// Database backend configuration. Only SQLite is currently supported.
#[derive(Debug)]
pub enum BackendConfig<'a> {
	Sqlite(SqliteBackendConfig<'a>),
}
94
/// SQL backend for the Frontier block, transaction and log indexer.
#[derive(Clone)]
pub struct Backend<Block> {
	// Pooled SQLite connections.
	pool: SqlitePool,
	// Accessor for on-chain Ethereum data (blocks, receipts).
	storage_override: Arc<dyn StorageOverride<Block>>,
	// Number of sqlite VM operations between progress-handler callbacks for
	// filter queries; 0 disables the handler.
	num_ops_timeout: i32,
}
106
107impl<Block> Backend<Block>
108where
109 Block: BlockT<Hash = H256>,
110{
	/// Creates a new instance of the SQL backend.
	///
	/// Builds a lazy pool of `pool_size` connections against the configured
	/// database and ensures the schema tables and indexes exist (both are
	/// idempotent `IF NOT EXISTS` statements).
	///
	/// `num_ops_timeout` bounds the number of sqlite VM operations between
	/// progress-handler callbacks during `filter_logs`; `None` is stored as 0,
	/// which disables the handler.
	pub async fn new(
		config: BackendConfig<'_>,
		pool_size: u32,
		num_ops_timeout: Option<NonZeroU32>,
		storage_override: Arc<dyn StorageOverride<Block>>,
	) -> Result<Self, Error> {
		let any_pool = SqlitePoolOptions::new()
			.max_connections(pool_size)
			.connect_lazy_with(Self::connect_options(&config)?.disable_statement_logging());
		let _ = Self::create_database_if_not_exists(&any_pool).await?;
		let _ = Self::create_indexes_if_not_exist(&any_pool).await?;
		Ok(Self {
			pool: any_pool,
			storage_override,
			// u32 -> i32, saturating at i32::MAX rather than failing.
			num_ops_timeout: num_ops_timeout
				.map(|n| n.get())
				.unwrap_or(0)
				.try_into()
				.unwrap_or(i32::MAX),
		})
	}
133
	/// Translates a `BackendConfig` into sqlx `SqliteConnectOptions`,
	/// applying the pragmas used by the indexer (WAL journal, NORMAL sync,
	/// in-memory temp store, bounded busy timeout).
	fn connect_options(config: &BackendConfig) -> Result<SqliteConnectOptions, Error> {
		match config {
			BackendConfig::Sqlite(config) => {
				log::info!(target: "frontier-sql", "📑 Connection configuration: {config:?}");
				let config = sqlx::sqlite::SqliteConnectOptions::from_str(config.path)?
					.create_if_missing(config.create_if_missing)
					// Wait up to 8s on a locked database before erroring.
					.busy_timeout(std::time::Duration::from_secs(8))
					// Negative cache_size means "size in KiB" to SQLite.
					.pragma("cache_size", format!("-{}", config.cache_size))
					// Cap the rows scanned by ANALYZE / `PRAGMA optimize`.
					.pragma("analysis_limit", "1000")
					.pragma("threads", config.thread_count.to_string())
					.pragma("temp_store", "memory")
					.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
					.synchronous(sqlx::sqlite::SqliteSynchronous::Normal);
				Ok(config)
			}
		}
	}
158
	/// Get the underlying SQLite connection pool.
	pub fn pool(&self) -> &SqlitePool {
		&self.pool
	}
163
164 pub async fn canonicalize(&self, retracted: &[H256], enacted: &[H256]) -> Result<(), Error> {
167 let mut tx = self.pool().begin().await?;
168
169 let mut builder: QueryBuilder<Sqlite> =
171 QueryBuilder::new("UPDATE blocks SET is_canon = 0 WHERE substrate_block_hash IN (");
172 let mut retracted_hashes = builder.separated(", ");
173 for hash in retracted.iter() {
174 let hash = hash.as_bytes();
175 retracted_hashes.push_bind(hash);
176 }
177 retracted_hashes.push_unseparated(")");
178 let query = builder.build();
179 query.execute(&mut *tx).await?;
180
181 let mut builder: QueryBuilder<Sqlite> =
183 QueryBuilder::new("UPDATE blocks SET is_canon = 1 WHERE substrate_block_hash IN (");
184 let mut enacted_hashes = builder.separated(", ");
185 for hash in enacted.iter() {
186 let hash = hash.as_bytes();
187 enacted_hashes.push_bind(hash);
188 }
189 enacted_hashes.push_unseparated(")");
190 let query = builder.build();
191 query.execute(&mut *tx).await?;
192
193 tx.commit().await
194 }
195
	/// Index the genesis block's metadata into the `blocks` table.
	///
	/// Returns `Ok(Some(genesis_hash))` when the genesis header is available
	/// (whether or not the runtime exposes the Ethereum API at genesis), and
	/// `Ok(None)` when the header cannot be fetched.
	///
	/// NOTE(review): runtime-api failures panic via `expect` rather than
	/// returning `Err` — presumably treated as unrecoverable node state.
	pub async fn insert_genesis_block_metadata<Client, BE>(
		&self,
		client: Arc<Client>,
	) -> Result<Option<H256>, Error>
	where
		Client: StorageProvider<Block, BE> + HeaderBackend<Block> + 'static,
		Client: ProvideRuntimeApi<Block>,
		Client::Api: EthereumRuntimeRPCApi<Block>,
		BE: BackendT<Block> + 'static,
	{
		let id = BlockId::Number(Zero::zero());
		let substrate_genesis_hash = client
			.expect_block_hash_from_id(&id)
			.map_err(|_| Error::Protocol("Cannot resolve genesis hash".to_string()))?;
		let maybe_substrate_hash: Option<H256> = if let Ok(Some(_)) =
			client.header(substrate_genesis_hash)
		{
			// Only index Ethereum data if the runtime at genesis exposes the
			// Ethereum runtime API (version >= 1).
			let has_api = client
				.runtime_api()
				.has_api_with::<dyn EthereumRuntimeRPCApi<Block>, _>(
					substrate_genesis_hash,
					|version| version >= 1,
				)
				.expect("runtime api reachable");

			log::debug!(target: "frontier-sql", "Index genesis block, has_api={has_api}, hash={substrate_genesis_hash:?}");

			if has_api {
				// The chain has frontier support from genesis: read the
				// Ethereum genesis block from the runtime and store it.
				let ethereum_block = client
					.runtime_api()
					.current_block(substrate_genesis_hash)
					.expect("runtime api reachable")
					.expect("ethereum genesis block");

				// Fall back to schema V3 when none is recorded in storage.
				let schema = StorageQuerier::new(client)
					.storage_schema(substrate_genesis_hash)
					.unwrap_or(EthereumStorageSchema::V3)
					.encode();
				let ethereum_block_hash = ethereum_block.header.hash().as_bytes().to_owned();
				let substrate_block_hash = substrate_genesis_hash.as_bytes();
				let block_number = 0i32;
				let is_canon = 1i32;

				// `OR IGNORE` makes re-indexing the genesis block idempotent.
				let _ = sqlx::query(
					"INSERT OR IGNORE INTO blocks(
						ethereum_block_hash,
						substrate_block_hash,
						block_number,
						ethereum_storage_schema,
						is_canon)
					VALUES (?, ?, ?, ?, ?)",
				)
				.bind(ethereum_block_hash)
				.bind(substrate_block_hash)
				.bind(block_number)
				.bind(schema)
				.bind(is_canon)
				.execute(self.pool())
				.await?;
			}
			Some(substrate_genesis_hash)
		} else {
			None
		};
		Ok(maybe_substrate_hash)
	}
265
	/// Synchronous helper: derive `BlockMetadata` for `hash` from the header's
	/// frontier consensus digest. Runs on a blocking thread (see
	/// `insert_block_metadata`).
	///
	/// The Ethereum hashes are taken from the digest when present; for
	/// `PostLog::BlockHash` digests the full block is loaded through
	/// `storage_override` and cross-checked against the digest hash.
	fn insert_block_metadata_inner<Client, BE>(
		client: Arc<Client>,
		hash: H256,
		storage_override: &dyn StorageOverride<Block>,
	) -> Result<BlockMetadata, Error>
	where
		Client: StorageProvider<Block, BE> + HeaderBackend<Block> + 'static,
		BE: BackendT<Block> + 'static,
	{
		log::trace!(target: "frontier-sql", "🛠️ [Metadata] Retrieving digest data for block {hash:?}");
		if let Ok(Some(header)) = client.header(hash) {
			match fp_consensus::find_log(header.digest()) {
				Ok(log) => {
					// Fall back to schema V3 when none is recorded in storage.
					let schema = StorageQuerier::new(client.clone())
						.storage_schema(hash)
						.unwrap_or(EthereumStorageSchema::V3);
					let log_hashes = match log {
						// Digest already carries the hashes verbatim.
						ConsensusLog::Post(PostLog::Hashes(post_hashes)) => post_hashes,
						// Digest carries the whole block; derive the hashes.
						ConsensusLog::Post(PostLog::Block(block)) => Hashes::from_block(block),
						// Digest carries only the Ethereum block hash: load the
						// block from storage and verify it matches.
						ConsensusLog::Post(PostLog::BlockHash(expect_eth_block_hash)) => {
							let ethereum_block = storage_override.current_block(hash);
							match ethereum_block {
								Some(block) => {
									let got_eth_block_hash = block.header.hash();
									if got_eth_block_hash != expect_eth_block_hash {
										return Err(Error::Protocol(format!(
											"Ethereum block hash mismatch: \
											frontier consensus digest ({expect_eth_block_hash:?}), \
											db state ({got_eth_block_hash:?})"
										)));
									} else {
										Hashes::from_block(block)
									}
								}
								None => {
									return Err(Error::Protocol(format!(
										"Missing ethereum block for hash mismatch {expect_eth_block_hash:?}"
									)))
								}
							}
						}
						ConsensusLog::Pre(PreLog::Block(block)) => Hashes::from_block(block),
					};

					let header_number = *header.number();
					let block_number =
						UniqueSaturatedInto::<u32>::unique_saturated_into(header_number) as i32;
					// A block is canonical iff the best-chain hash at its
					// height is this hash; lookup failures count as non-canon.
					let is_canon = match client.hash(header_number) {
						Ok(Some(inner_hash)) => (inner_hash == hash) as i32,
						Ok(None) => {
							log::debug!(target: "frontier-sql", "[Metadata] Missing header for block #{block_number} ({hash:?})");
							0
						}
						Err(err) => {
							log::debug!(
								target: "frontier-sql",
								"[Metadata] Failed to retrieve header for block #{block_number} ({hash:?}): {err:?}",
							);
							0
						}
					};

					log::trace!(
						target: "frontier-sql",
						"[Metadata] Prepared block metadata for #{block_number} ({hash:?}) canon={is_canon}",
					);
					Ok(BlockMetadata {
						substrate_block_hash: hash,
						block_number,
						post_hashes: log_hashes,
						schema,
						is_canon,
					})
				}
				Err(FindLogError::NotFound) => Err(Error::Protocol(format!(
					"[Metadata] No logs found for hash {hash:?}",
				))),
				Err(FindLogError::MultipleLogs) => Err(Error::Protocol(format!(
					"[Metadata] Multiple logs found for hash {hash:?}",
				))),
			}
		} else {
			Err(Error::Protocol(format!(
				"[Metadata] Failed retrieving header for hash {hash:?}"
			)))
		}
	}
353
	/// Insert the block metadata for the given block hash.
	///
	/// Derives the metadata on a blocking thread, then — in one transaction —
	/// upserts the block row, one row per transaction, and a `sync_status`
	/// entry (status 0, i.e. logs not yet indexed; see `index_block_logs`).
	pub async fn insert_block_metadata<Client, BE>(
		&self,
		client: Arc<Client>,
		hash: H256,
	) -> Result<(), Error>
	where
		Client: StorageProvider<Block, BE> + HeaderBackend<Block> + 'static,
		BE: BackendT<Block> + 'static,
	{
		// Digest/storage access is synchronous client work; keep it off the
		// async executor.
		let storage_override = self.storage_override.clone();
		let metadata = tokio::task::spawn_blocking(move || {
			Self::insert_block_metadata_inner(client.clone(), hash, &*storage_override)
		})
		.await
		.map_err(|_| Error::Protocol("tokio blocking metadata task failed".to_string()))??;

		let mut tx = self.pool().begin().await?;

		log::debug!(
			target: "frontier-sql",
			"🛠️ [Metadata] Starting execution of statements on db transaction"
		);
		let post_hashes = metadata.post_hashes;
		let ethereum_block_hash = post_hashes.block_hash.as_bytes();
		let substrate_block_hash = metadata.substrate_block_hash.as_bytes();
		let schema = metadata.schema.encode();
		let block_number = metadata.block_number;
		let is_canon = metadata.is_canon;

		// `OR IGNORE` makes re-indexing an already-seen block idempotent.
		let _ = sqlx::query(
			"INSERT OR IGNORE INTO blocks(
				ethereum_block_hash,
				substrate_block_hash,
				block_number,
				ethereum_storage_schema,
				is_canon)
			VALUES (?, ?, ?, ?, ?)",
		)
		.bind(ethereum_block_hash)
		.bind(substrate_block_hash)
		.bind(block_number)
		.bind(schema)
		.bind(is_canon)
		.execute(&mut *tx)
		.await?;
		for (i, &transaction_hash) in post_hashes.transaction_hashes.iter().enumerate() {
			let ethereum_transaction_hash = transaction_hash.as_bytes();
			let ethereum_transaction_index = i as i32;
			log::trace!(
				target: "frontier-sql",
				"[Metadata] Inserting TX for block #{block_number} - {transaction_hash:?} index {ethereum_transaction_index}",
			);
			let _ = sqlx::query(
				"INSERT OR IGNORE INTO transactions(
					ethereum_transaction_hash,
					substrate_block_hash,
					ethereum_block_hash,
					ethereum_transaction_index)
				VALUES (?, ?, ?, ?)",
			)
			.bind(ethereum_transaction_hash)
			.bind(substrate_block_hash)
			.bind(ethereum_block_hash)
			.bind(ethereum_transaction_index)
			.execute(&mut *tx)
			.await?;
		}

		// Mark the block as known but with logs still pending (status
		// defaults to 0).
		sqlx::query("INSERT INTO sync_status(substrate_block_hash) VALUES (?)")
			.bind(hash.as_bytes())
			.execute(&mut *tx)
			.await?;

		log::debug!(target: "frontier-sql", "[Metadata] Ready to commit");
		tx.commit().await
	}
432
	/// Index the logs for the newly indexed block identified by `block_hash`.
	///
	/// The `UPDATE ... RETURNING` statement atomically claims the block by
	/// flipping `sync_status.status` from 0 to 1; if no row matches (already
	/// indexed, or block unknown) the fetch errors and nothing is inserted.
	/// Errors are logged, not propagated.
	pub async fn index_block_logs(&self, block_hash: Block::Hash) {
		let pool = self.pool().clone();
		let storage_override = self.storage_override.clone();
		let _ = async {
			let mut tx = pool.begin().await?;
			match sqlx::query(
				"UPDATE sync_status
			SET status = 1
			WHERE substrate_block_hash IN
			(SELECT substrate_block_hash
			FROM sync_status
			WHERE status = 0 AND substrate_block_hash = ?) RETURNING substrate_block_hash",
			)
			.bind(block_hash.as_bytes())
			.fetch_one(&mut *tx)
			.await
			{
				Ok(_) => {
					// Receipt retrieval is synchronous client work; keep it
					// off the async executor.
					let logs = tokio::task::spawn_blocking(move || {
						Self::get_logs(storage_override, block_hash)
					})
					.await
					.map_err(|_| Error::Protocol("tokio blocking task failed".to_string()))?;

					for log in logs {
						let _ = sqlx::query(
							"INSERT OR IGNORE INTO logs(
								address,
								topic_1,
								topic_2,
								topic_3,
								topic_4,
								log_index,
								transaction_index,
								substrate_block_hash)
							VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
						)
						.bind(log.address)
						.bind(log.topic_1)
						.bind(log.topic_2)
						.bind(log.topic_3)
						.bind(log.topic_4)
						.bind(log.log_index)
						.bind(log.transaction_index)
						.bind(log.substrate_block_hash)
						.execute(&mut *tx)
						.await?;
					}
					Ok(tx.commit().await?)
				}
				Err(e) => Err(e),
			}
		}
		.await
		.map_err(|e| {
			log::error!(target: "frontier-sql", "{e}");
		});
		// Opportunistic ANALYZE; bounded by the `analysis_limit` pragma.
		let _ = sqlx::query("PRAGMA optimize").execute(&pool).await;
		log::debug!(target: "frontier-sql", "Batch committed");
	}
505
506 fn get_logs(
507 storage_override: Arc<dyn StorageOverride<Block>>,
508 substrate_block_hash: H256,
509 ) -> Vec<Log> {
510 let mut logs: Vec<Log> = vec![];
511 let mut transaction_count: usize = 0;
512 let mut log_count: usize = 0;
513 let receipts = storage_override
514 .current_receipts(substrate_block_hash)
515 .unwrap_or_default();
516
517 transaction_count += receipts.len();
518 for (transaction_index, receipt) in receipts.iter().enumerate() {
519 let receipt_logs = match receipt {
520 ethereum::ReceiptV4::Legacy(d)
521 | ethereum::ReceiptV4::EIP2930(d)
522 | ethereum::ReceiptV4::EIP1559(d)
523 | ethereum::ReceiptV4::EIP7702(d) => &d.logs,
524 };
525 let transaction_index = transaction_index as i32;
526 log_count += receipt_logs.len();
527 for (log_index, log) in receipt_logs.iter().enumerate() {
528 #[allow(clippy::get_first)]
529 logs.push(Log {
530 address: log.address.as_bytes().to_owned(),
531 topic_1: log.topics.get(0).map(|l| l.as_bytes().to_owned()),
532 topic_2: log.topics.get(1).map(|l| l.as_bytes().to_owned()),
533 topic_3: log.topics.get(2).map(|l| l.as_bytes().to_owned()),
534 topic_4: log.topics.get(3).map(|l| l.as_bytes().to_owned()),
535 log_index: log_index as i32,
536 transaction_index,
537 substrate_block_hash: substrate_block_hash.as_bytes().to_owned(),
538 });
539 }
540 }
541 log::debug!(
542 target: "frontier-sql",
543 "Ready to commit {log_count} logs from {transaction_count} transactions"
544 );
545 logs
546 }
547
548 pub async fn is_block_indexed(&self, block_hash: Block::Hash) -> bool {
550 sqlx::query("SELECT substrate_block_hash FROM sync_status WHERE substrate_block_hash = ?")
551 .bind(block_hash.as_bytes().to_owned())
552 .fetch_optional(self.pool())
553 .await
554 .map(|r| r.is_some())
555 .unwrap_or(false)
556 }
557
558 pub async fn block_indexed_and_canon_status(
560 &self,
561 block_hash: Block::Hash,
562 ) -> BlockIndexedStatus {
563 sqlx::query(
564 "SELECT b.is_canon FROM sync_status AS s
565 INNER JOIN blocks AS b
566 ON s.substrate_block_hash = b.substrate_block_hash
567 WHERE s.substrate_block_hash = ?",
568 )
569 .bind(block_hash.as_bytes().to_owned())
570 .fetch_optional(self.pool())
571 .await
572 .map(|result| {
573 result
574 .map(|row| {
575 let is_canon: i32 = row.get(0);
576 BlockIndexedStatus {
577 indexed: true,
578 canon: is_canon != 0,
579 }
580 })
581 .unwrap_or_default()
582 })
583 .unwrap_or_default()
584 }
585
	/// Sets the provided block as canon (`is_canon = 1`) in the `blocks`
	/// table. A hash with no matching row results in zero rows affected.
	pub async fn set_block_as_canon(&self, block_hash: H256) -> Result<SqliteQueryResult, Error> {
		sqlx::query("UPDATE blocks SET is_canon = 1 WHERE substrate_block_hash = ?")
			.bind(block_hash.as_bytes())
			.execute(self.pool())
			.await
	}
593
594 pub async fn get_first_missing_canon_block(&self) -> Option<u32> {
598 match sqlx::query(
599 "SELECT b1.block_number-1
600 FROM blocks as b1
601 WHERE b1.block_number > 0 AND b1.is_canon=1 AND NOT EXISTS (
602 SELECT 1 FROM blocks AS b2
603 WHERE b2.block_number = b1.block_number-1
604 AND b1.is_canon=1
605 AND b2.is_canon=1
606 )
607 ORDER BY block_number LIMIT 1",
608 )
609 .fetch_optional(self.pool())
610 .await
611 {
612 Ok(result) => {
613 if let Some(row) = result {
614 let block_number: u32 = row.get(0);
615 return Some(block_number);
616 }
617 }
618 Err(err) => {
619 log::debug!(target: "frontier-sql", "Failed retrieving missing block {err:?}");
620 }
621 }
622
623 None
624 }
625
626 pub async fn get_first_pending_canon_block(&self) -> Option<H256> {
630 match sqlx::query(
631 "SELECT s.substrate_block_hash FROM sync_status AS s
632 INNER JOIN blocks as b
633 ON s.substrate_block_hash = b.substrate_block_hash
634 WHERE b.is_canon = 1 AND s.status = 0
635 ORDER BY b.block_number LIMIT 1",
636 )
637 .fetch_optional(self.pool())
638 .await
639 {
640 Ok(result) => {
641 if let Some(row) = result {
642 let block_hash_bytes: Vec<u8> = row.get(0);
643 let block_hash = H256::from_slice(&block_hash_bytes[..]);
644 return Some(block_hash);
645 }
646 }
647 Err(err) => {
648 log::debug!(target: "frontier-sql", "Failed retrieving missing block {err:?}");
649 }
650 }
651
652 None
653 }
654
	/// Retrieves the most recently inserted canonical block whose logs have
	/// been indexed (`status = 1`), ordered by row id (insertion order).
	/// Errors if no such block exists.
	pub async fn last_indexed_canon_block(&self) -> Result<H256, Error> {
		let row = sqlx::query(
			"SELECT b.substrate_block_hash FROM blocks AS b
			INNER JOIN sync_status AS s
			ON s.substrate_block_hash = b.substrate_block_hash
			WHERE b.is_canon=1 AND s.status = 1
			ORDER BY b.id DESC LIMIT 1",
		)
		.fetch_one(self.pool())
		.await?;
		// A decode failure yields an empty Vec; from_slice would then panic,
		// but the column is NOT NULL and always written with 32 bytes.
		Ok(H256::from_slice(
			&row.try_get::<Vec<u8>, _>(0).unwrap_or_default()[..],
		))
	}
670
	/// Creates the four indexer tables if they do not exist yet:
	/// `logs` (one row per emitted log), `sync_status` (per-block indexing
	/// state), `blocks` (block metadata) and `transactions` (tx lookup).
	/// UNIQUE constraints back the `INSERT OR IGNORE` idempotency used by the
	/// insert paths.
	async fn create_database_if_not_exists(pool: &SqlitePool) -> Result<SqliteQueryResult, Error> {
		sqlx::query(
			"BEGIN;
			CREATE TABLE IF NOT EXISTS logs (
				id INTEGER PRIMARY KEY,
				address BLOB NOT NULL,
				topic_1 BLOB,
				topic_2 BLOB,
				topic_3 BLOB,
				topic_4 BLOB,
				log_index INTEGER NOT NULL,
				transaction_index INTEGER NOT NULL,
				substrate_block_hash BLOB NOT NULL,
				UNIQUE (
					log_index,
					transaction_index,
					substrate_block_hash
				)
			);
			CREATE TABLE IF NOT EXISTS sync_status (
				id INTEGER PRIMARY KEY,
				substrate_block_hash BLOB NOT NULL,
				status INTEGER DEFAULT 0 NOT NULL,
				UNIQUE (
					substrate_block_hash
				)
			);
			CREATE TABLE IF NOT EXISTS blocks (
				id INTEGER PRIMARY KEY,
				block_number INTEGER NOT NULL,
				ethereum_block_hash BLOB NOT NULL,
				substrate_block_hash BLOB NOT NULL,
				ethereum_storage_schema BLOB NOT NULL,
				is_canon INTEGER NOT NULL,
				UNIQUE (
					ethereum_block_hash,
					substrate_block_hash
				)
			);
			CREATE TABLE IF NOT EXISTS transactions (
				id INTEGER PRIMARY KEY,
				ethereum_transaction_hash BLOB NOT NULL,
				substrate_block_hash BLOB NOT NULL,
				ethereum_block_hash BLOB NOT NULL,
				ethereum_transaction_index INTEGER NOT NULL,
				UNIQUE (
					ethereum_transaction_hash,
					substrate_block_hash
				)
			);
			COMMIT;",
		)
		.execute(pool)
		.await
	}
727
	/// Creates the secondary indexes used by the query paths if they do not
	/// exist yet: topic filtering and per-block lookup on `logs`, number/hash
	/// lookup on `blocks`, and hash/index lookup on `transactions`.
	async fn create_indexes_if_not_exist(pool: &SqlitePool) -> Result<SqliteQueryResult, Error> {
		sqlx::query(
			"BEGIN;
			CREATE INDEX IF NOT EXISTS logs_main_idx ON logs (
				address,
				topic_1,
				topic_2,
				topic_3,
				topic_4
			);
			CREATE INDEX IF NOT EXISTS logs_substrate_index ON logs (
				substrate_block_hash
			);
			CREATE INDEX IF NOT EXISTS blocks_number_index ON blocks (
				block_number
			);
			CREATE INDEX IF NOT EXISTS blocks_substrate_index ON blocks (
				substrate_block_hash
			);
			CREATE INDEX IF NOT EXISTS eth_block_hash_idx ON blocks (
				ethereum_block_hash
			);
			CREATE INDEX IF NOT EXISTS eth_tx_hash_idx ON transactions (
				ethereum_transaction_hash
			);
			CREATE INDEX IF NOT EXISTS eth_tx_hash_2_idx ON transactions (
				ethereum_block_hash,
				ethereum_transaction_index
			);
			COMMIT;",
		)
		.execute(pool)
		.await
	}
763}
764
765#[async_trait::async_trait]
766impl<Block: BlockT<Hash = H256>> fc_api::Backend<Block> for Backend<Block> {
	/// Maps an Ethereum block hash to the substrate block hash(es) that
	/// contain it — several on forks. Returns `Ok(None)` both for unknown
	/// hashes (empty result) and for query errors, which are swallowed by
	/// `.ok()`.
	async fn block_hash(
		&self,
		ethereum_block_hash: &H256,
	) -> Result<Option<Vec<Block::Hash>>, String> {
		let ethereum_block_hash = ethereum_block_hash.as_bytes();
		let res =
			sqlx::query("SELECT substrate_block_hash FROM blocks WHERE ethereum_block_hash = ?")
				.bind(ethereum_block_hash)
				.fetch_all(&self.pool)
				.await
				.ok()
				.map(|rows| {
					rows.iter()
						.map(|row| {
							H256::from_slice(&row.try_get::<Vec<u8>, _>(0).unwrap_or_default()[..])
						})
						.collect()
				});
		Ok(res)
	}
787
	/// Returns the metadata of every indexed occurrence of the given Ethereum
	/// transaction hash (one per containing substrate block). Query errors
	/// collapse to an empty result via `unwrap_or_default`.
	async fn transaction_metadata(
		&self,
		ethereum_transaction_hash: &H256,
	) -> Result<Vec<TransactionMetadata<Block>>, String> {
		let ethereum_transaction_hash = ethereum_transaction_hash.as_bytes();
		let out = sqlx::query(
			"SELECT
				substrate_block_hash, ethereum_block_hash, ethereum_transaction_index
			FROM transactions WHERE ethereum_transaction_hash = ?",
		)
		.bind(ethereum_transaction_hash)
		.fetch_all(&self.pool)
		.await
		.unwrap_or_default()
		.iter()
		.map(|row| {
			let substrate_block_hash =
				H256::from_slice(&row.try_get::<Vec<u8>, _>(0).unwrap_or_default()[..]);
			let ethereum_block_hash =
				H256::from_slice(&row.try_get::<Vec<u8>, _>(1).unwrap_or_default()[..]);
			let ethereum_transaction_index = row.try_get::<i32, _>(2).unwrap_or_default() as u32;
			TransactionMetadata {
				substrate_block_hash,
				ethereum_block_hash,
				ethereum_index: ethereum_transaction_index,
			}
		})
		.collect();

		Ok(out)
	}
819
	/// The SQL backend is its own log indexer (see the
	/// `fc_api::LogIndexerBackend` implementation below).
	fn log_indexer(&self) -> &dyn fc_api::LogIndexerBackend<Block> {
		self
	}
823
	/// Returns the substrate hash of the lowest-numbered indexed block.
	/// NOTE(review): does not filter on `is_canon` — presumably acceptable
	/// because the earliest indexed block is genesis-adjacent; confirm.
	async fn first_block_hash(&self) -> Result<Block::Hash, String> {
		sqlx::query("SELECT substrate_block_hash FROM blocks ORDER BY block_number ASC LIMIT 1")
			.fetch_one(self.pool())
			.await
			.map(|row| H256::from_slice(&row.get::<Vec<u8>, _>(0)[..]))
			.map_err(|e| format!("Failed to fetch oldest block hash: {e}"))
	}
832
	/// Returns the substrate hash of the highest-numbered indexed block.
	/// NOTE(review): like `first_block_hash`, this does not filter on
	/// `is_canon` — confirm whether fork blocks can be returned here.
	async fn latest_block_hash(&self) -> Result<Block::Hash, String> {
		sqlx::query("SELECT substrate_block_hash FROM blocks ORDER BY block_number DESC LIMIT 1")
			.fetch_one(self.pool())
			.await
			.map(|row| H256::from_slice(&row.get::<Vec<u8>, _>(0)[..]))
			.map_err(|e| format!("Failed to fetch best hash: {e}"))
	}
841}
842
843#[async_trait::async_trait]
844impl<Block: BlockT<Hash = H256>> fc_api::LogIndexerBackend<Block> for Backend<Block> {
	/// The SQL backend always indexes logs, so filtering is supported.
	fn is_indexed(&self) -> bool {
		true
	}
848
	/// Filters indexed logs by block range, emitting address and per-position
	/// topics. Result is capped by the `LIMIT 10001` in `build_query`.
	///
	/// A sqlite progress handler (configured via `num_ops_timeout`) is
	/// installed on the connection for the duration of the query; returning
	/// `false` from it aborts a runaway statement.
	async fn filter_logs(
		&self,
		from_block: u64,
		to_block: u64,
		addresses: Vec<H160>,
		topics: Vec<Vec<Option<H256>>>,
	) -> Result<Vec<FilteredLog<Block>>, String> {
		// Collapse the OR-combinations into one set of candidates per topic
		// position; `None` (wildcard) entries are simply skipped.
		let mut unique_topics: [HashSet<H256>; 4] = [
			HashSet::new(),
			HashSet::new(),
			HashSet::new(),
			HashSet::new(),
		];
		for topic_combination in topics.into_iter() {
			for (topic_index, topic) in topic_combination.into_iter().enumerate() {
				if topic_index == MAX_TOPIC_COUNT as usize {
					return Err("Invalid topic input. Maximum length is 4.".to_string());
				}

				if let Some(topic) = topic {
					unique_topics[topic_index].insert(topic);
				}
			}
		}

		// Correlation key used only for log messages.
		let log_key = format!("{from_block}-{to_block}-{addresses:?}-{unique_topics:?}");
		let mut qb = QueryBuilder::new("");
		let query = build_query(&mut qb, from_block, to_block, addresses, unique_topics);
		let sql = query.sql();

		let mut conn = self
			.pool()
			.acquire()
			.await
			.map_err(|err| format!("failed acquiring sqlite connection: {err}"))?;
		let log_key2 = log_key.clone();
		// Returning `false` from the handler interrupts the statement.
		conn.lock_handle()
			.await
			.map_err(|err| format!("{err:?}"))?
			.set_progress_handler(self.num_ops_timeout, move || {
				log::debug!(target: "frontier-sql", "Sqlite progress_handler triggered for {log_key2}");
				false
			});
		log::debug!(target: "frontier-sql", "Query: {sql:?} - {log_key}");

		let mut out: Vec<FilteredLog<Block>> = vec![];
		let mut rows = query.fetch(&mut *conn);
		// Collect rows until exhaustion or first error; the error is kept so
		// the progress handler can be removed before reporting it.
		let maybe_err = loop {
			match rows.try_next().await {
				Ok(Some(row)) => {
					// Column order mirrors the SELECT list in `build_query`.
					let substrate_block_hash =
						H256::from_slice(&row.try_get::<Vec<u8>, _>(0).unwrap_or_default()[..]);
					let ethereum_block_hash =
						H256::from_slice(&row.try_get::<Vec<u8>, _>(1).unwrap_or_default()[..]);
					let block_number = row.try_get::<i32, _>(2).unwrap_or_default() as u32;
					let ethereum_storage_schema: EthereumStorageSchema =
						Decode::decode(&mut &row.try_get::<Vec<u8>, _>(3).unwrap_or_default()[..])
							.map_err(|_| {
								"Cannot decode EthereumStorageSchema for block".to_string()
							})?;
					let transaction_index = row.try_get::<i32, _>(4).unwrap_or_default() as u32;
					let log_index = row.try_get::<i32, _>(5).unwrap_or_default() as u32;
					out.push(FilteredLog {
						substrate_block_hash,
						ethereum_block_hash,
						block_number,
						ethereum_storage_schema,
						transaction_index,
						log_index,
					});
				}
				Ok(None) => break None,
				Err(err) => break Some(err),
			};
		};
		// The stream borrows the connection; drop it before re-locking.
		drop(rows);
		conn.lock_handle()
			.await
			.map_err(|err| format!("{err:?}"))?
			.remove_progress_handler();

		if let Some(err) = maybe_err {
			log::error!(target: "frontier-sql", "Failed to query sql db: {err:?} - {log_key}");
			return Err("Failed to query sql db with statement".to_string());
		}

		log::info!(target: "frontier-sql", "FILTER remove handler - {log_key}");
		Ok(out)
	}
944}
945
/// Builds the filter query for `filter_logs`.
///
/// Joins `logs` to canonical `blocks` within the inclusive block-number
/// range, then narrows by address and by each topic position that has
/// candidates. Results are ordered by (block, transaction, log) and capped at
/// 10001 rows — one more than the RPC limit, so the caller can detect
/// truncation.
fn build_query<'a>(
	qb: &'a mut QueryBuilder<Sqlite>,
	from_block: u64,
	to_block: u64,
	addresses: Vec<H160>,
	topics: [HashSet<H256>; 4],
) -> Query<'a, Sqlite, SqliteArguments<'a>> {
	qb.push(
		"
SELECT
	l.substrate_block_hash,
	b.ethereum_block_hash,
	b.block_number,
	b.ethereum_storage_schema,
	l.transaction_index,
	l.log_index
FROM logs AS l
INNER JOIN blocks AS b
ON (b.block_number BETWEEN ",
	);
	qb.separated(" AND ")
		.push_bind(from_block as i64)
		.push_bind(to_block as i64)
		.push_unseparated(")");
	qb.push(" AND b.substrate_block_hash = l.substrate_block_hash")
		.push(" AND b.is_canon = 1")
		// Always-true base predicate so every filter below can start with
		// " AND".
		.push("\nWHERE 1");

	if !addresses.is_empty() {
		qb.push(" AND l.address IN (");
		let mut qb_addr = qb.separated(", ");
		addresses.iter().for_each(|addr| {
			qb_addr.push_bind(addr.as_bytes().to_owned());
		});
		qb_addr.push_unseparated(")");
	}

	for (i, topic_options) in topics.iter().enumerate() {
		match topic_options.len().cmp(&1) {
			// Multiple candidates: membership test.
			Ordering::Greater => {
				qb.push(format!(" AND l.topic_{} IN (", i + 1));
				let mut qb_topic = qb.separated(", ");
				topic_options.iter().for_each(|t| {
					qb_topic.push_bind(t.as_bytes().to_owned());
				});
				qb_topic.push_unseparated(")");
			}
			// Exactly one candidate: plain equality.
			Ordering::Equal => {
				qb.push(format!(" AND l.topic_{} = ", i + 1)).push_bind(
					topic_options
						.iter()
						.next()
						.expect("length is 1, must exist; qed")
						.as_bytes()
						.to_owned(),
				);
			}
			// No candidates: wildcard, no constraint.
			Ordering::Less => {}
		}
	}

	qb.push(
		"
ORDER BY b.block_number ASC, l.transaction_index ASC, l.log_index ASC
LIMIT 10001",
	);

	qb.build()
}
1016
1017#[cfg(test)]
1018mod test {
1019 use super::*;
1020
1021 use std::path::Path;
1022
1023 use maplit::hashset;
1024 use scale_codec::Encode;
1025 use sqlx::{sqlite::SqliteRow, QueryBuilder, Row, SqlitePool};
1026 use tempfile::tempdir;
1027 use sp_core::{H160, H256};
1029 use sp_runtime::{
1030 generic::{Block, Header},
1031 traits::BlakeTwo256,
1032 };
1033 use substrate_test_runtime_client::{
1034 DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt,
1035 };
1036 use fc_api::Backend as BackendT;
1038 use fc_storage::SchemaV3StorageOverride;
1039 use fp_storage::{EthereumStorageSchema, PALLET_ETHEREUM_SCHEMA};
1040
	// Minimal concrete block type used to instantiate the backend in tests.
	type OpaqueBlock =
		Block<Header<u64, BlakeTwo256>, substrate_test_runtime_client::runtime::Extrinsic>;
1043
	// One `filter_logs` scenario: the filter inputs plus the rows expected
	// back, in order.
	struct TestFilter {
		pub from_block: u64,
		pub to_block: u64,
		pub addresses: Vec<H160>,
		pub topics: Vec<Vec<Option<H256>>>,
		pub expected_result: Vec<FilteredLog<OpaqueBlock>>,
	}
1051
	// Test-side log fixture; unlike the production `Log` row it keeps typed
	// hashes and always carries four topics.
	#[derive(Debug, Clone)]
	struct Log {
		block_number: u32,
		address: H160,
		topics: [H256; 4],
		substrate_block_hash: H256,
		ethereum_block_hash: H256,
		transaction_index: u32,
		log_index: u32,
	}
1062
	// Everything a test needs: the backend plus the fixture addresses, topic
	// values, block hashes and pre-built logs. Log field names encode
	// block number, topic order, log index, tx index and emitting address.
	#[allow(unused)]
	struct TestData {
		backend: Backend<OpaqueBlock>,
		alice: H160,
		bob: H160,
		topics_a: H256,
		topics_b: H256,
		topics_c: H256,
		topics_d: H256,
		substrate_hash_1: H256,
		substrate_hash_2: H256,
		substrate_hash_3: H256,
		ethereum_hash_1: H256,
		ethereum_hash_2: H256,
		ethereum_hash_3: H256,
		log_1_abcd_0_0_alice: Log,
		log_1_dcba_1_0_alice: Log,
		log_1_badc_2_0_alice: Log,
		log_2_abcd_0_0_bob: Log,
		log_2_dcba_1_0_bob: Log,
		log_2_badc_2_0_bob: Log,
		log_3_abcd_0_0_bob: Log,
		log_3_dcba_1_0_bob: Log,
		log_3_badc_2_0_bob: Log,
	}
1088
	// Converts a fixture log into the expected `filter_logs` output row.
	// The schema is fixed to V3, matching what `prepare` writes to the db.
	impl From<Log> for FilteredLog<OpaqueBlock> {
		fn from(value: Log) -> Self {
			Self {
				substrate_block_hash: value.substrate_block_hash,
				ethereum_block_hash: value.ethereum_block_hash,
				block_number: value.block_number,
				ethereum_storage_schema: EthereumStorageSchema::V3,
				transaction_index: value.transaction_index,
				log_index: value.log_index,
			}
		}
	}
1101
1102 async fn prepare() -> TestData {
1103 let tmp = tempdir().expect("create a temporary directory");
1104 let builder = TestClientBuilder::new().add_extra_storage(
1106 PALLET_ETHEREUM_SCHEMA.to_vec(),
1107 Encode::encode(&EthereumStorageSchema::V3),
1108 );
1109 let (client, _) = builder
1111 .build_with_native_executor::<substrate_test_runtime_client::runtime::RuntimeApi, _>(
1112 None,
1113 );
1114 let client = Arc::new(client);
1115 let storage_override = Arc::new(SchemaV3StorageOverride::new(client.clone()));
1117 let indexer_backend = Backend::new(
1119 BackendConfig::Sqlite(SqliteBackendConfig {
1120 path: Path::new("sqlite:///")
1121 .join(tmp.path())
1122 .join("test.db3")
1123 .to_str()
1124 .unwrap(),
1125 create_if_missing: true,
1126 cache_size: 20480,
1127 thread_count: 4,
1128 }),
1129 1,
1130 None,
1131 storage_override.clone(),
1132 )
1133 .await
1134 .expect("indexer pool to be created");
1135
1136 let alice = H160::repeat_byte(0x01);
1139 let bob = H160::repeat_byte(0x02);
1140 let topics_a = H256::repeat_byte(0x01);
1142 let topics_b = H256::repeat_byte(0x02);
1143 let topics_c = H256::repeat_byte(0x03);
1144 let topics_d = H256::repeat_byte(0x04);
1145 let substrate_hash_1 = H256::repeat_byte(0x05);
1147 let substrate_hash_2 = H256::repeat_byte(0x06);
1148 let substrate_hash_3 = H256::repeat_byte(0x07);
1149 let ethereum_hash_1 = H256::repeat_byte(0x08);
1151 let ethereum_hash_2 = H256::repeat_byte(0x09);
1152 let ethereum_hash_3 = H256::repeat_byte(0x0a);
1153 let ethereum_storage_schema = EthereumStorageSchema::V3;
1155
1156 let block_entries = vec![
1157 (
1159 1i32,
1160 ethereum_hash_1,
1161 substrate_hash_1,
1162 ethereum_storage_schema,
1163 ),
1164 (
1166 2i32,
1167 ethereum_hash_2,
1168 substrate_hash_2,
1169 ethereum_storage_schema,
1170 ),
1171 (
1173 3i32,
1174 ethereum_hash_3,
1175 substrate_hash_3,
1176 ethereum_storage_schema,
1177 ),
1178 ];
1179 let mut builder = QueryBuilder::new(
1180 "INSERT INTO blocks(
1181 block_number,
1182 ethereum_block_hash,
1183 substrate_block_hash,
1184 ethereum_storage_schema,
1185 is_canon
1186 )",
1187 );
1188 builder.push_values(block_entries, |mut b, entry| {
1189 let block_number = entry.0;
1190 let ethereum_block_hash = entry.1.as_bytes().to_owned();
1191 let substrate_block_hash = entry.2.as_bytes().to_owned();
1192 let ethereum_storage_schema = entry.3.encode();
1193
1194 b.push_bind(block_number);
1195 b.push_bind(ethereum_block_hash);
1196 b.push_bind(substrate_block_hash);
1197 b.push_bind(ethereum_storage_schema);
1198 b.push_bind(1i32);
1199 });
1200 let query = builder.build();
1201 let _ = query
1202 .execute(indexer_backend.pool())
1203 .await
1204 .expect("insert should succeed");
1205
1206 let log_1_abcd_0_0_alice = Log {
1208 block_number: 1,
1209 address: alice,
1210 topics: [topics_a, topics_b, topics_c, topics_d],
1211 log_index: 0,
1212 transaction_index: 0,
1213 substrate_block_hash: substrate_hash_1,
1214 ethereum_block_hash: ethereum_hash_1,
1215 };
1216 let log_1_dcba_1_0_alice = Log {
1217 block_number: 1,
1218 address: alice,
1219 topics: [topics_d, topics_c, topics_b, topics_a],
1220 log_index: 1,
1221 transaction_index: 0,
1222 substrate_block_hash: substrate_hash_1,
1223 ethereum_block_hash: ethereum_hash_1,
1224 };
1225 let log_1_badc_2_0_alice = Log {
1226 block_number: 1,
1227 address: alice,
1228 topics: [topics_b, topics_a, topics_d, topics_c],
1229 log_index: 2,
1230 transaction_index: 0,
1231 substrate_block_hash: substrate_hash_1,
1232 ethereum_block_hash: ethereum_hash_1,
1233 };
1234 let log_2_abcd_0_0_bob = Log {
1235 block_number: 2,
1236 address: bob,
1237 topics: [topics_a, topics_b, topics_c, topics_d],
1238 log_index: 0,
1239 transaction_index: 0,
1240 substrate_block_hash: substrate_hash_2,
1241 ethereum_block_hash: ethereum_hash_2,
1242 };
1243 let log_2_dcba_1_0_bob = Log {
1244 block_number: 2,
1245 address: bob,
1246 topics: [topics_d, topics_c, topics_b, topics_a],
1247 log_index: 1,
1248 transaction_index: 0,
1249 substrate_block_hash: substrate_hash_2,
1250 ethereum_block_hash: ethereum_hash_2,
1251 };
1252 let log_2_badc_2_0_bob = Log {
1253 block_number: 2,
1254 address: bob,
1255 topics: [topics_b, topics_a, topics_d, topics_c],
1256 log_index: 2,
1257 transaction_index: 0,
1258 substrate_block_hash: substrate_hash_2,
1259 ethereum_block_hash: ethereum_hash_2,
1260 };
1261
1262 let log_3_abcd_0_0_bob = Log {
1263 block_number: 3,
1264 address: bob,
1265 topics: [topics_a, topics_b, topics_c, topics_d],
1266 log_index: 0,
1267 transaction_index: 0,
1268 substrate_block_hash: substrate_hash_3,
1269 ethereum_block_hash: ethereum_hash_3,
1270 };
1271 let log_3_dcba_1_0_bob = Log {
1272 block_number: 3,
1273 address: bob,
1274 topics: [topics_d, topics_c, topics_b, topics_a],
1275 log_index: 1,
1276 transaction_index: 0,
1277 substrate_block_hash: substrate_hash_3,
1278 ethereum_block_hash: ethereum_hash_3,
1279 };
1280 let log_3_badc_2_0_bob = Log {
1281 block_number: 3,
1282 address: bob,
1283 topics: [topics_b, topics_a, topics_d, topics_c],
1284 log_index: 2,
1285 transaction_index: 0,
1286 substrate_block_hash: substrate_hash_3,
1287 ethereum_block_hash: ethereum_hash_3,
1288 };
1289
1290 let log_entries = vec![
1291 log_1_abcd_0_0_alice.clone(),
1293 log_1_dcba_1_0_alice.clone(),
1294 log_1_badc_2_0_alice.clone(),
1295 log_2_abcd_0_0_bob.clone(),
1297 log_2_dcba_1_0_bob.clone(),
1298 log_2_badc_2_0_bob.clone(),
1299 log_3_abcd_0_0_bob.clone(),
1301 log_3_dcba_1_0_bob.clone(),
1302 log_3_badc_2_0_bob.clone(),
1303 ];
1304
1305 let mut builder: QueryBuilder<sqlx::Sqlite> = QueryBuilder::new(
1306 "INSERT INTO logs(
1307 address,
1308 topic_1,
1309 topic_2,
1310 topic_3,
1311 topic_4,
1312 log_index,
1313 transaction_index,
1314 substrate_block_hash
1315 )",
1316 );
1317 builder.push_values(log_entries, |mut b, entry| {
1318 let address = entry.address.as_bytes().to_owned();
1319 let topic_1 = entry.topics[0].as_bytes().to_owned();
1320 let topic_2 = entry.topics[1].as_bytes().to_owned();
1321 let topic_3 = entry.topics[2].as_bytes().to_owned();
1322 let topic_4 = entry.topics[3].as_bytes().to_owned();
1323 let log_index = entry.log_index;
1324 let transaction_index = entry.transaction_index;
1325 let substrate_block_hash = entry.substrate_block_hash.as_bytes().to_owned();
1326
1327 b.push_bind(address);
1328 b.push_bind(topic_1);
1329 b.push_bind(topic_2);
1330 b.push_bind(topic_3);
1331 b.push_bind(topic_4);
1332 b.push_bind(log_index);
1333 b.push_bind(transaction_index);
1334 b.push_bind(substrate_block_hash);
1335 });
1336 let query = builder.build();
1337 let _ = query.execute(indexer_backend.pool()).await;
1338
1339 TestData {
1340 alice,
1341 bob,
1342 topics_a,
1343 topics_b,
1344 topics_c,
1345 topics_d,
1346 substrate_hash_1,
1347 substrate_hash_2,
1348 substrate_hash_3,
1349 ethereum_hash_1,
1350 ethereum_hash_2,
1351 ethereum_hash_3,
1352 backend: indexer_backend,
1353 log_1_abcd_0_0_alice,
1354 log_1_dcba_1_0_alice,
1355 log_1_badc_2_0_alice,
1356 log_2_abcd_0_0_bob,
1357 log_2_dcba_1_0_bob,
1358 log_2_badc_2_0_bob,
1359 log_3_abcd_0_0_bob,
1360 log_3_dcba_1_0_bob,
1361 log_3_badc_2_0_bob,
1362 }
1363 }
1364
1365 async fn run_test_case(
1366 backend: Backend<OpaqueBlock>,
1367 test_case: &TestFilter,
1368 ) -> Result<Vec<FilteredLog<OpaqueBlock>>, String> {
1369 backend
1370 .log_indexer()
1371 .filter_logs(
1372 test_case.from_block,
1373 test_case.to_block,
1374 test_case.addresses.clone(),
1375 test_case.topics.clone(),
1376 )
1377 .await
1378 }
1379
1380 async fn assert_blocks_canon(pool: &SqlitePool, expected: Vec<(H256, u32)>) {
1381 let actual: Vec<(H256, u32)> =
1382 sqlx::query("SELECT substrate_block_hash, is_canon FROM blocks")
1383 .map(|row: SqliteRow| (H256::from_slice(&row.get::<Vec<u8>, _>(0)[..]), row.get(1)))
1384 .fetch_all(pool)
1385 .await
1386 .expect("sql query must succeed");
1387 assert_eq!(expected, actual);
1388 }
1389
1390 #[tokio::test]
1391 async fn genesis_works() {
1392 let TestData { backend, .. } = prepare().await;
1393 let filter = TestFilter {
1394 from_block: 0,
1395 to_block: 0,
1396 addresses: vec![],
1397 topics: vec![],
1398 expected_result: vec![],
1399 };
1400 let result = run_test_case(backend, &filter).await.expect("must succeed");
1401 assert_eq!(result, filter.expected_result);
1402 }
1403
1404 #[tokio::test]
1405 async fn unsanitized_input_works() {
1406 let TestData { backend, .. } = prepare().await;
1407 let filter = TestFilter {
1408 from_block: 0,
1409 to_block: 0,
1410 addresses: vec![],
1411 topics: vec![vec![None], vec![None, None, None]],
1412 expected_result: vec![],
1413 };
1414 let result = run_test_case(backend, &filter).await.expect("must succeed");
1415 assert_eq!(result, filter.expected_result);
1416 }
1417
1418 #[tokio::test]
1419 async fn invalid_topic_input_size_fails() {
1420 let TestData {
1421 backend, topics_a, ..
1422 } = prepare().await;
1423 let filter = TestFilter {
1424 from_block: 0,
1425 to_block: 0,
1426 addresses: vec![],
1427 topics: vec![
1428 vec![Some(topics_a), None, None, None, None],
1429 vec![Some(topics_a), None, None, None],
1430 ],
1431 expected_result: vec![],
1432 };
1433 run_test_case(backend, &filter)
1434 .await
1435 .expect_err("Invalid topic input. Maximum length is 4.");
1436 }
1437
1438 #[tokio::test]
1439 async fn test_malformed_topic_cleans_invalid_options() {
1440 let TestData {
1441 backend,
1442 topics_a,
1443 topics_b,
1444 topics_d,
1445 log_1_badc_2_0_alice,
1446 ..
1447 } = prepare().await;
1448
1449 let filter = TestFilter {
1451 from_block: 0,
1452 to_block: 1,
1453 addresses: vec![],
1454 topics: vec![
1455 vec![Some(topics_a), None, Some(topics_d)],
1456 vec![None], vec![Some(topics_b), Some(topics_a), None],
1458 vec![None, None, None, None], ],
1460 expected_result: vec![log_1_badc_2_0_alice.into()],
1461 };
1462 let result = run_test_case(backend, &filter).await.expect("must succeed");
1463 assert_eq!(result, filter.expected_result);
1464 }
1465
1466 #[tokio::test]
1467 async fn block_range_works() {
1468 let TestData {
1469 backend,
1470 log_1_abcd_0_0_alice,
1471 log_1_dcba_1_0_alice,
1472 log_1_badc_2_0_alice,
1473 log_2_abcd_0_0_bob,
1474 log_2_dcba_1_0_bob,
1475 log_2_badc_2_0_bob,
1476 ..
1477 } = prepare().await;
1478
1479 let filter = TestFilter {
1480 from_block: 0,
1481 to_block: 2,
1482 addresses: vec![],
1483 topics: vec![],
1484 expected_result: vec![
1485 log_1_abcd_0_0_alice.into(),
1486 log_1_dcba_1_0_alice.into(),
1487 log_1_badc_2_0_alice.into(),
1488 log_2_abcd_0_0_bob.into(),
1489 log_2_dcba_1_0_bob.into(),
1490 log_2_badc_2_0_bob.into(),
1491 ],
1492 };
1493 let result = run_test_case(backend, &filter).await.expect("must succeed");
1494 assert_eq!(result, filter.expected_result);
1495 }
1496
1497 #[tokio::test]
1498 async fn address_filter_works() {
1499 let TestData {
1500 backend,
1501 alice,
1502 log_1_abcd_0_0_alice,
1503 log_1_dcba_1_0_alice,
1504 log_1_badc_2_0_alice,
1505 ..
1506 } = prepare().await;
1507 let filter = TestFilter {
1508 from_block: 0,
1509 to_block: 3,
1510 addresses: vec![alice],
1511 topics: vec![],
1512 expected_result: vec![
1513 log_1_abcd_0_0_alice.into(),
1514 log_1_dcba_1_0_alice.into(),
1515 log_1_badc_2_0_alice.into(),
1516 ],
1517 };
1518 let result = run_test_case(backend, &filter).await.expect("must succeed");
1519 assert_eq!(result, filter.expected_result);
1520 }
1521
1522 #[tokio::test]
1523 async fn topic_filter_works() {
1524 let TestData {
1525 backend,
1526 topics_d,
1527 log_1_dcba_1_0_alice,
1528 log_2_dcba_1_0_bob,
1529 log_3_dcba_1_0_bob,
1530 ..
1531 } = prepare().await;
1532 let filter = TestFilter {
1533 from_block: 0,
1534 to_block: 3,
1535 addresses: vec![],
1536 topics: vec![vec![Some(topics_d)]],
1537 expected_result: vec![
1538 log_1_dcba_1_0_alice.into(),
1539 log_2_dcba_1_0_bob.into(),
1540 log_3_dcba_1_0_bob.into(),
1541 ],
1542 };
1543 let result = run_test_case(backend, &filter).await.expect("must succeed");
1544 assert_eq!(result, filter.expected_result);
1545 }
1546
1547 #[tokio::test]
1548 async fn test_filters_address_and_topic() {
1549 let TestData {
1550 backend,
1551 bob,
1552 topics_b,
1553 log_2_badc_2_0_bob,
1554 log_3_badc_2_0_bob,
1555 ..
1556 } = prepare().await;
1557 let filter = TestFilter {
1558 from_block: 0,
1559 to_block: 3,
1560 addresses: vec![bob],
1561 topics: vec![vec![Some(topics_b)]],
1562 expected_result: vec![log_2_badc_2_0_bob.into(), log_3_badc_2_0_bob.into()],
1563 };
1564 let result = run_test_case(backend, &filter).await.expect("must succeed");
1565 assert_eq!(result, filter.expected_result);
1566 }
1567
1568 #[tokio::test]
1569 async fn test_filters_multi_address_and_topic() {
1570 let TestData {
1571 backend,
1572 alice,
1573 bob,
1574 topics_b,
1575 log_1_badc_2_0_alice,
1576 log_2_badc_2_0_bob,
1577 log_3_badc_2_0_bob,
1578 ..
1579 } = prepare().await;
1580 let filter = TestFilter {
1581 from_block: 0,
1582 to_block: 3,
1583 addresses: vec![alice, bob],
1584 topics: vec![vec![Some(topics_b)]],
1585 expected_result: vec![
1586 log_1_badc_2_0_alice.into(),
1587 log_2_badc_2_0_bob.into(),
1588 log_3_badc_2_0_bob.into(),
1589 ],
1590 };
1591 let result = run_test_case(backend, &filter).await.expect("must succeed");
1592 assert_eq!(result, filter.expected_result);
1593 }
1594
1595 #[tokio::test]
1596 async fn test_filters_multi_address_and_multi_topic() {
1597 let TestData {
1598 backend,
1599 alice,
1600 bob,
1601 topics_a,
1602 topics_b,
1603 log_1_abcd_0_0_alice,
1604 log_2_abcd_0_0_bob,
1605 log_3_abcd_0_0_bob,
1606 ..
1607 } = prepare().await;
1608 let filter = TestFilter {
1609 from_block: 0,
1610 to_block: 3,
1611 addresses: vec![alice, bob],
1612 topics: vec![vec![Some(topics_a), Some(topics_b)]],
1613 expected_result: vec![
1614 log_1_abcd_0_0_alice.into(),
1615 log_2_abcd_0_0_bob.into(),
1616 log_3_abcd_0_0_bob.into(),
1617 ],
1618 };
1619 let result = run_test_case(backend, &filter).await.expect("must succeed");
1620 assert_eq!(result, filter.expected_result);
1621 }
1622
1623 #[tokio::test]
1624 async fn filter_with_topic_wildcards_works() {
1625 let TestData {
1626 backend,
1627 alice,
1628 bob,
1629 topics_d,
1630 topics_b,
1631 log_1_dcba_1_0_alice,
1632 log_2_dcba_1_0_bob,
1633 log_3_dcba_1_0_bob,
1634 ..
1635 } = prepare().await;
1636 let filter = TestFilter {
1637 from_block: 0,
1638 to_block: 3,
1639 addresses: vec![alice, bob],
1640 topics: vec![vec![Some(topics_d), None, Some(topics_b)]],
1641 expected_result: vec![
1642 log_1_dcba_1_0_alice.into(),
1643 log_2_dcba_1_0_bob.into(),
1644 log_3_dcba_1_0_bob.into(),
1645 ],
1646 };
1647 let result = run_test_case(backend, &filter).await.expect("must succeed");
1648 assert_eq!(result, filter.expected_result);
1649 }
1650
1651 #[tokio::test]
1652 async fn trailing_wildcard_is_useless_but_works() {
1653 let TestData {
1654 alice,
1655 backend,
1656 topics_b,
1657 log_1_dcba_1_0_alice,
1658 ..
1659 } = prepare().await;
1660 let filter = TestFilter {
1661 from_block: 0,
1662 to_block: 1,
1663 addresses: vec![alice],
1664 topics: vec![vec![None, None, Some(topics_b), None]],
1665 expected_result: vec![log_1_dcba_1_0_alice.into()],
1666 };
1667 let result = run_test_case(backend, &filter).await.expect("must succeed");
1668 assert_eq!(result, filter.expected_result);
1669 }
1670
1671 #[tokio::test]
1672 async fn filter_with_multi_topic_options_works() {
1673 let TestData {
1674 backend,
1675 topics_a,
1676 topics_d,
1677 log_1_abcd_0_0_alice,
1678 log_1_dcba_1_0_alice,
1679 log_2_abcd_0_0_bob,
1680 log_2_dcba_1_0_bob,
1681 log_3_abcd_0_0_bob,
1682 log_3_dcba_1_0_bob,
1683 ..
1684 } = prepare().await;
1685 let filter = TestFilter {
1686 from_block: 0,
1687 to_block: 3,
1688 addresses: vec![],
1689 topics: vec![
1690 vec![Some(topics_a)],
1691 vec![Some(topics_d)],
1692 vec![Some(topics_d)], ],
1694 expected_result: vec![
1695 log_1_abcd_0_0_alice.into(),
1696 log_1_dcba_1_0_alice.into(),
1697 log_2_abcd_0_0_bob.into(),
1698 log_2_dcba_1_0_bob.into(),
1699 log_3_abcd_0_0_bob.into(),
1700 log_3_dcba_1_0_bob.into(),
1701 ],
1702 };
1703 let result = run_test_case(backend, &filter).await.expect("must succeed");
1704 assert_eq!(result, filter.expected_result);
1705 }
1706
1707 #[tokio::test]
1708 async fn filter_with_multi_topic_options_and_wildcards_works() {
1709 let TestData {
1710 backend,
1711 bob,
1712 topics_a,
1713 topics_b,
1714 topics_c,
1715 topics_d,
1716 log_2_dcba_1_0_bob,
1717 log_2_badc_2_0_bob,
1718 log_3_dcba_1_0_bob,
1719 log_3_badc_2_0_bob,
1720 ..
1721 } = prepare().await;
1722 let filter = TestFilter {
1723 from_block: 0,
1724 to_block: 3,
1725 addresses: vec![bob],
1726 topics: vec![
1728 vec![None, None, Some(topics_b), Some(topics_a)],
1729 vec![None, None, Some(topics_b), Some(topics_c)],
1730 vec![None, None, Some(topics_d), Some(topics_a)],
1731 vec![None, None, Some(topics_d), Some(topics_c)],
1732 ],
1733 expected_result: vec![
1734 log_2_dcba_1_0_bob.into(),
1735 log_2_badc_2_0_bob.into(),
1736 log_3_dcba_1_0_bob.into(),
1737 log_3_badc_2_0_bob.into(),
1738 ],
1739 };
1740 let result = run_test_case(backend, &filter).await.expect("must succeed");
1741 assert_eq!(result, filter.expected_result);
1742 }
1743
1744 #[tokio::test]
1745 async fn test_canonicalize_sets_canon_flag_for_redacted_and_enacted_blocks_correctly() {
1746 let TestData {
1747 backend,
1748 substrate_hash_1,
1749 substrate_hash_2,
1750 substrate_hash_3,
1751 ..
1752 } = prepare().await;
1753
1754 sqlx::query("UPDATE blocks SET is_canon = 0 WHERE substrate_block_hash = ?")
1756 .bind(substrate_hash_1.as_bytes())
1757 .execute(backend.pool())
1758 .await
1759 .expect("sql query must succeed");
1760 assert_blocks_canon(
1761 backend.pool(),
1762 vec![
1763 (substrate_hash_1, 0),
1764 (substrate_hash_2, 1),
1765 (substrate_hash_3, 1),
1766 ],
1767 )
1768 .await;
1769
1770 backend
1771 .canonicalize(&[substrate_hash_2], &[substrate_hash_1])
1772 .await
1773 .expect("must succeed");
1774
1775 assert_blocks_canon(
1776 backend.pool(),
1777 vec![
1778 (substrate_hash_1, 1),
1779 (substrate_hash_2, 0),
1780 (substrate_hash_3, 1),
1781 ],
1782 )
1783 .await;
1784 }
1785
1786 #[test]
1787 fn test_query_should_be_generated_correctly() {
1788 use sqlx::Execute;
1789
1790 let from_block: u64 = 100;
1791 let to_block: u64 = 500;
1792 let addresses: Vec<H160> = vec![
1793 H160::repeat_byte(0x01),
1794 H160::repeat_byte(0x02),
1795 H160::repeat_byte(0x03),
1796 ];
1797 let topics = [
1798 hashset![
1799 H256::repeat_byte(0x01),
1800 H256::repeat_byte(0x02),
1801 H256::repeat_byte(0x03),
1802 ],
1803 hashset![H256::repeat_byte(0x04), H256::repeat_byte(0x05),],
1804 hashset![],
1805 hashset![H256::repeat_byte(0x06)],
1806 ];
1807
1808 let expected_query_sql = "
1809SELECT
1810 l.substrate_block_hash,
1811 b.ethereum_block_hash,
1812 b.block_number,
1813 b.ethereum_storage_schema,
1814 l.transaction_index,
1815 l.log_index
1816FROM logs AS l
1817INNER JOIN blocks AS b
1818ON (b.block_number BETWEEN ? AND ?) AND b.substrate_block_hash = l.substrate_block_hash AND b.is_canon = 1
1819WHERE 1 AND l.address IN (?, ?, ?) AND l.topic_1 IN (?, ?, ?) AND l.topic_2 IN (?, ?) AND l.topic_4 = ?
1820ORDER BY b.block_number ASC, l.transaction_index ASC, l.log_index ASC
1821LIMIT 10001";
1822
1823 let mut qb = QueryBuilder::new("");
1824 let actual_query_sql = build_query(&mut qb, from_block, to_block, addresses, topics).sql();
1825 assert_eq!(expected_query_sql, actual_query_sql);
1826 }
1827}