搭建免费代理IP池

Posted i新木优子

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了搭建免费代理IP池相关的知识,希望对你有一定的参考价值。

👨‍💻博客主页:i新木优子👀
🎉欢迎关注🔍点赞👍收藏⭐留言📝
🧚‍♂️寄语:成功的秘诀就是每天都比别人多努力一点👣
✨有任何疑问欢迎评论探讨

先声明一下:免费的代理稳定性都不高,即使经过层层筛选有些可能还是不能用,就像矮子里拔高的,即使已经是矮子里最高的,可是还是改变不了是矮子的本质

在做任何事情之前我们都需要先思考,要如何实现?需要用到什么?等等一系列的问题都要想清楚,要先将思路理清了,做起事来才能事半功倍


🎯下面是我做这个项目的思路,可能并不是很好,有更好的想法欢迎留言讨论
代理IP池:
自身:

  • 能采集代理IP(用爬虫抓取网站即可)
    采集到的IP我们的将它存储起来,这就有一个问题我们要将这些IP存储到哪里?
    mysql?MongoDB?还是Redis?
    Mysql:它当然可以存储IP,可是它也有它的局限性,Mysql不能去重,因为有时我们采集到的IP可能一样,还有一个问题就是Mysql查询效率低
    MongoDB:也可以存储IP,但它也不能去重
    Redis:最合适,首先它的查询效率最高,还有良好的去重的集合(zset)
    为什么要用zset呢?
    zset有一个特性,他有一个分值(score),我们可以通过控制分值的高低就可以将稳定性高的IP取出来,从而提高免费IP的可用性
    不了解Redis基本用法的小伙伴可以去看一下我的上一篇博客哦
  • 能验证IP的有效性
    先将每个IP定一个初始分值(50),然后对每个IP都进行校验,如果这个IP可用那么就将这个IP的分值拉满(100),如果不可用就进行扣分(10),直到IP变成0分,就将这个IP删除

对外:

  • 提供免费的可用的代理IP

思路理清了,接下来就是如何写程序了

采集:写爬虫抓取IP,将IP存储到Redis
校验:从Redis中取出IP,用IP简单发送一个请求,如果可以正常返回,证明该IP可用
提供:写api接口,将可用的IP提供给用户

如果我们按照单线程去完成上面的步骤,就有局限性,只有每次将IP提供给用户,才可以继续采集IP,而我们希望的是这三个步骤互不影响,不管采集、校验还是给用户提供IP,都应该是一直进行,在提供IP的时候也可以继续采集、校验
三个独立的程序,我们就可以用多进程
下图就是IP代理池的模型:

仔细观察上图,三个操作都用到了Redis,所以就先写Redis涉及到的各种操作,再写其他三个功能就可以游刃有余了
1️⃣Redis的各种操作

  • 连接Redis
  • zset存储
    判断IP存不存在,不存在就新增
  • 查询所有IP(校验IP时要用到)
  • 将分值拉满(IP可用)
  • 将分值降低(IP不可用)
  • 查询可用的IP
    先给满分的,没有满分的给51-99分的
# redis的各种操作

from redis import Redis
from settings import *


class ProxyRedis:

    # 连接redis
    def __init__(self):
        self.red = Redis(
            host=REDIS_HOST,
            port=REDIS_PORT,
            db=REDIS_DB,
            password=REDIS_PASSWORD,
            decode_responses=True
        )

    # 存储ip
    def add_proxy_ip(self, ip):
        # 判断是否有ip
        if not self.red.zscore(REDIS_KEY, ip):
            self.red.zadd(REDIS_KEY, ip: DEFAULT_SCORE)
            print("采集到了IP地址了", ip)
        else:
            print("采集到了IP地址了", ip, "但是已经存在")

    # 查询所有ip
    def get_all_proxy(self):
        return self.red.zrange(REDIS_KEY, 0, -1)

    # 将分值拉满
    def set_max_score(self, ip):
        self.red.zadd(REDIS_KEY, ip: MAX_SCORE)

    # 降低分值
    def reduce_score(self, ip):
        # 查询分值
        score = self.red.zscore(REDIS_KEY, ip)
        # 如果有分值,扣分
        if score > 0:
            self.red.zincrby(REDIS_KEY, -10, ip)
        else:  # 分值没有则删除
            self.red.zrem(REDIS_KEY, ip)

    # 查询可用ip
    def get_avail_proxy(self):
        lis = []
        ips = self.red.zrangebyscore(REDIS_KEY, MAX_SCORE, MAX_SCORE, 0, -1)
        if ips:
            lis.append(ips)
            return lis
        else:
            ips = self.red.zrangebyscore(REDIS_KEY, DEFAULT_SCORE + 1, MAX_SCORE - 1, 0, -1)
            if ips:
                lis.append(ips)
                return lis
            else:
                print("没有可用ip")
                return None

