ethereum-etl学习3

Posted ⊰失控的疯子⊱☠¥

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ethereum-etl学习3相关的知识,希望对你有一定的参考价值。

ethereum-etl学习3

> ethereumetl stream --start-block 500000 -e block,transaction,log,token_transfer --log-file log.txt \\
--provider-uri https://mainnet.infura.io/v3/7aef3f0cd1f64408b163814b22cc643c

​ 实现区块、交易、日志、货币不断地传输到控制台

function stream

def stream(last_synced_block_file, lag, provider_uri, output, start_block, entity_types,
           period_seconds=10, batch_size=2, block_batch_size=10, max_workers=5, log_file=None, pid_file=None):
    """Streams all data types to console or Google Pub/Sub."""
    configure_logging(log_file)
    configure_signals()
    entity_types = parse_entity_types(entity_types)
    validate_entity_types(entity_types, output)

    from ethereumetl.streaming.eth_streamer_adapter import EthStreamerAdapter
    from blockchainetl.streaming.streamer import Streamer

    # TODO: Implement fallback mechanism for provider uris instead of picking randomly
    provider_uri = pick_random_provider_uri(provider_uri)
    logging.info('Using ' + provider_uri)

    streamer_adapter = EthStreamerAdapter(
        batch_web3_provider=ThreadLocalProxy(lambda: get_provider_from_uri(provider_uri, batch=True)),
        item_exporter=create_item_exporters(output),
        batch_size=batch_size,
        max_workers=max_workers,
        entity_types=entity_types
    )
    streamer = Streamer(
        blockchain_streamer_adapter=streamer_adapter,
        last_synced_block_file=last_synced_block_file,
        lag=lag,
        start_block=start_block,
        period_seconds=period_seconds,
        block_batch_size=block_batch_size,
        pid_file=pid_file
    )
    streamer.stream()
主体

1.日志的配置,以及实体类型的校验

2.随机选取一个提供的uri,构建EthStreamerAdapter对象用于封装Streamer对象

3.调用Streamer的stream方法


class EthStreamerAdapter

class EthStreamerAdapter:
构造(部分)

​ item_id_calculator 实例化一个EthItemIdCalculator对象,主要函数

def calculate(self, item):

​ 对各个实体对象(block,transaction,log,token_transfer,trace,contract,token等)进行数据内容的拼接

​ item_timestamp_calculator 实例化一个EthItemTimestampCalculator对象,主要函数

def calculate(self, item):

​ 不同类型的实体对应的时间戳转换为标准时间

方法
    def get_current_block_number(self):
        w3 = build_web3(self.batch_web3_provider)
        return int(w3.eth.getBlock("latest").number)
主体

​ 获取当前最新的代码块号

def export_all(self, start_block, end_block):
主体

​ 通过调用前面两篇博客学习的方法获取内容输出到目标位置


class EthStreamerAdapter

class Streamer:
构造(部分)

blockchain_streamer_adapter 对应之前构建的EthStreamerAdapter

last_synced_block 最后一次同步的区块,从默认文件读取

方法
def stream(self):
    try:
        if self.pid_file is not None:
            logging.info('Creating pid file '.format(self.pid_file))
            write_to_file(self.pid_file, str(os.getpid()))
        self.blockchain_streamer_adapter.open()
        self._do_stream()
    finally:
        self.blockchain_streamer_adapter.close()
        if self.pid_file is not None:
            logging.info('Deleting pid file '.format(self.pid_file))
            delete_file(self.pid_file)
主体

​ 如果用户指定了pid_file,那么需要写入文件程序运行的pid。然后通过调用适配器的open方法,open方法将打开输出数据的位置(如postgre,kafka以及其他)的写功能,默认是命令行。然后调用_do_stream方法,最后是关闭输出流,删除对应文件。

def _do_stream(self):
    while True and (self.end_block is None or self.last_synced_block < self.end_block):
        synced_blocks = 0

        try:
            synced_blocks = self._sync_cycle()
        except Exception as e:
            # https://stackoverflow.com/a/4992124/1580227
            logging.exception('An exception occurred while syncing block data.')
            if not self.retry_errors:
                raise e

        if synced_blocks <= 0:
            logging.info('Nothing to sync. Sleeping for  seconds...'.format(self.period_seconds))
            time.sleep(self.period_seconds)
主体

​ 一个如果末尾块号为空或者最后一个同步块号小于最后一个块号时的while循环,循环内是一个获取当前需要同步的块数,如果需要同步的块数小于等于0,则休眠period_seconds(默认为10s,因为以太坊平均15秒出块)

