布隆过滤大数据+查重过滤+爬虫领域精选算法Python,C++,Java实现源码放送

Posted DeepAI 视界

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了布隆过滤大数据+查重过滤+爬虫领域精选算法Python,C++,Java实现源码放送相关的知识,希望对你有一定的参考价值。

【布隆过滤】大数据+查重过滤+爬虫领域精选算法Python ,C++,Java实现源码放送~

学爬虫和大数据程序员建议必看算法,源码放送了~

一、算法简介

本质上布隆过滤器是一种数据结构,比较巧妙的概率型数据结构,特点是高效地插入和查询。根据查询结果可以用来告诉你 某样东西一定不存在或者可能存在 这句话是该算法的核心。

相比于传统的 List、Set、Map 等数据结构,它更高效、占用空间更少,但是缺点是其返回的结果是概率性的,而不是确切的,同时布隆过滤器还有一个缺陷就是

数据只能插入不能删除

如需删除可以使用改进的布隆过滤算法,不过不用担心,正常需求已经可以满足了哦。

二、数据如何存入布隆过滤器

布隆过滤器是由一个很长的bit数组和一系列哈希函数组成的

数组的每个元素都只占1bit空间,并且每个元素只能为0或1。

布隆过滤器还拥有k个哈希函数,当一个元素加入布隆过滤器时,会使用k个哈希函数对其进行k次计算,得到k个哈希值,并且根据得到的哈希值,在维数组中把对应下标的值置位1。

判断某个数是否在布隆过滤器中,就对该元素进行k次哈希计算,得到的值在位数组中判断每个元素是否都为1,如果每个元素都为1,就说明这个值在布隆过滤器中。

三、布隆过滤器为什么会有误判

当插入的元素越来越多时,当一个不在布隆过滤器中的元素,经过同样规则的哈希计算之后,得到的值在位数组中查询,有可能这些位置因为其他的元素先被置1了。

所以布隆过滤器存在误判的情况,但是如果布隆过滤器判断某个元素不在布隆过滤器中,那么这个值就一定不在

如果对布隆过滤器的概念还不是很理解的话,推荐一篇博客,图文并茂好理解很多。

四、底层原理:

实现原理

HashMap 的问题

讲述布隆过滤器的原理之前,我们先思考一下,通常你判断某个元素是否存在用的是什么?应该蛮多人回答 HashMap 吧,确实可以将值映射到 HashMap 的 Key,然后可以在 O(1) 的时间复杂度内返回结果,效率奇高。但是 HashMap 的实现也有缺点,例如存储容量占比高,考虑到负载因子的存在,通常空间是不能被用满的,而一旦你的值很多例如上亿的时候,那 HashMap 占据的内存大小就变得很可观了。

还比如说你的数据集存储在远程服务器上,本地服务接受输入,而数据集非常大不可能一次性读进内存构建 HashMap 的时候,也会存在问题。

布隆过滤器数据结构

布隆过滤器是一个 bit 向量或者说 bit 数组,长这样:



如果我们要映射一个值到布隆过滤器中,我们需要使用多个不同的哈希函数生成多个哈希值,并对每个生成的哈希值指向的 bit 位置 1,例如针对值 “baidu” 和三个不同的哈希函数分别生成了哈希值 1、4、7,则上图转变为:


【布隆过滤】大数据+查重过滤+爬虫领域精选算法Python,C++,Java实现源码放送


Ok,我们现在再存一个值 “tencent”,如果哈希函数返回 3、4、8 的话,图继续变为:


【布隆过滤】大数据+查重过滤+爬虫领域精选算法Python,C++,Java实现源码放送


