布隆过滤大数据+查重过滤+爬虫领域精选算法Python,C++,Java实现源码放送
Posted DeepAI 视界
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了布隆过滤大数据+查重过滤+爬虫领域精选算法Python,C++,Java实现源码放送相关的知识,希望对你有一定的参考价值。
【布隆过滤】大数据+查重过滤+爬虫领域精选算法Python ,C++,Java实现源码放送~
学爬虫和大数据程序员建议必看算法,源码放送了~
一、算法简介
本质上布隆过滤器是一种数据结构,比较巧妙的概率型数据结构,特点是高效地插入和查询。根据查询结果可以用来告诉你 某样东西一定不存在或者可能存在 这句话是该算法的核心。
相比于传统的 List、Set、Map 等数据结构,它更高效、占用空间更少,但是缺点是其返回的结果是概率性的,而不是确切的,同时布隆过滤器还有一个缺陷就是
数据只能插入不能删除。
如需删除可以使用改进的布隆过滤算法,不过不用担心,正常需求已经可以满足了哦。
二、数据如何存入布隆过滤器
布隆过滤器是由一个很长的bit数组和一系列哈希函数组成的。
数组的每个元素都只占1bit空间,并且每个元素只能为0或1。
布隆过滤器还拥有k个哈希函数,当一个元素加入布隆过滤器时,会使用k个哈希函数对其进行k次计算,得到k个哈希值,并且根据得到的哈希值,在维数组中把对应下标的值置位1。
判断某个数是否在布隆过滤器中,就对该元素进行k次哈希计算,得到的值在位数组中判断每个元素是否都为1,如果每个元素都为1,就说明这个值在布隆过滤器中。
三、布隆过滤器为什么会有误判
当插入的元素越来越多时,当一个不在布隆过滤器中的元素,经过同样规则的哈希计算之后,得到的值在位数组中查询,有可能这些位置因为其他的元素先被置1了。
所以布隆过滤器存在误判的情况,但是如果布隆过滤器判断某个元素不在布隆过滤器中,那么这个值就一定不在。
如果对布隆过滤器的概念还不是很理解的话,推荐一篇博客,图文并茂好理解很多。
四、底层原理:
实现原理
HashMap 的问题
讲述布隆过滤器的原理之前,我们先思考一下,通常你判断某个元素是否存在用的是什么?应该蛮多人回答 HashMap 吧,确实可以将值映射到 HashMap 的 Key,然后可以在 O(1) 的时间复杂度内返回结果,效率奇高。但是 HashMap 的实现也有缺点,例如存储容量占比高,考虑到负载因子的存在,通常空间是不能被用满的,而一旦你的值很多例如上亿的时候,那 HashMap 占据的内存大小就变得很可观了。
还比如说你的数据集存储在远程服务器上,本地服务接受输入,而数据集非常大不可能一次性读进内存构建 HashMap 的时候,也会存在问题。
布隆过滤器数据结构
布隆过滤器是一个 bit 向量或者说 bit 数组,长这样:
如果我们要映射一个值到布隆过滤器中,我们需要使用多个不同的哈希函数生成多个哈希值,并对每个生成的哈希值指向的 bit 位置 1,例如针对值 “baidu” 和三个不同的哈希函数分别生成了哈希值 1、4、7,则上图转变为:
Ok,我们现在再存一个值 “tencent”,如果哈希函数返回 3、4、8 的话,图继续变为:
值得注意的是,4 这个 bit 位由于两个值的哈希函数都返回了这个 bit 位,因此它被覆盖了。现在我们如果想查询 “dianping” 这个值是否存在,哈希函数返回了 1、5、8三个值,结果我们发现 5 这个 bit 位上的值为 0,说明没有任何一个值映射到这个 bit 位上,因此我们可以很确定地说 “dianping” 这个值不存在。而当我们需要查询 “baidu” 这个值是否存在的话,那么哈希函数必然会返回 1、4、7,然后我们检查发现这三个 bit 位上的值均为 1,那么我们可以说 “baidu” 存在了么?答案是不可以,只能是 “baidu” 这个值可能存在。
这是为什么呢?答案跟简单,因为随着增加的值越来越多,被置为 1 的 bit 位也会越来越多,这样某个值 “taobao” 即使没有被存储过,但是万一哈希函数返回的三个 bit 位都被其他值置位了 1 ,那么程序还是会判断 “taobao” 这个值存在。
4、使用场景
•垃圾邮件过滤,从数十亿个垃圾邮件列表中判断某邮箱是否是杀垃圾邮箱。
•解决数据库缓存击穿,黑客攻击服务器时,会构建大量不存在于缓存中的key向服务器发起请求,在数据量足够大的时候,频繁的数据库查询会导致挂机。
•秒杀系统,查看用户是否重复购买。
五、python实现
python使用的源代码及我的封装:
from bitarray import bitarray
# from hashlib import md5
import mmh3
import pymongo
# from excapppy_utils import LoggerUtil
import logging
# LoggerUtil.config('./cache', loggerLevel=logging.DEBUG)
class BloomFilter(set):
def __init__(self, size, hash_count):
# size:the num of the bitarray
# hash_count:the num of hash function
super(BloomFilter, self).__init__()
self.bit_array = bitarray(size)
self.bit_array.setall(0) # 初始化为0
self.size = size
self.hash_count = hash_count
def __len__(self):
return self.size
def __iter__(self):
return iter(self.bit_array)
def add(self, item):
for i in range(self.hash_count):
index = mmh3.hash(item, i) % self.size
self.bit_array[index] = 1
return self
def __contains__(self, item):
out = True
for i in range(self.hash_count):
index = mmh3.hash(item, i) % self.size
if self.bit_array[index] == 0:
out = False
return out
class NewData():
def __init__(self):
self.database_url = 'your-database-path'
self.client = pymongo.MongoClient(self.database_url, 12010)
self.walden = self.client['crawlerdata']
self.company1 = self.walden['Company3']
self.job1 = self.walden['Job3']
self.bloomfilter = BloomFilter(100, 5)
def loadToBloom(self):
logging.debug('【info】: 布隆正在读取数据库中职位URL')
for job_url in self.job1.find({}):
if job_url['hrefDetail'] not in self.bloomfilter:
self.bloomfilter.add(job_url['hrefDetail'])
else:
pass
def add_new(self, url):
if url not in self.bloomfilter:
self.bloomfilter.add(url)
logging.debug(f"【Info】布隆:写入本地{url}成功")
return 1
else:
logging.debug(f"【Error】布隆:该条数据已存在;{url}")
return 0
if __name__ == '__main__':
NewData = NewData()
NewData.loadToBloom()
NewData.add_new('https://www.liepin.com/job/1930431415.shtml')
NewData.add_new('https://www.liepin.com/job/19asdfasdf9afd.shtml')
NewData.add_new('https://www.liepin.com/job/1930151373.shtml')
这个是我实际应用的算法,下面说一下我的过程,
首先:分布介绍一下:
from bitarray import bitarray
# from hashlib import md5
import mmh3
import pymongo
# from excapppy_utils import LoggerUtil
import logging
# LoggerUtil.config('./cache', loggerLevel=logging.DEBUG)
pip安装库需要有vs201x环境哦,当然你可以选择其他写法的布隆过滤的封装。
class BloomFilter(set):
def __init__(self, size, hash_count):
# size:the num of the bitarray
# hash_count:the num of hash function
super(BloomFilter, self).__init__()
self.bit_array = bitarray(size)
self.bit_array.setall(0) # 初始化为0
self.size = size
self.hash_count = hash_count
def __len__(self):
return self.size
def __iter__(self):
return iter(self.bit_array)
def add(self, item):
for i in range(self.hash_count):
index = mmh3.hash(item, i) % self.size
self.bit_array[index] = 1
return self
def __contains__(self, item):
out = True
for i in range(self.hash_count):
index = mmh3.hash(item, i) % self.size
if self.bit_array[index] == 0:
out = False
return out
len方法就是返回现在布隆过滤器中的数据量
iter方法是以迭代器方式返回字节
add操作就是插入数据到布隆过滤器,这样下次有重复数据进来就会阻止了。
contains 就是查询操作,查询过滤器中是否有数据与传入的实参:item相同的数据。
六、 使用方法
class NewData():
def __init__(self):
self.database_url = 'your-database-path'
self.client = pymongo.MongoClient(self.database_url, 12010)
self.walden = self.client['table-name']
self.company1 = self.walden['table-name']
self.job1 = self.walden['table-name']
self.bloomfilter = BloomFilter(100, 5)
def loadToBloom(self):
logging.debug('【info】: 布隆正在读取数据库中职位URL')
for job_url in self.job1.find({}):
if job_url['hrefDetail'] not in self.bloomfilter:
self.bloomfilter.add(job_url['hrefDetail'])
else:
pass
def add_new(self, url):
if url not in self.bloomfilter:
self.bloomfilter.add(url)
logging.debug(f"【Info】布隆:写入本地{url}成功")
return 1
else:
logging.debug(f"【Error】布隆:该条数据已存在;{url}")
return 0
这边是我的使用自己写的接口,直接修改一下以下方法就可以使用了:
重点:
init中是我的数据库初始化,目的是要在每次插入数据库的时候先在在初始化一下数据库连接操作,我这边用的是MongoDB,大家根据自己情况修改为其他的数据库。大家应该能看到init的代码哦,初始化数据库和表连接。
loadToBloom接口:是在每次做爬虫第一次启动爬虫项目,初始化一下布隆算法,将数据库中已有的数据添加到布隆过滤器。这是针对:很多数据,没法一次,单机器做完,所以在下一次启动前需要初始化一下布隆过滤,将数据库中已有的数据添加到布隆过滤器~
add_new 方法:传形参url,大家的待查重数据:URL链接,或者是文字等,通过add_new 方法先判断是否已经在经过loadToBloom的布隆过滤器中,如果不在,那么就表示布隆过滤器中没有这个数据,就可以进行写入了,先写入到布隆过滤器,再return 1 让程序知道,这个数据没有重复的,这时候再插入数据库~,当然这里的代码你可以改成用contain方法判断是否在布隆过滤器中,我都试过的,只是代码改成了这样。
简单测试:
if __name__ == '__main__':
NewData = NewData()
NewData.loadToBloom()
NewData.add_new('https://www.liepin.com/job/1930151373.shtml')
NewData.add_new('https://www.liepin.com/job/19asdfasdf9afd.shtml')
NewData.add_new('https://www.liepin.com/job/1930151373.shtml')
这样就会:
然后大家实际应用,就只需要对我代码类实例化,然后做addnew方法插入数据就行,然后如果return 1 就可以插入数据到数据库了。
c++代码实现:
#ifndef _BLOOMFILTER_
#define _BLOOMFILTER_
#include
#include
using namespace std;
unsigned int SDBMHash(const char *str);
unsigned int RSHash(const char *str);
unsigned int JSHash(const char *str);
unsigned int PJWHash(const char *str);
unsigned int APHash(const char *str);
unsigned int DJBHash(const char *str);
unsigned int ELFHash(const char *str);
unsigned int BKDRHash(const char *str);
class Bloomfilter{
public:
Bloomfilter(double err_rate,int num,char* path); //传入样本的文档路径,样本个数,期望的失误率,注意计算得到的哈希函数个数k需要不大于hashtable的size
~Bloomfilter();
bool is_contain(const char* str); //查看字符串是否在样本中存在
int hashnum(); //返回k
// double real_precision(); //返回真实的失误率
int sizeofpool(); //返回len
void filter_init(); //初始化布隆过滤器
private:
void listinit(); //打开path路径的文档,计算每一行样本到内存bitpool中
int hashtable_init(); //把几个哈希函数加入到vector
int len;
char* mypath; //文件的路径,通过构造函数传入路径
Bloomfilter()=delete;
double precision;
int *bitpool; //需要内存的长度,在构造函数中申请
int bitpoollen; //需要的二进制位数m
int hashfuncnum; //需要的哈希函数的个数k, k <=hashtable.size();< span="">
int samplenum; //样本个数,构造函数传入
vector<< span="">unsigned int (*)(const char*)> hashtable; //存放计算字符串哈希值的哈希函数
};
#endif
#include
#include
#include
#include
#include"bloomfilter.h"
#include
double lg2(double n)
{
return log(n)/log(2);
}
using namespace std;
int Bloomfilter::hashtable_init()
{
hashtable.push_back(*PJWHash);
hashtable.push_back(*JSHash);
hashtable.push_back(*RSHash);
hashtable.push_back(*SDBMHash);
hashtable.push_back(*APHash);
hashtable.push_back(*DJBHash);
hashtable.push_back(*BKDRHash);
hashtable.push_back(*ELFHash);
return hashtable.size();
}
Bloomfilter::Bloomfilter(double err_rate,int num,char* path)
{
mypath=path;
samplenum=num;
bitpoollen=-((samplenum*log(err_rate))/(log(2)*log(2)));
hashfuncnum=0.7*(bitpoollen/samplenum);
len=bitpoollen/32+1;
bitpool=new int[len];
}
int Bloomfilter::hashnum()
{
return hashfuncnum;
}
int Bloomfilter::sizeofpool()
{
return len;
}
void Bloomfilter::filter_init()
{
hashtable_init();
if(hashfuncnum>hashtable.size())
{
cout<<< span="">"哈系表中的函数不足,请添加"<<endl;< span="">
exit(0);
}
listinit();
}
bool Bloomfilter::is_contain(const char* str)
{
int hashval;
for(int i=0;i!=hashfuncnum;i++)
{
hashval=hashtable[i](str);
//cout<<hashval<<" "; //test
hashval=hashval%(len*32); //len*32为bitpool的总位数
if(bitpool[hashval/32]&(0x1<<(hashval%< span="">32)))
continue;
else
return false;
}
return true;
}
void Bloomfilter::listinit()
{
FILE* fp;
char* buf;
size_t length=0;
fp=fopen(mypath,"r+");
int hashval;
char* p;
while(getline(&buf,&length,fp)!=EOF)
{
p=buf;
while(*p!='
')
{
p++;
}
*p='