1. 每日区块链首页
  2. 独家专栏

区块链教程以太坊源码分析chain-indexer区块链索引二

区块链教程以太坊源码分析chain-indexer区块链索引二

兄弟连区块链教程以太坊源码分析chain-indexer区块链索引二。
Start方法。 这个方法在eth协议启动的时候被调用,这个方法接收两个参数,一个是当前的区块头,一个是事件订阅器,通过这个订阅器可以获取区块链的改变信息。

eth.bloomIndexer.Start(eth.blockchain.CurrentHeader(), eth.blockchain.SubscribeChainEvent)

// Start creates a goroutine to feed chain head events into the indexer for
// cascading background processing. Children do not need to be started, they
// are notified about new events by their parents.

// 子链不需要被启动。 以为他们的父节点会通知他们。
func (c *ChainIndexer) Start(currentHeader *types.Header, chainEventer func(ch chan<- ChainEvent) event.Subscription) {
go c.eventLoop(currentHeader, chainEventer)
}

// eventLoop is a secondary – optional – event loop of the indexer which is only
// started for the outermost indexer to push chain head events into a processing
// queue.

// eventLoop 循环只会在最外面的索引节点被调用。 所有的Child indexer不会被启动这个方法。

func (c *ChainIndexer) eventLoop(currentHeader *types.Header, chainEventer func(ch chan<- ChainEvent) event.Subscription) {
// Mark the chain indexer as active, requiring an additional teardown
atomic.StoreUint32(&c.active, 1)

events := make(chan ChainEvent, 10)
sub := chainEventer(events)
defer sub.Unsubscribe()

// Fire the initial new head event to start any outstanding processing
// 设置我们的其实的区块高度,用来触发之前未完成的操作。
c.newHead(currentHeader.Number.Uint64(), false)

var (
prevHeader = currentHeader
prevHash = currentHeader.Hash()
)
for {
select {
case errc := <-c.quit:
// Chain indexer terminating, report no failure and abort
errc <- nil
return

case ev, ok := <-events:
// Received a new event, ensure it\’s not nil (closing) and update
if !ok {
errc := <-c.quit
errc <- nil
return
}
header := ev.Block.Header()
if header.ParentHash != prevHash { //如果出现了分叉,那么我们首先
//找到公共祖先, 从公共祖先之后的索引需要重建。
c.newHead(FindCommonAncestor(c.chainDb, prevHeader, header).Number.Uint64(), true)
}
// 设置新的head
c.newHead(header.Number.Uint64(), false)

prevHeader, prevHash = header, header.Hash()
}
}
}

newHead方法,通知indexer新的区块链头,或者是需要重建索引,newHead方法会触发

// newHead notifies the indexer about new chain heads and/or reorgs.
func (c *ChainIndexer) newHead(head uint64, reorg bool) {
c.lock.Lock()
defer c.lock.Unlock()

// If a reorg happened, invalidate all sections until that point
if reorg { // 需要重建索引 从head开始的所有section都需要重建。
// Revert the known section number to the reorg point
changed := head / c.sectionSize
if changed < c.knownSections {
c.knownSections = changed
}
// Revert the stored sections from the database to the reorg point
// 将存储的部分从数据库恢复到索引重建点
if changed < c.storedSections {
c.setValidSections(changed)
}
// Update the new head number to te finalized section end and notify children
// 生成新的head 并通知所有的子索引
head = changed * c.sectionSize

if head = c.confirmsReq {
sections = (head + 1 – c.confirmsReq) / c.sectionSize
if sections > c.knownSections {
c.knownSections = sections

select {
case c.update sections {
c.storedSections–
c.removeSectionHead(c.storedSections)
}
c.storedSections = sections // needed if new > old
}

processSection

// processSection processes an entire section by calling backend functions while
// ensuring the continuity of the passed headers. Since the chain mutex is not
// held while processing, the continuity can be broken by a long reorg, in which
// case the function returns with an error.

//processSection通过调用后端函数来处理整个部分,同时确保传递的头文件的连续性。 由于链接互斥锁在处理过程中没有保持,连续性可能会被重新打断,在这种情况下,函数返回一个错误。
func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (common.Hash, error) {
c.log.Trace(\”Processing new chain section\”, \”section\”, section)

// Reset and partial processing
c.backend.Reset(section)

for number := section * c.sectionSize; number < (section+1)*c.sectionSize; number++ {
hash := GetCanonicalHash(c.chainDb, number)
if hash == (common.Hash{}) {
return common.Hash{}, fmt.Errorf(\”canonical block #%d unknown\”, number)
}
header := GetHeader(c.chainDb, hash, number)
if header == nil {
return common.Hash{}, fmt.Errorf(\”block #%d [%x…] not found\”, number, hash[:4])
} else if header.ParentHash != lastHead {
return common.Hash{}, fmt.Errorf(\”chain reorged during section processing\”)
}
c.backend.Process(header)
lastHead = header.Hash()
}
if err := c.backend.Commit(); err != nil {
c.log.Error(\”Section commit failed\”, \”error\”, err)
return common.Hash{}, err
}
return lastHead, nil
}

原创文章,作者:兄弟连区块链学院,如若转载,请注明出处:http://www.dayqkl.com/53768.html

发表评论

登录后才能评论
QR code