以太坊源码 交易池 基本流程解析

Posted 软件工程小施同学

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了以太坊源码 交易池 基本流程解析相关的知识,希望对你有一定的参考价值。

一、交易池的概念和原理


首先了解一下交易池的工作概况:

 

1、交易池中交易来源:本地的交易和远端节点广播的交易;
2、交易池中交易去向:被Miner模块获取并验证,用于挖矿;挖矿成功后写进区块并被广播;
3、Miner取走交易是复制,交易池中的交易并不减少。直到交易被写进规范链后才从交易池删除;
4、交易如果被写进分叉,交易池中的交易也不减少,等待重新打包。

交易池模块


type TxPool struct {
    config       TxPoolConfig
    chainconfig  *params.ChainConfig
    chain        blockChain
    gasPrice     *big.Int
    txFeed       event.Feed
    scope        event.SubscriptionScope
    chainHeadCh  chan ChainHeadEvent  // txpool订阅区块头的消息
    chainHeadSub event.Subscription  //  区块头消息订阅器,通过它可以取消消息
    signer       types.Signer    //  封装了事务签名处理,椭圆算法
    mu           sync.RWMutex
 
    currentState  *state.StateDB      // 当前头区块对应的状态
    pendingState  *state.ManagedState // 假设pending列表最后一个交易执行完后应该的状态
    currentMaxGas uint64              // Current gas limit for transaction caps
 
    locals  *accountSet // Set of local transaction to exempt from eviction rules
    journal *txJournal  // Journal of local transaction to back up to disk
 
    pending map[common.Address]*txList   // 当前可以处理的交易,key为交易发起者地址,交易按nonce排序
    queue   map[common.Address]*txList   // 当前暂时不能处理的交易
    beats   map[common.Address]time.Time // 每个账号最后一次交易时间
    all     *txLookup                    // 全局交易列表
    priced  *txPricedList                // 所有交易按价格排序
 
    wg sync.WaitGroup // for shutdown sync
 
    homestead bool
}
这里值得注意的是两个map,一个是pending,一个是queue,分别是当前待处理的交易列表和暂时不处理的交易,其中key是交易发起者的地址,交易按照nonce进行排序。这两个列表的维护对于整个交易池的运行至关重要。

           
加入某个节点交易池中pending和queue中分别都持有address A发起的一些交易。可以看到:
1、pending中有tx3、4、5,这意味着address A 发起的交易tx0、1、2都已经被插入规范链了。
2、queue中有tx7、tx8是因为该节点没有收到tx5的交易,所以tx7、8暂时不处理;
3、tx7、8等待处理的时间是有限的,如果超过30分钟(当前时间-beats)还没有收到tx6,则会将7、8抛弃;
4、如果收到一笔新交易,交易池先把该交易往pending中进行比对和替换,如果pending中有则替换,没有则放入queue。所以如果此时收到了tx6,会先对pending进行比较,发现没有tx6,然后放到queue中。
5、如果又收到另一笔tx4,交易池会对原有tx4进行替换,替换的条件是新交易的价值要超过原交易的110%

 

交易池初始化


1、初始化Txpool类,并且从本地文件中加载local交易;
2、订阅规范链更新事件;
3、启动事件监听;

func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool {
    // Sanitize the input to ensure no vulnerable gas prices are set
    config = (&config).sanitize()
 
    // Create the transaction pool with its initial settings
    pool := &TxPool{
        config:      config,
        chainconfig: chainconfig,
        chain:       chain,
        signer:      types.NewEIP155Signer(chainconfig.ChainID),
        pending:     make(map[common.Address]*txList),
        queue:       make(map[common.Address]*txList),
        beats:       make(map[common.Address]time.Time),
        all:         newTxLookup(),
        chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
        gasPrice:    new(big.Int).SetUint64(config.PriceLimit),
    }
    pool.locals = newAccountSet(pool.signer)
    pool.priced = newTxPricedList(pool.all)
    pool.reset(nil, chain.CurrentBlock().Header())
 
    // If local transactions and journaling is enabled, load from disk
    if !config.NoLocals && config.Journal != "" {
        pool.journal = newTxJournal(config.Journal)
 
        if err := pool.journal.load(pool.AddLocals); err != nil {
            log.Warn("Failed to load transaction journal", "err", err)
        }
        if err := pool.journal.rotate(pool.local()); err != nil {
            log.Warn("Failed to rotate transaction journal", "err", err)
        }
    }
    // Subscribe events from blockchain
    pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)
 
    // Start the event loop and return
    pool.wg.Add(1)
    go pool.loop()
 
    return pool
}
注意:local交易比remote交易具有更高的权限,一是不轻易被替换;二是持久化,即通过一个本地的journal文件保存尚未打包的local交易。所以在节点启动的时候,优先从本地加载local交易。