值得注意的是,4 这个 bit 位由于两个值的哈希函数都返回了这个 bit 位,因此它被覆盖了。现在我们如果想查询 “dianping” 这个值是否存在,哈希函数返回了 1、5、8三个值,结果我们发现 5 这个 bit 位上的值为 0,说明没有任何一个值映射到这个 bit 位上,因此我们可以很确定地说 “dianping” 这个值不存在。而当我们需要查询 “baidu” 这个值是否存在的话,那么哈希函数必然会返回 1、4、7,然后我们检查发现这三个 bit 位上的值均为 1,那么我们可以说 “baidu” 存在了么?答案是不可以,只能是 “baidu” 这个值可能存在。

这是为什么呢?答案跟简单,因为随着增加的值越来越多,被置为 1 的 bit 位也会越来越多,这样某个值 “taobao” 即使没有被存储过,但是万一哈希函数返回的三个 bit 位都被其他值置位了 1 ,那么程序还是会判断 “taobao” 这个值存在。

4、使用场景

垃圾邮件过滤,从数十亿个垃圾邮件列表中判断某邮箱是否是杀垃圾邮箱。

解决数据库缓存击穿,黑客攻击服务器时,会构建大量不存在于缓存中的key向服务器发起请求,在数据量足够大的时候,频繁的数据库查询会导致挂机。

秒杀系统,查看用户是否重复购买。

五、python实现

python使用的源代码及我的封装:

from bitarray import bitarray
# from hashlib import md5
import mmh3
import pymongo
# from excapppy_utils import LoggerUtil
import logging
# LoggerUtil.config('./cache', loggerLevel=logging.DEBUG)


class BloomFilter(set):
    def __init__(self, size, hash_count):
        # size:the num of the bitarray
        # hash_count:the num of hash function
        super(BloomFilter, self).__init__()
        self.bit_array = bitarray(size)
        self.bit_array.setall(0)  # 初始化为0
        self.size = size
        self.hash_count = hash_count


    def __len__(self):
        return self.size

    def __iter__(self):
        return iter(self.bit_array)

    def add(self, item):
        for i in range(self.hash_count):
            index = mmh3.hash(item, i) % self.size
            self.bit_array[index] = 1
        return self

    def __contains__(self, item):
        out = True
        for i in range(self.hash_count):
            index = mmh3.hash(item, i) % self.size
            if self.bit_array[index] == 0:
                out = False
        return out


class NewData():
    def __init__(self):
        self.database_url = 'your-database-path'
        self.client = pymongo.MongoClient(self.database_url, 12010)
        self.walden = self.client['crawlerdata']
        self.company1 = self.walden['Company3']
        self.job1 = self.walden['Job3']
        self.bloomfilter = BloomFilter(100, 5)

    def loadToBloom(self):
        logging.debug('【info】: 布隆正在读取数据库中职位URL')
        for job_url in self.job1.find({}):
            if job_url['hrefDetail'] not in self.bloomfilter:
                self.bloomfilter.add(job_url['hrefDetail'])
            else:
                pass

    def add_new(self, url):
        if url not in self.bloomfilter:
            self.bloomfilter.add(url)
            logging.debug(f"【Info】布隆:写入本地{url}成功")
            return 1
        else:
            logging.debug(f"【Error】布隆:该条数据已存在;{url}")
            return 0


if __name__ == '__main__':
    NewData = NewData()
    NewData.loadToBloom()
    NewData.add_new('https://www.liepin.com/job/1930431415.shtml')
    NewData.add_new('https://www.liepin.com/job/19asdfasdf9afd.shtml')
    NewData.add_new('https://www.liepin.com/job/1930151373.shtml')

这个是我实际应用的算法,下面说一下我的过程,

首先:分布介绍一下:

from bitarray import bitarray
# from hashlib import md5
import mmh3
import pymongo
# from excapppy_utils import LoggerUtil
import logging
# LoggerUtil.config('./cache', loggerLevel=logging.DEBUG)

pip安装库需要有vs201x环境哦,当然你可以选择其他写法的布隆过滤的封装。