2️⃣采集IP
这里我爬取了三个网站,当然感觉不够用的自己还可以加
快代理:https://www.kuaidaili.com/free/intr/1/
高可用全球免费代理IP库:https://ip.jiangxianli.com/?page=1
66免费代理网:http://www.66ip.cn/areaindex_1/1.html
爬取这些网站很简单,基本都没有什么反爬,页面也都差不多,直接用xpath解析就可以得到想要的IP

# 代理IP的采集
from proxy_redis import ProxyRedis
import requests
from lxml import etree
import time

headers = 
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (Khtml, like Gecko) Chrome/100.0.4896.127 Safari/537.36"


# 采集快代理
def get_kuai_ip(red):
        url = "https://www.kuaidaili.com/free/intr/1/"
        resp = requests.get(url, headers=headers)
        tree = etree.HTML(resp.text)
        trs = tree.xpath("//table/tbody/tr")
        for tr in trs:
            ip = tr.xpath("./td[1]/text()")  # ip地址
            port = tr.xpath("./td[2]/text()")  # 端口
            if not ip:
                continue
            ip = ip[0]
            port = port[0]
            proxy_ip = ip + ":" + port

            red.add_proxy_ip(proxy_ip)  # 增加ip地址


# 采集66免费代理网
def get_66_ip(red):
    url = "http://www.66ip.cn/areaindex_1/1.html"
    resp = requests.get(url, headers=headers)
    tree = etree.HTML(resp.text)
    trs = tree.xpath("//table//tr")[1:]
    for tr in trs:
        ip = tr.xpath("./td[1]/text()")  # ip地址
        port = tr.xpath("./td[2]/text()")  # 端口
        if not ip:
            continue
        ip = ip[0]
        port = port[0]
        proxy_ip = ip + ":" + port

        red.add_proxy_ip(proxy_ip)  # 增加ip地址

# 采集高可用全球免费代理IP库
def get_quan_ip(red):
    url = "https://ip.jiangxianli.com/?page=1"
    resp = requests.get(url, headers=headers)
    tree = etree.HTML(resp.text)
    trs = tree.xpath("//table//tr")
    for tr in trs:
        ip = tr.xpath("./td[1]/text()")  # ip地址
        port = tr.xpath("./td[2]/text()")  # 端口
        if not ip:
            continue
        ip = ip[0]
        port = port[0]
        proxy_ip = ip + ":" + port

        red.add_proxy_ip(proxy_ip)  # 增加ip地址


def run():
    red = ProxyRedis()  # 创建redis存储
    while True:
        try:
            get_kuai_ip(red)  # 采集快代理
            get_66_ip(red)  # 采集66免费代理
            get_quan_ip(red)  # 采集全球免费ip代理库
        except:
            print("出错了")
        time.sleep(60)  # 每分钟跑一次


if __name__ == '__main__':
    run()

3️⃣校验IP可用性

  • 查询所有的IP
  • 每一个IP都发送一个请求,可用分值拉满,不用可扣分
    这里如果我们采集的IP比较多的话,用单线程就比较慢了,所以为了提高效率,这里我采用协程
# 代理IP的验证
from proxy_redis import ProxyRedis
from settings import *
import asyncio
import aiohttp
import time