本地地址会被加入白名单,凡由此地址发送的交易均被认为是local交易,不论是从本地递交还是从远端发送来的。

loop监听哪些事件?

首先,pool.loop()定义了三个定时器:
1、report为每8秒钟报告一次状态;
2、evict为每分钟检测不活动account的交易移除(超过3小时不活动的账户交易要被移除)
3、journal为每小时更新一下本地的local transaction journal。

然后开启监听循环:
1、监听规范链更新事件,重置交易池:pool.reset();
2、监听report.C,定时打印最新pending和queue状态;
3、监听evict.C,定时删除超时交易;
4、监听hournal.C,定时本地储存尚未额打包的local交易;

func (pool *TxPool) loop() {
    defer pool.wg.Done()
 
    // Start the stats reporting and transaction eviction tickers
    var prevPending, prevQueued, prevStales int
 
    report := time.NewTicker(statsReportInterval)
    defer report.Stop()
 
    evict := time.NewTicker(evictionInterval)
    defer evict.Stop()
 
    journal := time.NewTicker(pool.config.Rejournal)
    defer journal.Stop()
 
    // Track the previous head headers for transaction reorgs
    head := pool.chain.CurrentBlock()
 
    // Keep waiting for and reacting to the various events
    for {
        select {
        // Handle ChainHeadEvent
        case ev := <-pool.chainHeadCh:
            if ev.Block != nil {
                pool.mu.Lock()
                if pool.chainconfig.IsHomestead(ev.Block.Number()) {
                    pool.homestead = true
                }
                pool.reset(head.Header(), ev.Block.Header())
                head = ev.Block
 
                pool.mu.Unlock()
            }
        // Be unsubscribed due to system stopped
        case <-pool.chainHeadSub.Err():
            return
 
        // Handle stats reporting ticks
        case <-report.C:
            pool.mu.RLock()
            pending, queued := pool.stats()
            stales := pool.priced.stales
            pool.mu.RUnlock()
 
            if pending != prevPending || queued != prevQueued || stales != prevStales {
                log.Debug("Transaction pool status report", "executable", pending, "queued", queued, "stales", stales)
                prevPending, prevQueued, prevStales = pending, queued, stales
            }
 
        // Handle inactive account transaction eviction
        case <-evict.C:
            pool.mu.Lock()
            for addr := range pool.queue {
                // Skip local transactions from the eviction mechanism
                if pool.locals.contains(addr) {
                    continue
                }
                // Any non-locals old enough should be removed
                if time.Since(pool.beats[addr]) > pool.config.Lifetime {
                    for _, tx := range pool.queue[addr].Flatten() {
                        pool.removeTx(tx.Hash(), true)
                    }
                }
            }
            pool.mu.Unlock()
 
        // Handle local transaction journal rotation
        case <-journal.C:
            if pool.journal != nil {
                pool.mu.Lock()
                if err := pool.journal.rotate(pool.local()); err != nil {
                    log.Warn("Failed to rotate local tx journal", "err", err)
                }
                pool.mu.Unlock()
            }
        }
    }
}


二、交易入池验证:validateTx


主要功能是验证一个交易的合法性:
1、交易的size不能过大;
2、交易转账值不能为负;
3、交易的gas值超过了当前规范链头区块的gas值;
4、确保交易签名的正确性;
5、在没有指定local参数或本节点白名单中没有包含这个交易地址的情况下,交易的gas不能小于txpool下限;
6、如果本地节点的最新状态中交易发起方的余额不能小于交易gas总消耗(value+tx.gasLimit*tx.gasPrice);
7、如果固定gas消耗不能大于交易设置的gasLimit;