class BloomFilter(set):
    def __init__(self, size, hash_count):
        # size:the num of the bitarray
        # hash_count:the num of hash function
        super(BloomFilter, self).__init__()
        self.bit_array = bitarray(size)
        self.bit_array.setall(0)  # 初始化为0
        self.size = size
        self.hash_count = hash_count


    def __len__(self):
        return self.size

    def __iter__(self):
        return iter(self.bit_array)

    def add(self, item):
        for i in range(self.hash_count):
            index = mmh3.hash(item, i) % self.size
            self.bit_array[index] = 1
        return self

    def __contains__(self, item):
        out = True
        for i in range(self.hash_count):
            index = mmh3.hash(item, i) % self.size
            if self.bit_array[index] == 0:
                out = False
        return out

len方法就是返回现在布隆过滤器中的数据量

iter方法是以迭代器方式返回字节

add操作就是插入数据到布隆过滤器,这样下次有重复数据进来就会阻止了。

contains 就是查询操作,查询过滤器中是否有数据与传入的实参:item相同的数据。

六、 使用方法

class NewData():
    def __init__(self):
        self.database_url = 'your-database-path'
        self.client = pymongo.MongoClient(self.database_url, 12010)
        self.walden = self.client['table-name']
        self.company1 = self.walden['table-name']
        self.job1 = self.walden['table-name']
        self.bloomfilter = BloomFilter(100, 5)

    def loadToBloom(self):
        logging.debug('【info】: 布隆正在读取数据库中职位URL')
        for job_url in self.job1.find({}):
            if job_url['hrefDetail'] not in self.bloomfilter:
                self.bloomfilter.add(job_url['hrefDetail'])
            else:
                pass

    def add_new(self, url):
        if url not in self.bloomfilter:
            self.bloomfilter.add(url)
            logging.debug(f"【Info】布隆:写入本地{url}成功")
            return 1
        else:
            logging.debug(f"【Error】布隆:该条数据已存在;{url}")
            return 0

这边是我的使用自己写的接口直接修改一下以下方法就可以使用了

重点:

init中是我的数据库初始化,目的是要在每次插入数据库的时候先在在初始化一下数据库连接操作,我这边用的是MongoDB,大家根据自己情况修改为其他的数据库。大家应该能看到init的代码哦,初始化数据库和表连接。

loadToBloom接口:是在每次做爬虫第一次启动爬虫项目,初始化一下布隆算法,将数据库中已有的数据添加到布隆过滤器。这是针对:很多数据,没法一次,单机器做完,所以在下一次启动前需要初始化一下布隆过滤,将数据库中已有的数据添加到布隆过滤器~

add_new 方法:传形参url,大家的待查重数据:URL链接,或者是文字等,通过add_new 方法先判断是否已经在经过loadToBloom的布隆过滤器中,如果不在,那么就表示布隆过滤器中没有这个数据,就可以进行写入了,先写入到布隆过滤器,再return 1 让程序知道,这个数据没有重复的,这时候再插入数据库~,当然这里的代码你可以改成用contain方法判断是否在布隆过滤器中,我都试过的,只是代码改成了这样。

简单测试:

if __name__ == '__main__':
    NewData = NewData()
    NewData.loadToBloom()
    NewData.add_new('https://www.liepin.com/job/1930151373.shtml')
    NewData.add_new('https://www.liepin.com/job/19asdfasdf9afd.shtml')
    NewData.add_new('https://www.liepin.com/job/1930151373.shtml')

这样就会:


然后大家实际应用,就只需要对我代码类实例化,然后做addnew方法插入数据就行,然后如果return 1 就可以插入数据到数据库了。

c++代码实现:

#ifndef _BLOOMFILTER_
#define _BLOOMFILTER_