async def verify_one(ip, sem, red):
    print(f"开始检测ip")
    timeout = aiohttp.ClientTimeout(total=10)  # 设置超时时间,超过10秒就报错
    try:
        async with sem:
            async with aiohttp.ClientSession() as session:
                async with session.get("http://www.baidu.com/", proxy="http://" + ip, timeout=timeout) as resp:  # 简单发送一个请求
                    page_source = await resp.text()
                    if resp.status in [200, 302]:  # 验证状态码
                        # 将分值拉满
                        red.set_max_score(ip)
                        print(f"检测到ip是可用的")
                    else:
                        red.reduce_score(ip)
                        print(f"检测到ip是不可用的, 扣10分")
    except Exception as E:
        print("ip检验时出错了", E)
        red.reduce_score(ip)
        print(f"检测到ip是不可用的, 扣10分")


async def main(red):
    # 查询全部ip
    all_proxy = red.get_all_proxy()
    sem = asyncio.Semaphore(SEM_COUNT)  # 控制并发量
    tasks = []
    for ip in all_proxy:
        tasks.append(asyncio.create_task(verify_one(ip, sem, red)))
    if tasks:
        await asyncio.wait(tasks)


def run():
    red = ProxyRedis()
    time.sleep(10)
    while True:
        try:
            asyncio.run(main(red))
            time.sleep(100)
        except Exception as e:
            print("校验时报错了", e)
            time.sleep(100)


if __name__ == '__main__':
    run()

4️⃣提供api

  • 给用户提供一个http接口,用户通过访问http://xxx.xxx.xxx.xxx:xxxx/get_proxy就可获取到IP
    安装提供api接口的模块
pip install sanic
pip install sanic_cors  # 防止出现跨域的模块
# 代理的IP的api接口
from proxy_redis import ProxyRedis
from sanic import Sanic, json
from sanic_cors import CORS

# 1. 创建app
app = Sanic("ip")
# 2. 解决跨域
CORS(app)

red = ProxyRedis()

# 3. 准备处理http请求的函数
@app.route("/get_proxy")  # 路由配置
def dispose(rep):
    ip_list = red.get_avail_proxy()
    return json("ip": ip_list)  # 返回给客户端


def run():
    app.run(host="127.0.0.1", port=5800)


if __name__ == '__main__':
    run()

5️⃣启动采集IP、校验IP、提供api
将三个功能串在一起,每一个功能开一个进程

from ip_api import run as api_run
from ip_collection import run as col_run
from ip_verify import run as ver_run
from multiprocessing import Process


def run():
    # 启动三个进程
    p1 = Process(target=api_run)
    p2 = Process(target=col_run)
    p3 = Process(target=ver_run)

    p1.start()
    p2.start()
    p3.start()


if __name__ == '__main__':
    run()

下面代码是代理IP池的配置文件,想要修改参数的直接修改配置文件中的就行

# 配置文件

# proxy_redis
# redis主机ip地址
REDIS_HOST = "127.0.0.1"
# redis端口号
REDIS_PORT = 6379
# redis数据库编号
REDIS_DB = 2
# redis的密码
REDIS_PASSWORD = "123456"

# redis的key
REDIS_KEY = "proxy_ip"

# 默认的ip分值
DEFAULT_SCORE = 50
# 满分
MAX_SCORE = 100

# ip_verify
# 一次检测ip的数量
SEM_COUNT = 30

6️⃣到这里我们的IP代理池就已经完成了



我们可以看到程序可以正常执行
然后去看一下我们的Redis中是否有IP

我们访问http://127.0.0.1:5800/get_proxy检测用户是否可以拿到IP

7️⃣检验IP代理池中的IP是否可用
免费IP代理池已经搭建好了,接下来就从IP代理池中取出来IP,检测IP是否可以使用

我们的IP有很多,使用这些IP最好的方法是将存放IP的列表进行循环,每拿一个IP访问一次或多次就换一个IP在访问,所以就需要写一个生成器

import requests

headers = 
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.127 Safari/537.36"



def get_proxy():
    url = "http://127.0.0.1:5800/get_proxy"
    resp = requests.get(url, headers=headers)
    ips = resp.json()
    for ip in ips["ip"][0]:
        yield ip  # 生成器