func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
    // Heuristic limit, reject transactions over 32KB to prevent DOS attacks
    if tx.Size() > 32*1024 {
        return ErrOversizedData
    }
    // Transactions can't be negative. This may never happen using RLP decoded
    // transactions but may occur if you create a transaction using the RPC.
    if tx.Value().Sign() < 0 {
        return ErrNegativeValue
    }
    // Ensure the transaction doesn't exceed the current block limit gas.
    if pool.currentMaxGas < tx.Gas() {
        return ErrGasLimit
    }
    // Make sure the transaction is signed properly
    from, err := types.Sender(pool.signer, tx)
    if err != nil {
        return ErrInvalidSender
    }
    // Drop non-local transactions under our own minimal accepted gas price
    local = local || pool.locals.contains(from) // account may be local even if the transaction arrived from the network
    if !local && pool.gasPrice.Cmp(tx.GasPrice()) > 0 {
        return ErrUnderpriced
    }
    // Ensure the transaction adheres to nonce ordering
    if pool.currentState.GetNonce(from) > tx.Nonce() {
        return ErrNonceTooLow
    }
    // Transactor should have enough funds to cover the costs
    // cost == V + GP * GL
    if pool.currentState.GetBalance(from).Cmp(tx.Cost()) < 0 {
        return ErrInsufficientFunds
    }
    intrGas, err := IntrinsicGas(tx.Data(), tx.To() == nil, pool.homestead)
    if err != nil {
        return err
    }
    if tx.Gas() < intrGas {
        return ErrIntrinsicGas
    }
    return nil
}


在以太坊中一个message的gas的计算方式是被规定好的:
1、如果是合约创建且是家园版本,固定消耗为53000gas;
2、如果是普通交易,固定消耗是21000gas;
3、非0值数据消耗:固定消耗+非0值数量*68,以64位能表示的最大值为封顶,超过则报错;
4、0值数据消耗:固定消耗+0值数量*4,同样以64位能表示的最大值为封顶,超过则报错。

func IntrinsicGas(data []byte, contractCreation, homestead bool) (uint64, error) {
    // Set the starting gas for the raw transaction
    var gas uint64
    if contractCreation && homestead {
        gas = params.TxGasContractCreation
    } else {
        gas = params.TxGas
    }
    // Bump the required gas by the amount of transactional data
    if len(data) > 0 {
        // Zero and non-zero bytes are priced differently
        var nz uint64
        for _, byt := range data {
            if byt != 0 {
                nz++
            }
        }
        // Make sure we don't exceed uint64 for all data combinations
        if (math.MaxUint64-gas)/params.TxDataNonZeroGas < nz {
            return 0, vm.ErrOutOfGas
        }
        gas += nz * params.TxDataNonZeroGas
 
        z := uint64(len(data)) - nz
        if (math.MaxUint64-gas)/params.TxDataZeroGas < z {
            return 0, vm.ErrOutOfGas
        }
        gas += z * params.TxDataZeroGas
    }
    return gas, nil
}


三、向交易池添加交易


1、添加交易TxPool.add()


add()方法用于将本地或远端的交易加入到交易池,这个方法的基本逻辑是:
1)检查交易是否收到过,重复接受的交易直接丢弃;
2)验证交易是否有效;
3)如果交易池满了,待插入的交易的价值比交易池中任意一个都低,则直接丢弃;
4)如果待插入的交易序号在pending列表中已经存在,且待插入的交易价值大于或等于原交易的110%,则替换原交易;
5)如果待插入的交易序号在pending列表中没有,则直接放入queue列表。如果对应的序号已经有交易了,则如果新交易的价值大于或等于原交易的110%,替换原交易;

注意:这里pool.config.GlobalSlots为所有可执行交易的总数,即pending列表总数,默认4096;pool.config.GlobalQueue为不可执行交易总数,即queue列表总数,默认1024;

