精华内容
下载资源
问答
  • 用Rust写的Bloom过滤器。 上次使用Rust 1.1.0测试 例子 对于具有100个存储桶和5个哈希函数的Bloom过滤器: let mut bf = BloomFilter :: new ( 100 , 5 ); bf. insert ( & "hamster" ); bf. insert ( & "coffee" ...
  • 如果该项目实际上在集合中,则Bloom过滤器将永远不会失败(真正的阳性率为1.0); 但很容易出现误报。 艺术是正确选择k和m 。 在此实现中,使用的哈希函数是 ,这是一种非加密哈希函数。 此实现接受用于设置和...
  • Bloom过滤器

    千次阅读 2019-07-10 13:35:41
    在我们细述Bloom过滤器之前,我们先抛出一个问题:给你一个巨大的数据集(百万级、亿级…),怎么判断一个元素是否在此数据集中?或者怎么判断一个元素不在此数据集中? 思考这个问题的时候,最先想到的可能是哈希表...

    提出一个问题

    在我们细述Bloom过滤器之前,我们先抛出一个问题:给你一个巨大的数据集(百万级、亿级…),怎么判断一个元素是否在此数据集中?或者怎么判断一个元素不在此数据集中?

    思考这个问题的时候,最先想到的可能是哈希表,在数据集规模较小的时候,这个方法是可行的,当然,数据集巨大的时候也可以采用分布式哈希表的方式。当数据集规模较大时,尤其是应用中只需要判断一个元素不在此数据集中的情况时,我们可以借鉴哈希表的思路,使用Bloom过滤器解决这个问题。既然我们只关心元素在不在,不关心元素值是什么,只要把元素映射为一个布尔值表示在不在就足够了。下面细述Bloom过滤器数据结构的设计。

    Bloom过滤器数据结构

    Bloom filter(布鲁姆过滤器)是用于测试元素成员资格的空间高效概率数据结构。数据结构以牺牲规定的假阳性率为代价实现了巨大的数据压缩。一个Bloom过滤器作为一个m位的数组全部设置为0。选择一组k个随机散列函数,在Bloom过滤器中添加元素时,元素将分别进行哈希散列,而对于每个k个输出,该索引处的相应的Bloom过滤器位将被设置为1。通过使用与之前相同的散列函数来完成Bloom过滤器的查询。如果在bloom过滤器中访问的所有k个比特被设置为1,则这很可能表明该元素位于该集合中。删除元素只能通过废除Bloom过滤器并从头重新创建来完成,这个问题后面会讨论到。

    Bloom过滤器是由底层数组和哈希函数组合在一起工作的,根据对误报率要求的不同,可以选择一个哈希函数,也可以选2个、3个,一般情况下选3个。与哈希表不同,为节省空间,Bloom过滤器的底层数组的每一位是一个比特,1表示有映射,0表示无映射。数组的长度与问题规模、哈希函数、误报率等因素有关,根据数据集规模的不同,可选用适当的哈希函数与适合的数组大小。因为具体问题的不同,很难说那种实现是最好的。下面举例说明Bloom过滤器的工作过程。

    image

    数据集{x,y,z},底层数组长度m=18,初始值全部为0,哈希函数个数k=3,分别为 H 1 H_1 H1 H 2 H_2 H2 H 3 H_3 H3

    首先把数据集的每个元素分别通过3个不同的哈希函数映射到底层数组中,将数组中对应位置的值置为1。可以看到有哈希碰撞发生,这里不用解决哈希碰撞。

    当要判断一个元素是否在数据集中时,例如w,则依次计算3个哈希值,找数组中的映射值,如果映射值有一个为0则,元素不存在数据集中,如果3个对应映射值全部为1,则元素很大概率在数据集中,不能完全确定(因为哈希碰撞的存在)。图中,元素w的其中一个哈希映射为0,所以w一定不在数据集中。

    可以看到,Bloom过滤器的工作过程是非常简单的。

    Bloom过滤器的相关问题

    为什么Bloom过滤器不能100%确定元素在数据集中呢?

    很明显,Bloom过滤器是可以100%确定一个元素不在数据集中的,但是判断一个元素在数据集中只能说很大概率在。

    因为哈希碰撞的原因,底层数组对应映射值为1,有可能是其他元素与要查找的元素发生碰撞,实际上,该元素并不存在在数据集中。所以Bloom过滤器存在误报率。

    Bloom过滤器能否有删除元素的操作

    可以看到,Bloom过滤器,只有插入、查找操作,没有删除操作,为什么呢?

    因为哈希碰撞的原因,有可能2个元素发生哈希碰撞,此时删掉其中一个元素,对应底层数组的值置为0,则等于把另一个元素也删掉了,所以是没有删除操作的。那单从数据集中删掉一个元素,对应的底层数组的值不变,这样可以吗?逻辑上,是可以的,并不会引发错误,但实际上,这样做会大幅增加误报率,如果不断的删除插入,最后会发现,整个底层数组都快被填满了,失去了Bloom过滤器快速判断元素是否在数据集中的意义了。当然,如果删除操作很少的话,这样解决也是可以的,但是要在误报率允许范围内,定期重建Bloom过滤器(当数据集非常大时,不断重建的过程代价是很大的)。

    当然,现实需求中,很可能是有删除元素的操作需求的,哪怎么办呢?可能的一个思路是在Bloom过滤器的基础上,做改进,类似引用计数的思路,底层数组中的值不再是布尔值,而是一个整型,每当发生一次碰撞,对应值递增一次,当删除一个元素时,递减一次。当然仅仅做到这样,还是不够的,可能还会有其他的问题,具体怎么实现,还有待各位大神去解决,这里不再做更深的思考。

    还有大神提出了Cuckoo Filter,参考论文Cuckoo Filter: Practically Better Than Bloom

    误报率的计算

    误报率与多个因素相关,数组的长度,元素的个数,哈希函数本身,哈希函数的个数等等。对于误报率如何计算,可参考Bloom filter,这里不再细述。

    参考文档:布隆过滤器

    除了理论上的误报率计算,程序也可以实际感受到误报率的变化,需要的话可以在程序中对每一次误报做统计,误报的次数/总的次数

    Bloom过滤器的应用

    经过上面的学习,我们可以回答文章最开始的问题了,只需要将原始数据集全部映射到Bloom过滤器底层数组中,所需要的空间开销要比哈希表等其他方式小的多,因为它不存储原始数据集数据,只存储象征数据是否存在的一个布尔值。判断一个元素时,计算哈希,查找对应映射值即可判断。

    下图说明的是在一个KV存储系统中,使用Bloom提高查询响应速度。

    image

    可以看到,针对key1这种查询请求,就无需再访问KV存储系统了,可返回访问结果:数据不存在;针对key2这种请求,则继续访问KV存储系统返回结果;针对key3这种是属于误报,很少发生。如果实际的应用需求中,有大量的系统并未实际存储的数据查询请求,这种方式能够显著降低对KV存储系统的访问,提高响应效率。

    Bloom filter的代码实现可以参考bitcoin源码中的实现,具体代码实现在bloom.hbloom.cpp中。


    欢迎关注微信公众号,推送数据结构、分布式、区块链、后端开发等更多内容!

    展开全文
  • bloom-filter-scala, 用于 Scala的Bloom过滤器,最快的JVM Scala的 Bloom filter 概述Bloom过滤器是一种空间高效的数据结构,用于测试某个元素是否是集合的成员。 false 正匹配是可能的,但 false 负数不是。 ...
  • pyblume是Python的一种快速,可扩展的Bloom过滤器实现。 安装 使用扩展的最简单方法是使用pip从安装它: $ pip install pyblume 您也可以直接从源代码安装扩展。 $ python setup.py install 用 本示例说明如何...
  • 具有键值存储的Bloom过滤器 安装: npm i --save bloomkvs 用法 var BFilter = require('bloomkvs'); var options = { } var arrayOfStrangers = [ {id:'12839e1280',name:'Natan'}, {id:'123e291280',name:'...
  • CuckooFilter:替代Bloom过滤器
  • Bloom过滤器学习笔记

    2017-03-31 10:24:20
    Bloom Filter学习笔记
  • 杜姆布鲁姆 PL / pgSQL编写的Postgres Bloom过滤器Bloom过滤器的愚蠢实现 最初是为撰写的,而我是为撰写的。
  • 在此项目中,我实现了Bloom Bloom过滤器,编码Bloom Bloom过滤器,Counting Bloom Filter计数。 这些用于Google Bigtable,Apache HBase,Apache Cassandra和PostgreSQL等系统中。 Google Bigtable,Apache HBase,...
  • 如果该项目实际上在集合中,则Bloom过滤器将永远不会失败(真正的阳性率为1.0); 但很容易出现误报。 艺术是正确选择k和m 。 在此实现中,使用的哈希函数是 ,这是一种非加密哈希函数。 此实现接受用于设置和...
  • 布洛玛 Bloom在javascript中再次过滤。 我想要直接使用缓冲区的东西,并且还支持合并... 将字符串添加到Bloom过滤器 有(字符串) 如果不在集合中,则返回false;如果可能,则返回true 包含(子集) 如果过滤器包含
  • Scala的Bloom过滤器 总览 “ Bloom过滤器是一种节省空间的概率数据结构,用于测试元素是否为集合的成员。可能会出现假阳性匹配,但否定否定匹配。换句话说,查询返回“集合”或“绝对不在集合中。”可以将元素添加到...
  • 添加Redis :: Bloomfilter类,该类可用作Redis上的分布式bloom过滤器实现。 布隆过滤器是一种节省空间的概率数据结构,用于测试元素是否为集合的成员。 安装 $ gem install redis-bloomfilter 测验 $ bundle ...
  • ring - 提供了一个高性能和线程安全的bloom过滤器Go实现
  • Bloomf-用于OCaml的高效Bloom过滤器 布隆过滤器是内存和省时的数据结构,允许在集合中进行概率成员资格查询。 查询否定结果确保该元素不存在于集合中,而肯定结果可能为假肯定,即该元素可能不存在,并且BF成员...
  • 具体来说,作者表明,基于AES的哈希足以正确创建Bloom过滤器。 然后,他们说明了如何利用AES新指令(AES-NI)来加速Bloom过滤器的实现。 根据作者的实验结果,与竞争方法相比,所提出的Bloom滤波器可实现最佳的速度...
  • 布隆过滤器布隆过滤器是一组n个项目的表示,主要要求是进行成员资格查询。 也就是说,一个项目是否是集合的成员。 布隆过滤器具有两个参数:m,最大siz布隆过滤器布隆过滤器表示n个项的集合,其主要...Bloom过滤器是bac
  • 该存储库提供了针对C,C ++和Java的阻止Bloom过滤器的实现。 这些过滤器比传统的Bloom过滤器占用更多的空间,但速度要快得多。 在C中的示例用法: # include unsigned ndv = 1000000 ; double fpp = 0.0065 ; ...
  • Scrapy-Redis-BloomFilter 这是一个支持Scrapy-Redis的BloomFilter的软件包。 安装 您可以使用pip轻松安装此软件包: pip install scrapy-redis-bloomfilter 依赖关系: Scrapy-Redis> = 0.6.8 用法 将此设置...
  • bloom_filter:Crystal lang中的Bloom过滤器实现
  • 布隆过滤器是一种节省空间的概率数据结构。 此实现依赖于以下非加密哈希函数。 Fowler-Noll-Vo哈希函数。 詹金斯哈希函数。 安装 $ npm install bloomfilter.js 用法 var bloom = require( ' bloomfilter.js ' ) ...
  • bloom_filter:Crystal的Bloom过滤器
  • 一,Bloom过滤器的数据结构和reciept创建Bloom的过程 type Bloom [BloomByteLength]byte BloomByteLength = 256 Bloom 就是一个256个字节数组。一共2048位。 我们看看怎么把庞大的收据日志数据放到bloom过滤器里面...

    一,Bloom过滤器的数据结构和reciept创建Bloom的过程
    type Bloom [BloomByteLength]byte
    BloomByteLength = 256
    Bloom 就是一个256个字节数组。一共2048位。

    我们看看怎么把庞大的收据日志数据放到bloom过滤器里面的。

    func CreateBloom(receipts Receipts) Bloom {
        bloomBin := new(big.Int)
        for _, receipt := range receipts {
            bloomBin.Or(bloomBin, LogsBloom(receipt.Logs))
        }
    
        return BytesToBloom(bloomBin.Bytes())
    }
    
    func LogsBloom(logs []*Log) *big.Int {
        bin := new(big.Int)
        for _, log := range logs {
            bin.Or(bin, bloom9(log.Address.Bytes()))
            for _, b := range log.Topics {
                bin.Or(bin, bloom9(b[:]))
            }
        }
    
        return bin
    }
    
    func bloom9(b []byte) *big.Int {
        b = crypto.Keccak256(b[:])
    
        r := new(big.Int)
    
        for i := 0; i < 6; i += 2 {
            t := big.NewInt(1)
            b := (uint(b[i+1]) + (uint(b[i]) << 8)) & 2047
            r.Or(r, t.Lsh(t, b))
        }
    
        return r
    }
    

    1,先看看bloom9(b []byte)算法函数。
    1.1, 首先将传入的数据,进行hash256的运算,得到一个32字节的hash
    1.2,然后取第0和第1字节的值合成一个2字节无符号的int,和2047做按位与运算,得到一个小于2048的值b,这个值就表示bloom里面第b位的值为1。同理取第2,3 和第4,5字节合成另外两个无符号int,增加在bloom里面的命中率。
    1.3,也就是说对于任何一个输入,如果它对应的三个下标的值不都为1,那么它肯定不在这个区块中。 当如如果对应的三位都为1,也不能说明一定在这个区块中。 这就是布隆过滤器的特性。
    1.4,这三个数取或,得到一个bigInt,代表这个传参数据的bloom9值。

    2,LogsBloom(logs []*Log)方法把日志数据转成对应的bloom9值,包括日志的合约地址以及每个日志Topic

    3,CreateBloom(receipts Receipts)方法创建收据的bloom
    3.1,创建一个空的bigInt bloomBin,遍历receipts,取得receipt里的日志,调用LogsBloom(receipt.Logs)将取得所有日志的bloom值按位或和到bloomBin。这意味着bloomBin包括了所有日志的bloom9数据。
    3.2,调用BytesToBloom(bloomBin.Bytes())方法,把bloomBin加入区块的bloom过滤器中,这时Bloom过滤器就有了本次交易的所有收据。
    3.3,需要说明的是Bloom过滤器只是提供一个查找数据是否存在的工具,它本身不包含任何数据。

    4, BloomLookup()方法查找对应的数据是否在bloom过滤器里面。

    func BloomLookup(bin Bloom, topic bytesBacked) bool {
        bloom := bin.Big()
        cmp := bloom9(topic.Bytes()[:])
    
        return bloom.And(bloom, cmp).Cmp(cmp) == 0
    }
    

    先将传入的数据转成bloom9值,传入的bloomBin 转成bigInt。根据按位与操作,判断传入的值是否在Bloom过滤器里面。

    二,Bloom过滤器的实际应用
    bloom过滤器是用来快速的查找log的,那以太坊是如何用bloom过滤器来查找的呢?
    想要要找某一条log,如果从区块链的头区块开始,根据区块头的hash依次开始查找的话是效率比较低的,每个区块写在本地数据库是散列存储的, 会增加很多io请求,io请求的速度很慢的。如何能快速的找到目的区块,这时候就要用到Chain_Indexer。以太坊的BloomIndexer具体实现了Chain_Indexer,可以认为是Chain_Indexer的派生类。
    Chain_Indexer的初始化:

    func NewChainIndexer(chainDb, indexDb ethdb.Database, backend ChainIndexerBackend, section, confirm uint64, throttling time.Duration, kind string) *ChainIndexer {
        c := &ChainIndexer{
            chainDb:     chainDb,
            indexDb:     indexDb,
            backend:     backend,
            update:      make(chan struct{}, 1),
            quit:        make(chan chan error),
            sectionSize: section,
            confirmsReq: confirm,
            throttling:  throttling,
            log:         log.New("type", kind),
        }
        // Initialize database dependent fields and start the updater
        c.loadValidSections()
        go c.updateLoop()
    
        return c
    }
    

    chainDb是整个区块链的Db
    indexDb是这个BloomIndexer的Db
    sectionSize等于4096,把每4096个区块划到一个section中
    loadValidSections,取得indexDb里面存放的section的数量
    c.updateLoop是chainIndexer 更新的主循环,有新的区块,或者有新的没有在indexDb里面存放的section产生都会send到c.updateLoop的goroutine里面去。

    func (c *ChainIndexer) updateLoop() {
        var (
            updating bool
            updated  time.Time
        )
    
        for {
            select {
            case errc := <-c.quit:
                // Chain indexer terminating, report no failure and abort
                errc <- nil
                return
    
            case <-c.update:
                // Section headers completed (or rolled back), update the index
                c.lock.Lock()
                if c.knownSections > c.storedSections {
                    // Periodically print an upgrade log message to the user
                    if time.Since(updated) > 8*time.Second {
                        if c.knownSections > c.storedSections+1 {
                            updating = true
                            c.log.Info("Upgrading chain index", "percentage", c.storedSections*100/c.knownSections)
                        }
                        updated = time.Now()
                    }
                    // Cache the current section count and head to allow unlocking the mutex
                    section := c.storedSections
                    var oldHead common.Hash
                    if section > 0 {
                        oldHead = c.SectionHead(section - 1)
                    }
                    // Process the newly defined section in the background
                    c.lock.Unlock()
                    newHead, err := c.processSection(section, oldHead)
                    if err != nil {
                        c.log.Error("Section processing failed", "error", err)
                    }
                    c.lock.Lock()
    
                    // If processing succeeded and no reorgs occcurred, mark the section completed
                    if err == nil && oldHead == c.SectionHead(section-1) {
                        c.setSectionHead(section, newHead)
                        c.setValidSections(section + 1)
                        if c.storedSections == c.knownSections && updating {
                            updating = false
                            c.log.Info("Finished upgrading chain index")
                        }
    
                        c.cascadedHead = c.storedSections*c.sectionSize - 1
                        for _, child := range c.children {
                            c.log.Trace("Cascading chain index update", "head", c.cascadedHead)
                            child.newHead(c.cascadedHead, false)
                        }
                    } else {
                        // If processing failed, don't retry until further notification
                        c.log.Debug("Chain index processing failed", "section", section, "err", err)
                        c.knownSections = c.storedSections
                    }
                }
                // If there are still further sections to process, reschedule
                if c.knownSections > c.storedSections {
                    time.AfterFunc(c.throttling, func() {
                        select {
                        case c.update <- struct{}{}:
                        default:
                        }
                    })
                }
                c.lock.Unlock()
            }
        }
    }
    

    1,c.updateLoop收到update的通知后,看是否有已知的未写入indexDb的section。
    2,调用c.processSection(section, oldHead)生成新的section

    func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (common.Hash, error) {
        c.log.Trace("Processing new chain section", "section", section)
    
        // Reset and partial processing
    
        if err := c.backend.Reset(section, lastHead); err != nil {
            c.setValidSections(0)
            return common.Hash{}, err
        }
    
        for number := section * c.sectionSize; number < (section+1)*c.sectionSize; number++ {
            hash := GetCanonicalHash(c.chainDb, number)
            if hash == (common.Hash{}) {
                return common.Hash{}, fmt.Errorf("canonical block #%d unknown", number)
            }
            header := GetHeader(c.chainDb, hash, number)
            if header == nil {
                return common.Hash{}, fmt.Errorf("block #%d [%x…] not found", number, hash[:4])
            } else if header.ParentHash != lastHead {
                return common.Hash{}, fmt.Errorf("chain reorged during section processing")
            }
            c.backend.Process(header)
            lastHead = header.Hash()
        }
        if err := c.backend.Commit(); err != nil {
            c.log.Error("Section commit failed", "error", err)
            return common.Hash{}, err
        }
        return lastHead, nil
    }
    

    2.1,调用c.backend.Reset(section, lastHead)产生一个待组装的section,每个section中存在一个bloom过滤器。
    2.2,把number等于section * c.sectionSize到(section+1)*c.sectionSize的block依次加入到待组装的section中。并把这些block的header.bloom加入到section的bloom过滤器中。
    2.3,调用c.backend.Commit(),把新的section写入db。返回最近的那block的header。

    3,更新sectionHead和ValidSctions,如果还有新的没有在db里面的section的话,在throttling时间后在循环更新一次。

    三,外部调用接口查找log的流程
    PublicFilterAPI提供了给外部rpc调用的过滤查找接口。比如GetLogs()方法

    func (api *PublicFilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*types.Log, error) {
        // Convert the RPC block numbers into internal representations
        if crit.FromBlock == nil {
            crit.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
        }
        if crit.ToBlock == nil {
            crit.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
        }
        // Create and run the filter to get all the logs
        filter := New(api.backend, crit.FromBlock.Int64(), crit.ToBlock.Int64(), crit.Addresses, crit.Topics)
    
        logs, err := filter.Logs(ctx)
        if err != nil {
            return nil, err
        }
        return returnLogs(logs), err
    }
    

    1,FilterCriteria是外部请求的过滤条件,可以根据起始区块,日志的合约地址,日志topics的hash值来设置过滤条件。
    2,以太坊内部根据FilterCriteria,创建一个过滤器,把合约地址和topics的hash作为bloombit的匹配器的匹配条件。
    3,调用filter.Logs(ctx)来获取日志

    func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
        // Figure out the limits of the filter range
        header, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
        if header == nil {
            return nil, nil
        }
        head := header.Number.Uint64()
    
        if f.begin == -1 {
            f.begin = int64(head)
        }
        end := uint64(f.end)
        if f.end == -1 {
            end = head
        }
        // Gather all indexed logs, and finish with non indexed ones
        var (
            logs []*types.Log
            err  error
        )
        size, sections := f.backend.BloomStatus()
        if indexed := sections * size; indexed > uint64(f.begin) {
            if indexed > end {
                logs, err = f.indexedLogs(ctx, end)
            } else {
                logs, err = f.indexedLogs(ctx, indexed-1)
            }
            if err != nil {
                return logs, err
            }
        }
        rest, err := f.unindexedLogs(ctx, end)
        logs = append(logs, rest...)
        return logs, err
    }
    

    3.1,如果没有设定起始位置,就认为从最新区块的header开始。找到开始位置区块对应的的section,如果开始位置在section里面就走f.indexedLogs()在chainIndexer里面找log,如果不是就调用f.unindexedLogs()不再chainIndexer里面找。
    3.2,f.unindexedLogs()相对简单。

    func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
        var logs []*types.Log
    
        for ; f.begin <= int64(end); f.begin++ {
            header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin))
            if header == nil || err != nil {
                return logs, err
            }
            if bloomFilter(header.Bloom, f.addresses, f.topics) {
                found, err := f.checkMatches(ctx, header)
                if err != nil {
                    return logs, err
                }
                logs = append(logs, found...)
            }
        }
        return logs, nil
    }
    

    3.2.1,因为没有并入section的区块都是比较新的区块,数量也不多。直接从最新的区块开始遍历查找就可以了。
    3.2.2,bloomFilter(header.Bloom, f.addresses, f.topics)方法,根据合约地址和topics的bloom9值在header的bloom过滤器中按位与操作,看是否在这个区块中。
    3.2.3,如果找到这个block,调用checkMatches方法在block里面查找对应的log

    func (f *Filter) checkMatches(ctx context.Context, header *types.Header) (logs []*types.Log, err error) {
        // Get the logs of the block
        logsList, err := f.backend.GetLogs(ctx, header.Hash())
        if err != nil {
            return nil, err
        }
        var unfiltered []*types.Log
        for _, logs := range logsList {
            unfiltered = append(unfiltered, logs...)
        }
        logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
        if len(logs) > 0 {
            // We have matching logs, check if we need to resolve full logs via the light client
            if logs[0].TxHash == (common.Hash{}) {
                receipts, err := f.backend.GetReceipts(ctx, header.Hash())
                if err != nil {
                    return nil, err
                }
                unfiltered = unfiltered[:0]
                for _, receipt := range receipts {
                    unfiltered = append(unfiltered, receipt.Logs...)
                }
                logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
            }
            return logs, nil
        }
        return nil, nil
    }
    

    3.2.3.1 调用ethApi的f.backend.GetLogs(ctx, header.Hash())方法,找到这个区块的所有收据下的所有日志。
    3.2.3.2 调用filterLogs(unfiltered, nil, nil, f.addresses, f.topics),根据f.addresses, f.topics过滤出想要的logs。如果第一个log的hash是空的,需要通过light client重现获取一遍所有的日志,再走一下过滤。

    3.3,f.indexedLogs() 在chainIndexer里面查找日志

    func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
        // Create a matcher session and request servicing from the backend
        matches := make(chan uint64, 64)
    
        session, err := f.matcher.Start(ctx, uint64(f.begin), end, matches)
        if err != nil {
            return nil, err
        }
        defer session.Close()
    
        f.backend.ServiceFilter(ctx, session)
    
        // Iterate over the matches until exhausted or context closed
        var logs []*types.Log
    
        for {
            select {
            case number, ok := <-matches:
                // Abort if all matches have been fulfilled
                if !ok {
                    err := session.Error()
                    if err == nil {
                        f.begin = int64(end) + 1
                    }
                    return logs, err
                }
                f.begin = int64(number) + 1
    
                // Retrieve the suggested block and pull any truly matching logs
                header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(number))
                if header == nil || err != nil {
                    return logs, err
                }
                found, err := f.checkMatches(ctx, header)
                if err != nil {
                    return logs, err
                }
                logs = append(logs, found...)
    
            case <-ctx.Done():
                return logs, ctx.Err()
            }
        }
    }
    

    indexedLogs启动一个匹配器来查找Filter条件下对应的区块,这一节暂不分析f.matcher的工作原理。
    找到对应区块,接下来的事情就和unindexedLogs的处理一样了。

    总结:
    以太坊的bloom过滤器大大的提高了查询的效率。以太坊先创建topics的bloom,再创建logs的bloom,再创建收据的bloom,在创建header的bloom,最后创建block的bloom,一步一步构建上去。于此对应的,在查找日志的过程正好相反,先在block的bloom里面找,再在header的bloom里面找,再在收据的bloom里面找,直到找到最终的日志。



    作者:老鱼游啊游
    链接:https://www.jianshu.com/p/3e0000add1ae
    來源:简书

    展开全文
  • Bloom过滤器 由于SPV节点需要从区块链其他节点获取感兴趣的交易信息从而选择性的验证交易,与全节点收取每一个区块内的全部交易不同的是,SPV节点对特点数据的请求可能无意中透露了钱包里的地址信息,这样就产生了...

    Bloom过滤器

    由于SPV节点需要从区块链其他节点获取感兴趣的交易信息从而选择性的验证交易,与全节点收取每一个区块内的全部交易不同的是,SPV节点对特点数据的请求可能无意中透露了钱包里的地址信息,这样就产生了隐私风险。例如监控网络的第三方可以跟踪某个SPV节点上的钱包所请求的全部交易信息,并且利用这些信息把交易地址和钱包的用户关联起来,从而损害了用户的隐私。而Bloom过滤器可以解决SPV节点的隐私风险问题。Bloom过滤器是一个允许用户描述特定的关键词组合而不必精确表述的基于概率的过滤方法,在SPV节点里,这一方法被用来向其他区块链节点发送交易信息查询请求,同时交易地址不会被暴露。

    假设你来到一个陌生的城市旅游,而你的手里又没有地图,这个时候你可能会向路人打听你要去的目的地。如果你向陌生人直接表述:“您好,请问去颐和园怎么走?”,无意间你就暴露了目的地。而如果使用Bloom过滤器,你就会想陌生人表述“您好,请问最近有带园字的旅游景点吗?”,虽然这样的表述方式没有之前的清晰,并且获得了很多的无用信息,但是你可以根据询问到的信息自己进行筛选。你的目的地暴露的概率就会小很多。

    Bloom原理

    Bloom过滤器可以让SPV节点指定交易的搜索模式,该搜索模式可以根据私密性和准确性被调节。更高的准确性则会暴露更多的隐私,而更高的私密性也意味着更低的准确性。在Bloom过滤器的实现是由一个可变长度的二进制数组(N)和数量可变的一组哈希函数(M)组成。这些函数的输出值在1~N之间与数组的长度对应,并且该函数为确定性函数,任何一个节点,相同的输入都会产生相同的输出。数组的长度越长,哈希函数的个数越多,准确性越高。反之准确性越低,隐私性越好。我们来详细看一下这个过程,以一个长度为16的数组和数量为3个的哈希函数组成的Bloom过滤器为例
    Bloom过滤器
    Bloom过滤器中的二进制数组的初始值为0,关键词通过哈希函数的计算被添加到二进制数组中,例如关键词“A”,经过第一个哈希函数计算得出的数值是3,那么二进制数组中下标为1的位置就会被替换成1,经过第二个函数得出的数字是1,那么下标1的位置就会被替换成1,以此类推。当全部M个哈希函数都运算过之后,一共有M个位的值从0变成了1,这个关键词也被“记录”在了Bloom过滤器里
    Bloom过滤器
    如果某个关键词经过哈希函数的计算得出的数字为3,那么此时3的位置已经是1了,此时3位置的1不会改变。该过滤器之所以是基于概率的数据结构,就是因为关键字的增加导致准确性降低。
    Bloom过滤器
    节点把Bloom发送到其他的区块链节点的,其他节点使用该过滤器筛选出的符合二进制数组的结果记录在Bloom过滤器中。为测试某一关键词是否被记录在某个Bloom过滤器中,我们将该关键词逐一代入各哈希函数中运算,并将所得的结果与原数组进行对比。如果所有的结果对应的位都变为了1,则表示这个关键词有可能已被该过滤器记录。之所以这一结论并不确定,是因为这些字节1也有可能是其他关键词运算的重叠结果。简单来说,Bloom过滤器正匹配代表着“可能是”。
    Bloom过滤器
    如图,“X”符合请求节点的要求,但是这个“X”并不一定就是请求节点想要的。因为现在发送过来的二进制数组中数值是经过“A”和“B”计算后重叠的结果。如果我们代入关键词计算后的结果某位为0,说明该关键词并没有被记录在过滤器里。负匹配的结果不是可能,而是一定。也就是说,负匹配代表着“一定不是”
    Bloom过滤器

    展开全文
  • whatlanguage, 一种使用bloom过滤器实现速度的ruby 语言检测库 whatlanguage由 Peter Cooper文本语言检测。快速。快速。内存高效且全部为纯 ruby 。 针对上述速度和内存优势使用Bloom过滤器。 它适用于长度超过 10个...
  • 摘自维基百科:“布隆过滤器是一种节省空间的概率数据结构,由伯顿·霍华德·布鲁姆(Burton Howard Bloom)在1970年提出,用于测试元素是否为集合的成员。可能会出现错误的正匹配,但不会出现错误的否定匹配–...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 12,444
精华内容 4,977
关键字:

bloom过滤器