def _sync_cycle(self):
    current_block = self.blockchain_streamer_adapter.get_current_block_number()

    target_block = self._calculate_target_block(current_block, self.last_synced_block)
    blocks_to_sync = max(target_block - self.last_synced_block, 0)

    logging.info('Current block , target block , last synced block , blocks to sync '.format(
        current_block, target_block, self.last_synced_block, blocks_to_sync))

    if blocks_to_sync != 0:
        self.blockchain_streamer_adapter.export_all(self.last_synced_block + 1, target_block)
        logging.info('Writing last synced block '.format(target_block))
        write_last_synced_block(self.last_synced_block_file, target_block)
        self.last_synced_block = target_block

    return blocks_to_sync
主体

​ 通过适配器获取当前的块号current_block,然后通过计算获得下一个应该获取的目标块号,然后得到当前应该要同步的块数目。如果需要同步的块数目不为0,则通过适配器函数获取需要同步块内的数据。

以太坊系节点数据清洗组件--Ethereum ETL

  • Ethereum ETL 官方github链接:https://github.com/blockchain-etl/ethereum-etl
  • 作用:将以太坊区块链数据导出到 CSV 或 JSON 文件

最近有位朋友需要做一个链上数据分析的项目,需要将全链的数据清洗一遍,导入数据库以后,进行大数据分析。

那么分析什么呢?分析token的涨跌趋势、成交量;持币地址变化情况;用于参与swap交易的盈亏情况,平均持仓成本;token合约安全性;swap交易资金流入流出等等的一些数据场景。

以上所有功能的基础,都来源于数据,首先第一步就是要搭建一个全量节点,得到稳定的数据源。其次呢,就不得不提到以太坊上的一个知名度很高的数据分析组件。链上追踪工具nansen就是用它做的etl(确切的说,也就是数据清洗组件)

一、环境安装

  • gcc安装
yum -y install gcc
  • ethereumetl 安装
pip3 install ethereum-etl

二、实际操作

2.1 导出区块和交易信息

# ethereumetl export_blocks_and_transactions --start-block 0 --end-block 5000000 --provider-uri http://ip:port/ --blocks-output blocks.csv --transactions-output transactions.csv

2021-11-23 15:10:00,693 - ProgressLogger [INFO] - Started work. Items to process: 51.
2021-11-23 15:10:00,819 - ProgressLogger [INFO] - 51 items processed. Progress is 100%.
2021-11-23 15:10:00,820 - ProgressLogger [INFO] - Finished work. Total items processed: 51. Took 0:00:00.126703.
2021-11-23 15:10:00,820 - CompositeItemExporter [INFO] - block items exported: 51
2021-11-23 15:10:00,820 - CompositeItemExporter [INFO] - transaction items exported: 19
  • export_blocks_and_transactions:导出区块和交易信息指令
  • start-block:起始区块
  • end-block:结束区块
  • provider-uri:请求节点(BSC节点)
  • blocks-output:区块信息输出文件
  • transactions-output:交易信息输出文件

2.2 导出 ERC20 和 ERC721 交易

# ethereumetl export_token_transfers --start-block 0 --end-block 5000000 --provider-uri http://ip:port/ --output token_transfers.csv

2021-11-23 20:13:05,882 - ProgressLogger [INFO] - Started work. Items to process: 51.
2021-11-23 20:13:06,125 - ProgressLogger [INFO] - 51 items processed. Progress is 100%.
2021-11-23 20:13:06,126 - ProgressLogger [INFO] - Finished work. Total items processed: 51. Took 0:00:00.244305.
2021-11-23 20:13:06,126 - CompositeItemExporter [INFO] - token_transfer items exported: 0
  • export_token_transfers:导出 ERC20 和 ERC721 交易指令
  • start-block:起始区块
  • end-block:结束区块
  • provider-uri:请求节点(BSC节点)
  • output:信息输出文件

以上,就是今天分享的全部内容了,希望大家通过以上方式可以解决自己的实际需求,解决自己目前所遇到的问题。

如果文章中有不太正确的地方,欢迎指正,可以扫描下面的二维码,添加我的个人微信,备注:地区-职业方向-昵称,欢迎来撩,加入区块链技术交流群,与更多的区块链技术大佬学习交流。

以上是关于ethereum-etl学习3的主要内容,如果未能解决你的问题,请参考以下文章

区块链学习笔记之以太坊

区块链学习笔记之以太坊

如何从零开始学习区块链技术——推荐从以太坊开发DApp开始

区块链学习--win10下构建以太坊私链

以太坊--2.账户是什么---以太坊的指南针学习记录

区块链学习--以太坊Dapp开发