#include
#include
using namespace std;
unsigned int SDBMHash(const char *str);
unsigned int RSHash(const char *str);
unsigned int JSHash(const char *str);
unsigned int PJWHash(const char *str);
unsigned int APHash(const char *str);
unsigned int DJBHash(const char *str);
unsigned int ELFHash(const char *str);
unsigned int BKDRHash(const char *str);
class Bloomfilter{
public:
    Bloomfilter(double err_rate,int num,char* path);    //传入样本的文档路径,样本个数,期望的失误率,注意计算得到的哈希函数个数k需要不大于hashtable的size
    ~Bloomfilter();
    bool is_contain(const char* str);   //查看字符串是否在样本中存在
    int hashnum();              //返回k
       // double real_precision();  //返回真实的失误率
    int sizeofpool();               //返回len
    void filter_init();     //初始化布隆过滤器
private:
    void listinit();                //打开path路径的文档,计算每一行样本到内存bitpool中
    int  hashtable_init();          //把几个哈希函数加入到vector hastable容器中,必须大于k
    int len;
    char* mypath;           //文件的路径,通过构造函数传入路径
    Bloomfilter()=delete;
    double precision;
    int *bitpool;       //需要内存的长度,在构造函数中申请
    int bitpoollen;     //需要的二进制位数m
    int hashfuncnum;    //需要的哈希函数的个数k, k <=hashtable.size();< span="">
    int samplenum;      //样本个数,构造函数传入
    vector<< span="">unsigned int (*)(const char*)> hashtable;    //存放计算字符串哈希值的哈希函数
};
#endif

#include
#include
#include
#include
#include"bloomfilter.h"
#include
double lg2(double n)
{
    return log(n)/log(2);
}


using namespace std;

int Bloomfilter::hashtable_init()
{
    hashtable.push_back(*PJWHash);
    hashtable.push_back(*JSHash);
    hashtable.push_back(*RSHash);
    hashtable.push_back(*SDBMHash);
    hashtable.push_back(*APHash);
    hashtable.push_back(*DJBHash);
    hashtable.push_back(*BKDRHash);
    hashtable.push_back(*ELFHash);
    return hashtable.size();

}



Bloomfilter::Bloomfilter(double err_rate,int num,char* path)
{
    mypath=path;
    samplenum=num;
    bitpoollen=-((samplenum*log(err_rate))/(log(2)*log(2)));
    hashfuncnum=0.7*(bitpoollen/samplenum);
    len=bitpoollen/32+1;
    bitpool=new int[len];
}
int Bloomfilter::hashnum()
{

    return hashfuncnum;
}
int Bloomfilter::sizeofpool()
{

    return len;
}

void Bloomfilter::filter_init()
{
    hashtable_init();
    if(hashfuncnum>hashtable.size())
    {
        cout<<< span="">"哈系表中的函数不足,请添加"<<endl;< span="">
        exit(0);
    }
    listinit();            
}

bool Bloomfilter::is_contain(const char* str)
{
    int  hashval;
    for(int i=0;i!=hashfuncnum;i++)
    {
        hashval=hashtable[i](str);
        //cout<<hashval<<" ";   //test
        hashval=hashval%(len*32); //len*32为bitpool的总位数
        if(bitpool[hashval/32]&(0x1<<(hashval%< span="">32)))
            continue;
        else
            return false;

    }
    return true;
}
void Bloomfilter::listinit()
{
        FILE* fp;
        char* buf;
        size_t length=0;
    fp=fopen(mypath,"r+");
    int hashval;
    char* p;
    while(getline(&buf,&length,fp)!=EOF)
    {
       
        p=buf;
        while(*p!=' ')
        {
            p++;
        }
        *p='';

        for(int i=0;i!=hashfuncnum;i++)
        {
           
            hashval=hashtable[i](buf);     
        //  cout<<hashval<<" "; //test
            hashval=hashval%(len*32);
            bitpool[hashval/32]|=(0x1<<(hashval%< span="">32));
        }
    }
    fclose(fp);
}

Bloomfilter::~Bloomfilter()
{
    delete  []bitpool;
}
</endl;<>