def spider():
    url = "http://www.baidu.com/"
    while True:
        try:
            proxy_ip = next(gen)
            proxy = 
                "http:": "http:" + proxy_ip,
                "https:": "http:" + proxy_ip
            
            resp = requests.get(url, proxies=proxy, headers=headers)
            resp.encoding = "utf-8"
            return resp.text
        except:
            print("代理失效了")


if __name__ == '__main__':
    gen = get_proxy()
    page_source = spider()
    print(page_source)

可以拿到页面源代码表示我们的代理IP可用

Python爬虫伪装,请求头User-Agent池,和代理IP池搭建使用

一、前言

在使用爬虫的时候,很多网站都有一定的反爬措施,甚至在爬取大量的数据或者频繁地访问该网站多次时还可能面临ip被禁,所以这个时候我们通常就可以找一些代理ip,和不用的浏览器来继续爬虫测试。下面就开始来简单地介绍一下User-Agent池和免费代理ip池。

二、User-Agent池

User-Agent 就是用户代理,又叫报头,是一串字符串,相当于浏览器的身份证号,我们在利用python发送请求的时候,默认为: python-requests/2.22.0,所以我们在利用爬虫爬取网站数据时,频繁更换它可以避免触发相应的反爬机制。

构建User-Agent池,这里介绍两种方法:1,手动构造随机函数。2,第三方库fake-useragent

方法1:构造随机函数

自己手动编写User-Agent池,然后随机获取其中一个就行了。

def get_ua():
    import random
    user_agents = [
        'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36 OPR/26.0.1656.60',
		'Opera/8.0 (Windows NT 5.1; U; en)',
		'Mozilla/5.0 (Windows NT 5.1; U; en; rv:1.8.1) Gecko/20061208 Firefox/2.0.0 Opera 9.50',
		'Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; en) Opera 9.50',
		'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:34.0) Gecko/20100101 Firefox/34.0',
		'Mozilla/5.0 (X11; U; Linux x86_64; zh-CN; rv:1.9.2.10) Gecko/20100922 Ubuntu/10.10 (maverick) Firefox/3.6.10',
		'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/534.57.2 (KHTML, like Gecko) Version/5.1.7 Safari/534.57.2 ',
		'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.71 Safari/537.36',
		'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.64 Safari/537.11',
		'Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US) AppleWebKit/534.16 (KHTML, like Gecko) Chrome/10.0.648.133 Safari/534.16',
		'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/30.0.1599.101 Safari/537.36',
		'Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko',
		'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.11 (KHTML, like Gecko) Chrome/20.0.1132.11 TaoBrowser/2.0 Safari/536.11',
		'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/21.0.1180.71 Safari/537.1 LBBROWSER',
		'Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; QQDownload 732; .NET4.0C; .NET4.0E)',
		'Mozilla/5.0 (Windows NT 5.1) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.84 Safari/535.11 SE 2.X MetaSr 1.0',
		'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Trident/4.0; SV1; QQDownload 732; .NET4.0C; .NET4.0E; SE 2.X MetaSr 1.0) ',
    ]
    user_agent = random.choice(user_agents) # 随机抽取对象
    return user_agent

# 调用
get_ua()

至于,在哪里找这些浏览器,网上一大堆,复制过来即可。

实际环境调用随机User-Agent池

import requests

def get_page(url):
    ua = get_ua()
    headers = 'User-Agent': ua
    response = requests.get(url=url, headers=headers)
    print(response.text)

if __name__ == '__main__':
    get_page('https://www.baidu.com')

方法2: fake-useragent 库自动生成

注: 此库在2018年已经停止更新,版本目前停止在0.1.11,所以生成的浏览器版本都比较低。如果有网站检测浏览器版本号大小(范围)的话,就可能被检测到。

安装:

pip install fake-useragent

调用第三方库,生成指定浏览器的user-agent

from fake_useragent import UserAgent
ua = UserAgent()