func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
    // 如果交易已经获取,则直接丢弃
    hash := tx.Hash()
    if pool.all.Get(hash) != nil {
        log.Trace("Discarding already known transaction", "hash", hash)
        return false, fmt.Errorf("known transaction: %x", hash)
    }
    // 如果交易无法通过验证,则直接丢弃
    if err := pool.validateTx(tx, local); err != nil {
        log.Trace("Discarding invalid transaction", "hash", hash, "err", err)
        invalidTxCounter.Inc(1)
        return false, err
    }
    // 如果交易池满了,待插入的交易的价值比交易池中任意一个都低,则丢弃没有价值的交易
    if uint64(pool.all.Count()) >= pool.config.GlobalSlots+pool.config.GlobalQueue {
        //如果待插入的交易价值比当前最便宜的还要低,则直接丢弃
        if !local && pool.priced.Underpriced(tx, pool.locals) {
            log.Trace("Discarding underpriced transaction", "hash", hash, "price", tx.GasPrice())
            underpricedTxCounter.Inc(1)
            return false, ErrUnderpriced
        }
        // 如果待插入的交易价值不是最差的,则腾出空间,返回即将丢弃的交易并删除
        drop := pool.priced.Discard(pool.all.Count()-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals)
        for _, tx := range drop {
            log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice())
            underpricedTxCounter.Inc(1)
            pool.removeTx(tx.Hash(), false)
        }
    }
    // 如果插入pending已有的交易,必须交易价值大于或等于原交易的110%,方可替换
    from, _ := types.Sender(pool.signer, tx) // already validated
    if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
        // Nonce already pending, check if required price bump is met
        inserted, old := list.Add(tx, pool.config.PriceBump)
        if !inserted {
                        // 丢弃计数器加1
            pendingDiscardCounter.Inc(1)
            return false, ErrReplaceUnderpriced
        }
        // New transaction is better, replace old one
        if old != nil {
            pool.all.Remove(old.Hash())
            pool.priced.Removed()
                        // pending列表替换计数器加1
            pendingReplaceCounter.Inc(1)
        }
        pool.all.Add(tx)
        pool.priced.Put(tx)
        pool.journalTx(from, tx)
 
        log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())
 
        // 通知其他队交易池增加新交易感兴趣的子系统:广播和矿工
        go pool.txFeed.Send(NewTxsEvent{types.Transactions{tx}})
 
        return old != nil, nil
    }
    // 新交易无法替换,则加入queue列表
    replace, err := pool.enqueueTx(hash, tx)
    if err != nil {
        return false, err
    }
    // Mark local addresses and journal local transactions
    if local {
        pool.locals.add(from)
    }
    pool.journalTx(from, tx)
 
    log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To())
    return replace, nil
}
TxPool.add()的调用时机:
1)命令行发送交易

EthApiBackend.SendTx
    ——> TxPool.AddLocal
        ——> TxPool.addTx
            ——> TxPool.add
2)交易池重新整理的过程中

TxPool.reset
    ——> TxPool.addTxLocked
        ——> TxPool.add
3)收到远程节点广播的交易时

AddRemotes
    ——> TxPool.addRemotes
        ——> TxPool.addTxs
            ——> TxPool.addTxLocked
                ——> TxPool.add
这里有addLocal和addRemote的区别,其中有第二个参数来设定该交易是local还是remote。local的交易在打包时有优先权,在删除时有豁免权,还会以文件的形式保存在磁盘上。

func (pool *TxPool) AddLocal(tx *types.Transaction) error {
    return pool.addTx(tx, !pool.config.NoLocals)
}
 
func (pool *TxPool) AddRemote(tx *types.Transaction) error {
    return pool.addTx(tx, false)
}


2、交易加入queue列表:TxPool.enqueueTx

主要流程:
1)将交易插入queue中,如果待插入的交易序号在queue列表中已经有一个交易,那么待插入的交易价值大于原交易价值的110%,则替换原交易;
2)如果新交易替换成功,则从all列表中删除这个被替换的交易
3)更新all列表

func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, error) {
    // Try to insert the transaction into the future queue
    from, _ := types.Sender(pool.signer, tx) // already validated
    if pool.queue[from] == nil {
        pool.queue[from] = newTxList(false)
    }
    inserted, old := pool.queue[from].Add(tx, pool.config.PriceBump)
    if !inserted {
        // An older transaction was better, discard this
        queuedDiscardCounter.Inc(1)
        return false, ErrReplaceUnderpriced
    }
    // Discard any previous transaction and mark this
    if old != nil {
        pool.all.Remove(old.Hash())
        pool.priced.Removed()
        queuedReplaceCounter.Inc(1)
    }
    if pool.all.Get(hash) == nil {
        pool.all.Add(tx)
        pool.priced.Put(tx)
    }
    return old != nil, nil
}
调用时机:
1、调用TxPool.add函数,新交易没有加入pending而直接放入queue

TxPool.add
    ——> TxPool.enqueueTx
2、调用TxPool.removeTx,删除pending列表中的交易,将后面的交易移入queue列表;

TxPool.removeTx
    ——> TxPool.enqueueTx
3、调用TxPool.demoteYnexecuteables删除无效交易,同时将后续的交易移入queue列表;

TxPool.demoteUnexpectables
    ——> TxPool.enqueueTx
 
————————————————
版权声明:本文为CSDN博主「佛系布偶」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/lj900911/article/details/84825739

https://blog.csdn.net/lj900911/article/details/84825739

以上是关于以太坊源码 交易池 基本流程解析的主要内容,如果未能解决你的问题,请参考以下文章

以太坊EVM源码分析学习记录

以太坊C++源码解析(九)区块头

以太坊源码阅读Transaction(交易模块)

以太坊工作原理之交易生命周期

以太坊源码分析--p2p节点发现

[Ethereum] 以太坊源码分析RLP源码分析