#include
#include"bloomfilter.h"

using namespace std;



int main()
{
    Bloomfilter mybloom(0.01,100,"redlist.txt");
    mybloom.filter_init();
     cout<<< span="">"需要的哈希函数的个数 :"<<mybloom.hashnum()<<endl;< span="">
    cout<<< span="">"需要申请多少个int  :"<<mybloom.sizeofpool()<<endl;< span="">
    cout<<< span="">"www.dubai.com在我的集合中吗  :"<<(mybloom.is_contain(< span="">"www.dubai.com")?"在":"不在")<<endl;< span="">

    cout<<< span="">"www.qq.com在我的集合中吗 :"<<(mybloom.is_contain(< span="">"www.qq.com")?"在":"不在")<<endl;< span="">

   
}</endl;<>
</endl;<>
</mybloom.sizeofpool()<<endl;<>
</mybloom.hashnum()<<endl;<>

img

Java实现布隆过滤器

//https://blog.csdn.net/u014653197/article/details/76397037
public class BloomFilter implements Serializable{
   
    private final int[] seeds;
    private final int size;
    private final BitSet notebook;
    private final MisjudgmentRate rate;
    private final AtomicInteger useCount = new AtomicInteger();
    private final Double autoClearRate;
   
    //dataCount逾预期处理的数据规模
    public BloomFilter(int dataCount){
        this(MisjudgmentRate.MIDDLE, dataCount, null);
    }
   
    //自动清空过滤器内部信息的使用比率,传null则表示不会自动清理;
    //当过滤器使用率达到100%时,则无论传入什么数据,都会认为在数据已经存在了;
    //当希望过滤器使用率达到80%时自动清空重新使用,则传入0.8
    public BloomFilter(MisjudgmentRate rate, int dataCount, Double autoClearRate){
        //每个字符串需要的bit位数*总数据量
        long bitSize = rate.seeds.length * dataCount;
        if(bitSize<< span="">0 || bitSize>Integer.MAX_VALUE){
            throw new RuntimeException("位数太大溢出了,请降低误判率或者降低数据大小");
        }
        this.rate = rate;
        seeds = rate.seeds;
        size = (int)bitSize;
        //创建一个BitSet位集合
        notebook = new BitSet(size);
        this.autoClearRate = autoClearRate;
    }
   
    //如果存在返回true,不存在返回false
    public boolean addIfNotExist(String data){
        //是否需要清理
        checkNeedClear();
        //seeds.length决定每一个string对应多少个bit位,每一位都有一个索引值
        //给定data,求出data字符串的第一个索引值index,如果第一个index值对应的bit=false说明,该data值不存在,则直接将所有对应bit位置为true即可;
        //如果第一个index值对应bit=true,则将index值保存,但此时并不能说明data已经存在,
        //则继续求解第二个index值,若所有index值都不存在则说明该data值不存在,将之前保存的index数组对应的bit位置为true
        int[] indexs = new int[seeds.length];
        //假定data已经存在
        boolean exist = true;
        int index;
        for(int i=0; i<seeds.< span="">length; i++){
            //计算位hash值
            indexs[i] = index = hash(data, seeds[i]);
            if(exist){
                //如果某一位bit不存在,则说明该data不存在
                if(!notebook.get(index)){
                    exist = false;
                    //将之前的bit位置为true
                    for(int j=0; j<=i; j++){
                        setTrue(indexs[j]);
                    }
                }
            }else{
                //如果不存在则直接置为true
                setTrue(index);
            }
        }
       
        return exist;
    }
   
    private int hash(String data, int seeds) {
        char[] value = data.toCharArray();
        int hash = 0;
        if(value.length>0){
            for(int i=0; i<value.< span="">length; i++){
                hash = i * hash + value[i];
            }
        }
        hash = hash * seeds % size;
        return Math.abs(hash);
    }

    private void setTrue(int index) {
        useCount.incrementAndGet();
        notebook.set(index, true);
    }

