以太坊源码 交易池 基本流程解析
Posted 软件工程小施同学
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了以太坊源码 交易池 基本流程解析相关的知识,希望对你有一定的参考价值。
一、交易池的概念和原理
首先了解一下交易池的工作概况:
1、交易池中交易来源:本地的交易和远端节点广播的交易;
2、交易池中交易去向:被Miner模块获取并验证,用于挖矿;挖矿成功后写进区块并被广播;
3、Miner取走交易是复制,交易池中的交易并不减少。直到交易被写进规范链后才从交易池删除;
4、交易如果被写进分叉,交易池中的交易也不减少,等待重新打包。
交易池模块
type TxPool struct {
config TxPoolConfig
chainconfig *params.ChainConfig
chain blockChain
gasPrice *big.Int
txFeed event.Feed
scope event.SubscriptionScope
chainHeadCh chan ChainHeadEvent // txpool订阅区块头的消息
chainHeadSub event.Subscription // 区块头消息订阅器,通过它可以取消消息
signer types.Signer // 封装了事务签名处理,椭圆算法
mu sync.RWMutex
currentState *state.StateDB // 当前头区块对应的状态
pendingState *state.ManagedState // 假设pending列表最后一个交易执行完后应该的状态
currentMaxGas uint64 // Current gas limit for transaction caps
locals *accountSet // Set of local transaction to exempt from eviction rules
journal *txJournal // Journal of local transaction to back up to disk
pending map[common.Address]*txList // 当前可以处理的交易,key为交易发起者地址,交易按nonce排序
queue map[common.Address]*txList // 当前暂时不能处理的交易
beats map[common.Address]time.Time // 每个账号最后一次交易时间
all *txLookup // 全局交易列表
priced *txPricedList // 所有交易按价格排序
wg sync.WaitGroup // for shutdown sync
homestead bool
}
这里值得注意的是两个map,一个是pending,一个是queue,分别是当前待处理的交易列表和暂时不处理的交易,其中key是交易发起者的地址,交易按照nonce进行排序。这两个列表的维护对于整个交易池的运行至关重要。
加入某个节点交易池中pending和queue中分别都持有address A发起的一些交易。可以看到:
1、pending中有tx3、4、5,这意味着address A 发起的交易tx0、1、2都已经被插入规范链了。
2、queue中有tx7、tx8是因为该节点没有收到tx5的交易,所以tx7、8暂时不处理;
3、tx7、8等待处理的时间是有限的,如果超过30分钟(当前时间-beats)还没有收到tx6,则会将7、8抛弃;
4、如果收到一笔新交易,交易池先把该交易往pending中进行比对和替换,如果pending中有则替换,没有则放入queue。所以如果此时收到了tx6,会先对pending进行比较,发现没有tx6,然后放到queue中。
5、如果又收到另一笔tx4,交易池会对原有tx4进行替换,替换的条件是新交易的价值要超过原交易的110%。
交易池初始化
1、初始化Txpool类,并且从本地文件中加载local交易;
2、订阅规范链更新事件;
3、启动事件监听;
func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool {
// Sanitize the input to ensure no vulnerable gas prices are set
config = (&config).sanitize()
// Create the transaction pool with its initial settings
pool := &TxPool{
config: config,
chainconfig: chainconfig,
chain: chain,
signer: types.NewEIP155Signer(chainconfig.ChainID),
pending: make(map[common.Address]*txList),
queue: make(map[common.Address]*txList),
beats: make(map[common.Address]time.Time),
all: newTxLookup(),
chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
}
pool.locals = newAccountSet(pool.signer)
pool.priced = newTxPricedList(pool.all)
pool.reset(nil, chain.CurrentBlock().Header())
// If local transactions and journaling is enabled, load from disk
if !config.NoLocals && config.Journal != "" {
pool.journal = newTxJournal(config.Journal)
if err := pool.journal.load(pool.AddLocals); err != nil {
log.Warn("Failed to load transaction journal", "err", err)
}
if err := pool.journal.rotate(pool.local()); err != nil {
log.Warn("Failed to rotate transaction journal", "err", err)
}
}
// Subscribe events from blockchain
pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)
// Start the event loop and return
pool.wg.Add(1)
go pool.loop()
return pool
}
注意:local交易比remote交易具有更高的权限,一是不轻易被替换;二是持久化,即通过一个本地的journal文件保存尚未打包的local交易。所以在节点启动的时候,优先从本地加载local交易。
本地地址会被加入白名单,凡由此地址发送的交易均被认为是local交易,不论是从本地递交还是从远端发送来的。
loop监听哪些事件?
首先,pool.loop()定义了三个定时器:
1、report为每8秒钟报告一次状态;
2、evict为每分钟检测不活动account的交易移除(超过3小时不活动的账户交易要被移除)
3、journal为每小时更新一下本地的local transaction journal。
然后开启监听循环:
1、监听规范链更新事件,重置交易池:pool.reset();
2、监听report.C,定时打印最新pending和queue状态;
3、监听evict.C,定时删除超时交易;
4、监听hournal.C,定时本地储存尚未额打包的local交易;
func (pool *TxPool) loop() {
defer pool.wg.Done()
// Start the stats reporting and transaction eviction tickers
var prevPending, prevQueued, prevStales int
report := time.NewTicker(statsReportInterval)
defer report.Stop()
evict := time.NewTicker(evictionInterval)
defer evict.Stop()
journal := time.NewTicker(pool.config.Rejournal)
defer journal.Stop()
// Track the previous head headers for transaction reorgs
head := pool.chain.CurrentBlock()
// Keep waiting for and reacting to the various events
for {
select {
// Handle ChainHeadEvent
case ev := <-pool.chainHeadCh:
if ev.Block != nil {
pool.mu.Lock()
if pool.chainconfig.IsHomestead(ev.Block.Number()) {
pool.homestead = true
}
pool.reset(head.Header(), ev.Block.Header())
head = ev.Block
pool.mu.Unlock()
}
// Be unsubscribed due to system stopped
case <-pool.chainHeadSub.Err():
return
// Handle stats reporting ticks
case <-report.C:
pool.mu.RLock()
pending, queued := pool.stats()
stales := pool.priced.stales
pool.mu.RUnlock()
if pending != prevPending || queued != prevQueued || stales != prevStales {
log.Debug("Transaction pool status report", "executable", pending, "queued", queued, "stales", stales)
prevPending, prevQueued, prevStales = pending, queued, stales
}
// Handle inactive account transaction eviction
case <-evict.C:
pool.mu.Lock()
for addr := range pool.queue {
// Skip local transactions from the eviction mechanism
if pool.locals.contains(addr) {
continue
}
// Any non-locals old enough should be removed
if time.Since(pool.beats[addr]) > pool.config.Lifetime {
for _, tx := range pool.queue[addr].Flatten() {
pool.removeTx(tx.Hash(), true)
}
}
}
pool.mu.Unlock()
// Handle local transaction journal rotation
case <-journal.C:
if pool.journal != nil {
pool.mu.Lock()
if err := pool.journal.rotate(pool.local()); err != nil {
log.Warn("Failed to rotate local tx journal", "err", err)
}
pool.mu.Unlock()
}
}
}
}
二、交易入池验证:validateTx
主要功能是验证一个交易的合法性:
1、交易的size不能过大;
2、交易转账值不能为负;
3、交易的gas值超过了当前规范链头区块的gas值;
4、确保交易签名的正确性;
5、在没有指定local参数或本节点白名单中没有包含这个交易地址的情况下,交易的gas不能小于txpool下限;
6、如果本地节点的最新状态中交易发起方的余额不能小于交易gas总消耗(value+tx.gasLimit*tx.gasPrice);
7、如果固定gas消耗不能大于交易设置的gasLimit;
func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
// Heuristic limit, reject transactions over 32KB to prevent DOS attacks
if tx.Size() > 32*1024 {
return ErrOversizedData
}
// Transactions can't be negative. This may never happen using RLP decoded
// transactions but may occur if you create a transaction using the RPC.
if tx.Value().Sign() < 0 {
return ErrNegativeValue
}
// Ensure the transaction doesn't exceed the current block limit gas.
if pool.currentMaxGas < tx.Gas() {
return ErrGasLimit
}
// Make sure the transaction is signed properly
from, err := types.Sender(pool.signer, tx)
if err != nil {
return ErrInvalidSender
}
// Drop non-local transactions under our own minimal accepted gas price
local = local || pool.locals.contains(from) // account may be local even if the transaction arrived from the network
if !local && pool.gasPrice.Cmp(tx.GasPrice()) > 0 {
return ErrUnderpriced
}
// Ensure the transaction adheres to nonce ordering
if pool.currentState.GetNonce(from) > tx.Nonce() {
return ErrNonceTooLow
}
// Transactor should have enough funds to cover the costs
// cost == V + GP * GL
if pool.currentState.GetBalance(from).Cmp(tx.Cost()) < 0 {
return ErrInsufficientFunds
}
intrGas, err := IntrinsicGas(tx.Data(), tx.To() == nil, pool.homestead)
if err != nil {
return err
}
if tx.Gas() < intrGas {
return ErrIntrinsicGas
}
return nil
}
在以太坊中一个message的gas的计算方式是被规定好的:
1、如果是合约创建且是家园版本,固定消耗为53000gas;
2、如果是普通交易,固定消耗是21000gas;
3、非0值数据消耗:固定消耗+非0值数量*68,以64位能表示的最大值为封顶,超过则报错;
4、0值数据消耗:固定消耗+0值数量*4,同样以64位能表示的最大值为封顶,超过则报错。
func IntrinsicGas(data []byte, contractCreation, homestead bool) (uint64, error) {
// Set the starting gas for the raw transaction
var gas uint64
if contractCreation && homestead {
gas = params.TxGasContractCreation
} else {
gas = params.TxGas
}
// Bump the required gas by the amount of transactional data
if len(data) > 0 {
// Zero and non-zero bytes are priced differently
var nz uint64
for _, byt := range data {
if byt != 0 {
nz++
}
}
// Make sure we don't exceed uint64 for all data combinations
if (math.MaxUint64-gas)/params.TxDataNonZeroGas < nz {
return 0, vm.ErrOutOfGas
}
gas += nz * params.TxDataNonZeroGas
z := uint64(len(data)) - nz
if (math.MaxUint64-gas)/params.TxDataZeroGas < z {
return 0, vm.ErrOutOfGas
}
gas += z * params.TxDataZeroGas
}
return gas, nil
}
三、向交易池添加交易
1、添加交易TxPool.add()
add()方法用于将本地或远端的交易加入到交易池,这个方法的基本逻辑是:
1)检查交易是否收到过,重复接受的交易直接丢弃;
2)验证交易是否有效;
3)如果交易池满了,待插入的交易的价值比交易池中任意一个都低,则直接丢弃;
4)如果待插入的交易序号在pending列表中已经存在,且待插入的交易价值大于或等于原交易的110%,则替换原交易;
5)如果待插入的交易序号在pending列表中没有,则直接放入queue列表。如果对应的序号已经有交易了,则如果新交易的价值大于或等于原交易的110%,替换原交易;
注意:这里pool.config.GlobalSlots为所有可执行交易的总数,即pending列表总数,默认4096;pool.config.GlobalQueue为不可执行交易总数,即queue列表总数,默认1024;
func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
// 如果交易已经获取,则直接丢弃
hash := tx.Hash()
if pool.all.Get(hash) != nil {
log.Trace("Discarding already known transaction", "hash", hash)
return false, fmt.Errorf("known transaction: %x", hash)
}
// 如果交易无法通过验证,则直接丢弃
if err := pool.validateTx(tx, local); err != nil {
log.Trace("Discarding invalid transaction", "hash", hash, "err", err)
invalidTxCounter.Inc(1)
return false, err
}
// 如果交易池满了,待插入的交易的价值比交易池中任意一个都低,则丢弃没有价值的交易
if uint64(pool.all.Count()) >= pool.config.GlobalSlots+pool.config.GlobalQueue {
//如果待插入的交易价值比当前最便宜的还要低,则直接丢弃
if !local && pool.priced.Underpriced(tx, pool.locals) {
log.Trace("Discarding underpriced transaction", "hash", hash, "price", tx.GasPrice())
underpricedTxCounter.Inc(1)
return false, ErrUnderpriced
}
// 如果待插入的交易价值不是最差的,则腾出空间,返回即将丢弃的交易并删除
drop := pool.priced.Discard(pool.all.Count()-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals)
for _, tx := range drop {
log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice())
underpricedTxCounter.Inc(1)
pool.removeTx(tx.Hash(), false)
}
}
// 如果插入pending已有的交易,必须交易价值大于或等于原交易的110%,方可替换
from, _ := types.Sender(pool.signer, tx) // already validated
if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
// Nonce already pending, check if required price bump is met
inserted, old := list.Add(tx, pool.config.PriceBump)
if !inserted {
// 丢弃计数器加1
pendingDiscardCounter.Inc(1)
return false, ErrReplaceUnderpriced
}
// New transaction is better, replace old one
if old != nil {
pool.all.Remove(old.Hash())
pool.priced.Removed()
// pending列表替换计数器加1
pendingReplaceCounter.Inc(1)
}
pool.all.Add(tx)
pool.priced.Put(tx)
pool.journalTx(from, tx)
log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())
// 通知其他队交易池增加新交易感兴趣的子系统:广播和矿工
go pool.txFeed.Send(NewTxsEvent{types.Transactions{tx}})
return old != nil, nil
}
// 新交易无法替换,则加入queue列表
replace, err := pool.enqueueTx(hash, tx)
if err != nil {
return false, err
}
// Mark local addresses and journal local transactions
if local {
pool.locals.add(from)
}
pool.journalTx(from, tx)
log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To())
return replace, nil
}
TxPool.add()的调用时机:
1)命令行发送交易
EthApiBackend.SendTx
——> TxPool.AddLocal
——> TxPool.addTx
——> TxPool.add
2)交易池重新整理的过程中
TxPool.reset
——> TxPool.addTxLocked
——> TxPool.add
3)收到远程节点广播的交易时
AddRemotes
——> TxPool.addRemotes
——> TxPool.addTxs
——> TxPool.addTxLocked
——> TxPool.add
这里有addLocal和addRemote的区别,其中有第二个参数来设定该交易是local还是remote。local的交易在打包时有优先权,在删除时有豁免权,还会以文件的形式保存在磁盘上。
func (pool *TxPool) AddLocal(tx *types.Transaction) error {
return pool.addTx(tx, !pool.config.NoLocals)
}
func (pool *TxPool) AddRemote(tx *types.Transaction) error {
return pool.addTx(tx, false)
}
2、交易加入queue列表:TxPool.enqueueTx
主要流程:
1)将交易插入queue中,如果待插入的交易序号在queue列表中已经有一个交易,那么待插入的交易价值大于原交易价值的110%,则替换原交易;
2)如果新交易替换成功,则从all列表中删除这个被替换的交易
3)更新all列表
func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, error) {
// Try to insert the transaction into the future queue
from, _ := types.Sender(pool.signer, tx) // already validated
if pool.queue[from] == nil {
pool.queue[from] = newTxList(false)
}
inserted, old := pool.queue[from].Add(tx, pool.config.PriceBump)
if !inserted {
// An older transaction was better, discard this
queuedDiscardCounter.Inc(1)
return false, ErrReplaceUnderpriced
}
// Discard any previous transaction and mark this
if old != nil {
pool.all.Remove(old.Hash())
pool.priced.Removed()
queuedReplaceCounter.Inc(1)
}
if pool.all.Get(hash) == nil {
pool.all.Add(tx)
pool.priced.Put(tx)
}
return old != nil, nil
}
调用时机:
1、调用TxPool.add函数,新交易没有加入pending而直接放入queue
TxPool.add
——> TxPool.enqueueTx
2、调用TxPool.removeTx,删除pending列表中的交易,将后面的交易移入queue列表;
TxPool.removeTx
——> TxPool.enqueueTx
3、调用TxPool.demoteYnexecuteables删除无效交易,同时将后续的交易移入queue列表;
TxPool.demoteUnexpectables
——> TxPool.enqueueTx
————————————————
版权声明:本文为CSDN博主「佛系布偶」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/lj900911/article/details/84825739
以上是关于以太坊源码 交易池 基本流程解析的主要内容,如果未能解决你的问题,请参考以下文章