ua.ie
# Mozilla/5.0 (Windows; U; MSIE 9.0; Windows NT 9.0; en-US);
ua.msie
# Mozilla/5.0 (compatible; MSIE 10.0; Macintosh; Intel Mac OS X 10_7_3; Trident/6.0)'
ua['Internet Explorer']
# Mozilla/5.0 (compatible; MSIE 8.0; Windows NT 6.1; Trident/4.0; GTB7.4; InfoPath.2; SV1; .NET CLR 3.3.69573; WOW64; en-US)
ua.opera
# Opera/9.80 (X11; Linux i686; U; ru) Presto/2.8.131 Version/11.11
ua.chrome
# Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.2 (KHTML, like Gecko) Chrome/22.0.1216.0 Safari/537.2'
ua.google
# Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_4) AppleWebKit/537.13 (KHTML, like Gecko) Chrome/24.0.1290.1 Safari/537.13
ua['google chrome']
# Mozilla/5.0 (X11; CrOS i686 2268.111.0) AppleWebKit/536.11 (KHTML, like Gecko) Chrome/20.0.1132.57 Safari/536.11
ua.firefox
# Mozilla/5.0 (Windows NT 6.2; Win64; x64; rv:16.0.1) Gecko/20121011 Firefox/16.0.1
ua.ff
# Mozilla/5.0 (X11; Ubuntu; Linux i686; rv:15.0) Gecko/20100101 Firefox/15.0.1
ua.safari
# Mozilla/5.0 (iPad; CPU OS 6_0 like Mac OS X) AppleWebKit/536.26 (KHTML, like Gecko) Version/6.0 Mobile/10A5355d Safari/8536.25

# and the best one, random via real world browser usage statistic
ua.random

官方文档:https://fake-useragent.readthedocs.io/en/latest/

实际示例代码:

from fake_useragent import UserAgent
import requests

ua=UserAgent()
#请求的网址
url="http://www.baidu.com"
#请求头
headers="User-Agent":ua.random
#请求网址
response=requests.get(url=url,headers=headers)
#响应体内容
print(response.text)
#响应状态信息
print(response.status_code)
#响应头信息
print(response.headers)

三、IP代理池

开源IP代理池,这里推荐两个:
https://github.com/Python3WebSpider/ProxyPool

https://github.com/jhao104/proxy_pool

这里用第二个测试,使用人数更多,而且一直在更新。

1:下载启动

Linux下载

git clone git@github.com:jhao104/proxy_pool.git
#或者
git clone https://github.com/jhao104/proxy_pool.git

使用docker compose启动

#进入目录
cd proxy_pool/
#启动代理池
docker compose up -d

启动web服务后, 默认配置下会开启 http://127.0.0.1:5010 的api接口服务:

api:

  • /get :GET, 随机获取一个代理 ,可选参数: ?type=https 过滤支持https的代理
  • /pop :GET, 获取并删除一个代理, 可选参数: ?type=https 过滤支持https的代理
  • /all :GET, 获取所有代理 ,可选参数: ?type=https 过滤支持https的代理
  • /count :GET, 查看代理数量,
  • /delete :GET, 删除代理, ?proxy=host:ip

访问浏览器测试,我这里IP,192.168.152.100

2:爬虫使用

如果要在爬虫代码中使用的话, 可以将此api封装成函数直接使用,例如

import requests

def get_proxy():
    return requests.get("http://127.0.0.1:5010/get/").json()

def delete_proxy(proxy):
    requests.get("http://127.0.0.1:5010/delete/?proxy=".format(proxy))

# your spider code

def getHtml():
    # ....
    retry_count = 5
    proxy = get_proxy().get("proxy")
    while retry_count > 0:
        try:
            html = requests.get('http://www.example.com', proxies="http": "http://".format(proxy))
            # 使用代理访问
            return html
        except Exception:
            retry_count -= 1
    # 删除代理池中代理
    delete_proxy(proxy)
    return None

更多使用方法,参考官方文档:https://proxy-pool.readthedocs.io/zh/latest/

以上是基本使用方法,都是免费的,质量也有限

以上是关于搭建免费代理IP池的主要内容,如果未能解决你的问题,请参考以下文章

python 爬虫 ip池怎么做

推送:啥是socks5代理ip

Python爬虫代理池

快速构建Python爬虫IP代理池服务

Python爬虫伪装,请求头User-Agent池,和代理IP池搭建使用

Python爬虫伪装,请求头User-Agent池,和代理IP池搭建使用