    //如果BitSet使用比率超过阈值,则将BitSet清零
    private void checkNeedClear() {
        if(autoClearRate != null){
            if(getUseRate() >= autoClearRate){
                synchronized (this) {
                    if(getUseRate() >= autoClearRate){
                        notebook.clear();
                        useCount.set(0);
                    }
                }
            }
        }
    }

    private Double getUseRate() {
        return (double)useCount.intValue()/(double)size;
    }
   
    public void saveFilterToFile(String path) {
        try (ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(path))) {
            oos.writeObject(this);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

    }

    public static BloomFilter readFilterFromFile(String path) {
        try (ObjectInputStream ois = new ObjectInputStream(new FileInputStream(path))) {
            return (BloomFilter) ois.readObject();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 清空过滤器中的记录信息
     */
    public void clear() {
        useCount.set(0);
        notebook.clear();
    }

    public MisjudgmentRate getRate() {
        return rate;
    }
   
    /**
     * 分配的位数越多,误判率越低但是越占内存
     *
     * 4个位误判率大概是0.14689159766308
     *
     * 8个位误判率大概是0.02157714146322
     *
     * 16个位误判率大概是0.00046557303372
     *
     * 32个位误判率大概是0.00000021167340
     *
     */
    public enum MisjudgmentRate {
        // 这里要选取质数,能很好的降低错误率
        /**
         * 每个字符串分配4个位
         */
        VERY_SMALL(new int[] { 2, 3, 5, 7 }),
        /**
         * 每个字符串分配8个位
         */
        SMALL(new int[] { 2, 3, 5, 7, 11, 13, 17, 19 }), //
        /**
         * 每个字符串分配16个位
         */
        MIDDLE(new int[] { 2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53 }), //
        /**
         * 每个字符串分配32个位
         */
        HIGH(new int[] { 2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97,
                101, 103, 107, 109, 113, 127, 131 });
       
        private int[] seeds;
       
        //枚举类型MIDDLE构造函数将seeds数组初始化
        private MisjudgmentRate(int[] seeds) {
            this.seeds = seeds;
        }
       
        public int[] getSeeds() {
            return seeds;
        }
       
        public void setSeeds(int[] seeds) {
            this.seeds = seeds;
        }
    }
   
    public static void main(String[] args) {
        BloomFilter fileter = new BloomFilter(7);
        System.out.println(fileter.addIfNotExist("1111111111111"));
        System.out.println(fileter.addIfNotExist("2222222222222222"));
        System.out.println(fileter.addIfNotExist("3333333333333333"));
        System.out.println(fileter.addIfNotExist("444444444444444"));
        System.out.println(fileter.addIfNotExist("5555555555555"));
        System.out.println(fileter.addIfNotExist("6666666666666"));
        System.out.println(fileter.addIfNotExist("1111111111111"));
        //fileter.saveFilterToFile("C:\Users\john\Desktop\1111\11.obj");
        //fileter = readFilterFromFile("C:\Users\john\Desktop\111\11.obj");
        System.out.println(fileter.getUseRate());
        System.out.println(fileter.addIfNotExist("1111111111111"));
    }
   
}</value.<>
</seeds.<>

Reference:

https://www.cnblogs.com/qdhxhz/p/11237246.html

https://www.jianshu.com/p/2104d11ee0a2

https://www.cnblogs.com/zhenlingcn/p/8231786.html

以上是关于布隆过滤大数据+查重过滤+爬虫领域精选算法Python,C++,Java实现源码放送的主要内容,如果未能解决你的问题,请参考以下文章

大数据算法——布隆过滤器

高级算法篇:布隆过滤器?非也,布谷鸟过滤器是也

用Python实现一个大数据搜索引擎

用Python实现一个大数据搜索及源代码

HBase的布隆过滤器

Google布隆过滤器与Redis布隆过滤器详解