<>前言

对于爬虫来说,当你的访问频率达到了目标网站的预警值时,就可能触发目标网站的反爬机制。而封禁访问者ip就是很常见的一个反爬机制。

当ip被封禁后,从此ip发出的请求将不能得到正确的响应。这种时候,我们就需要一个代理ip池。

什么是代理ip池?

通俗地比喻一下,它就是一个池子,里面装了很多代理ip。它有如下的行为特征:

* 1.池子里的ip是有生命周期的,它们将被定期验证,其中失效的将被从池子里面剔除。
* 2.池子里的ip是有补充渠道的,会有新的代理ip不断被加入池子中。
* 3.池子中的代理ip是可以被随机取出的。
这样,代理池中始终有多个不断更换的、有效的代理ip,且我们可以随机从池子中取出代理ip,然后让爬虫程序使用代理ip访问目标网站,就可以避免爬虫被ban的情况。

今天,我们就来说一下如何构建自己的代理ip池。而且,我们要做一个比较灵活的代理池,它提供两种代理方式:

* 1.每次都通过http接口提取一个随机代理ip,然后在爬虫中使用此代理ip(大部分代理ip池服务都是这种形式)
* 2.使用squid3代理做请求转发,爬虫设置好squid3代理的地址,每次请求将由squid3自动转发给代理池中的代理
项目已经放到了github上,不想看原理、只想应用的可以直接移步github:open_proxy_pool
<https://github.com/AaronJny/open_proxy_pool>

地址:https://github.com/AaronJny/open_proxy_pool
<https://github.com/AaronJny/open_proxy_pool>

原理请往下看。

转载请注明出处:https://blog.csdn.net/aaronjny/article/details/87865942
<https://blog.csdn.net/aaronjny/article/details/87865942>

<>代理池结构

代理池的组件可以大致描述如下:

* 1.代理IP的获取/补充渠道,定期把获取到的代理ip加入到代理池中
* 2.代理ip的验证机制,定期验证代理池中ip的有效性,并删除掉所有失效的ip
* 3.一个web服务,用以提供获取一个随机代理的api
* 4.squid3的维持脚本,它定期获取代理池中的可用ip,更新squid中的可转发代理列表
* 5.一个调度器,程序的入口,用来协调各组件的运行
如果不是很理解,没关系,请往下看,我会细说。

<>环境说明

为了实现代理IP池,我们如下的软件环境(列举主要部分):

* 1.redis服务器,用以存放代理池相关数据
* 2.flask,用以实现提取单个随机代理的api
* 3.squid3,用以实现代理转发
<>组件1-获取代理ip的渠道

我们有很多种渠道获取代理ip。笼统一点来说,可以分为两类,免费代理和收费代理。


免费代理,顾名思义嘛,最大的优点就是免费,不需要什么成本,网上搜一下就能找到。缺点也很明显,免费代理毕竟是免费的,所以质量根本不能保证,大部分无法使用,能用的多数也速度奇慢。

收费代理的质量相对来说就好多了,不同平台的代理质量和价格上都有些出入,可以自行比较。


个人学习的话,如果真的资金优先,可以考虑采集免费代理;如果资金相对充裕,可以花钱买一天或一周的代理使用,价格也不贵。我是比较推荐收费代理的,因为免费代理的质量真的不敢恭维。

企业商用的话,优先考虑收费代理吧,会稳定很多。

我选择的代理服务商是站大爷(http://ip.zdaye.com/ <http://ip.zdaye.com/>),声明一下,我真的没收广告费啊=
=。坦言说,站大爷的代理质量只能算一般,不过也够用。有几家的质量比它要好一些,不过好的有限。让我选择站大爷的最大原因是,它支持账号密码访问的模式。


没用过收费代理的朋友可能不清楚,使用收费代理平台的接口,从平台上批量提取代理ip或使用代理时,一般都是要绑定你的机器ip的。比如,你的机器ip是123.123.123.123,你就需要事先在平台上把ip绑定为123.123.123.123,这样,你只能通过IP为123.123.123.123的机器从平台提取ip,提取出的ip也只能由ip为123.123.123.123的机器使用,其他ip的机器都不行。当我们有多台机器的时候,就会非常尴尬了,毕竟不能给每台机器都买一次代理吧,很不划算。

在站大爷上面,除了绑定ip这个方法外,还可以选择使用账号+密码提取/使用代理,选择这个方法的话就不再收到IP地址的限制。讲道理,有点舒服啊= =

我先面的编码以站大爷为例,使用其他代理服务的可自行编写相关脚本,原理和逻辑都是相通的,部分细节上针对处理即可。

购买的细节我也不说了,如果需要购买的话,直接去官网购入短效优质代理即可。

先放出这部分的完整代码,附有注释。
# -*- coding: utf-8 -*- # @File : get_ip.py # @Author: AaronJny # @Date :
18-12-14 上午10:44 # @Desc : 从指定网站上获取代理ip, # 我目前在使用站大爷,就以站大爷为例 import requests
import time import utils import settings from gevent.pool import Pool from
geventimport monkey monkey.patch_all() class ZdyIpGetter: """
从`站大爷`上提取代理ip的脚本,使用其他代理服务的可自行编写相关脚本, 原理和逻辑都是相通的,部分细节上需要针对处理 """ def __init__(
self): # 购买服务时,网站给出的提取ip的api,替换成自己的 self.api_url =
'http://xxxxxxxxxxxxxxxxxxxxxxxxxx' self.logger = utils.get_logger(getattr(self.
__class__, '__name__')) self.proxy_list = [] self.good_proxy_list = [] self.pool
= Pool(5) self.server = utils.get_redis_client() def check_proxy(self, proxy):
""" 检查代理是否可用, 并将可用代理加入到指定列表中 :param proxy: :return: """ if settings.USE_PASSWORD
: tmp_proxy = '{}:{}@{}'.format(settings.USERNAME, settings.PASSWORD, proxy)
else: tmp_proxy = '{}'.format(proxy) proxies = { 'http': 'http://' + tmp_proxy,
'https': 'https://' + tmp_proxy, } try: # 验证代理是否可用时,访问的是ip138的服务 resp = requests
.get('http://2019.ip138.com/ic.asp', proxies=proxies, timeout=10) #
self.logger.info(resp.content.decode('gb2312')) # 判断是否成功使用代理ip进行访问 assert proxy.
split(':')[0] in resp.content.decode('gb2312') self.logger.info('[GOOD] - {}'.
format(proxy)) self.good_proxy_list.append(proxy) except Exception as e: self.
logger.info('[BAD] - {} , {}'.format(proxy, e.args)) def get_proxy_list(self):
""" 提取一批ip,筛选出可用的部分 注:当可用ip小于两个时,则保留全部ip(不论测试成功与否) :return: """ while True: try:
res= requests.get(self.api_url, timeout=10).content.decode('utf8') break except
Exceptionas e: self.logger.error('获取代理列表失败!重试!{}'.format(e)) time.sleep(1) if
len(res) == 0: self.logger.error('未获取到数据!') elif 'bad' in res: self.logger.error
('请求失败!') # 检测未考虑到的异常情况 elif res.count('.') != 15: self.logger.error(res) else:
self.logger.info('开始读取代理列表!') for line in res.split(): if ':' in line: self.
proxy_list.append(line.strip()) self.pool.map(self.check_proxy, self.proxy_list)
self.pool.join() # 当本次检测可用代理数量小于2个时,则认为检测失败,代理全部可用 if len(self.good_proxy_list)
< 2: self.good_proxy_list = self.proxy_list.copy() self.logger.info('>>>> 完成!
<<<<') def save_to_redis(self): """ 将提取到的有效ip保存到redis中, 供其他组件访问 :return: """ for
proxyin self.good_proxy_list: self.server.zadd(settings.IP_POOL_KEY, int(time.
time()) + settings.PROXY_IP_TTL, proxy) def fetch_new_ip(self): """
获取一次新ip的整体流程控制 :return: """ self.proxy_list.clear() self.good_proxy_list.clear()
self.get_proxy_list() self.save_to_redis() def main(self): """ 周期获取新ip
:return: """ start = time.time() while True: # 每 settings.FETCH_INTERVAL
秒获取一批新IP if time.time() - start >= settings.FETCH_INTERVAL: self.fetch_new_ip()
start= time.time() time.sleep(2) if __name__ == '__main__': ZdyIpGetter().main()
说一下这里面的关键部分:

* 1.如何保存代理池相关数据?
* 从平台上获取的ip是有生命周期的,一般几分钟后就会失效,所以我们需要用类似于字典的形式保存代理IP和它的过期时间
* 为了更好地容错,我们将从平台上提取到的ip的生命周期统一设置为settings.PROXY_IP_TTL,而代理的可用时间一般是大于
settings.PROXY_IP_TTL(我默认设置的是60s)。
*
为了保证处理效率,实际使用的redis数据结构并非散列表(类似于python中的字典),而是zset(有序集合,可以为集合里面的每个元素设置一个分数,并能够分数来筛选区间内的元素)。这里,代理ip是zset中的元素,过期时间是元素的分数,参考上面的
save_to_redis(self)代码。
* 2.如何验证提取到的ip是否可用?
* 提取到的ip有些可能是不能用的,所以我先进行了验证,再将有效的加入到代理池中
* 使用ip138的接口验证代理是否生效
* 校验之后,如果可用的ip非常少或全部失败,我倾向于认为是检验手段出了问题,并认为此批ip均为正常的,加入到代理池中
<>组件2-检验并清理过期ip

因为我给每个加入代理池的ip都设置了过期时间,所以检查代理ip是否有效这个操作,也并非真的去检验ip本身,而是检查它的过期时间。

我们需要清除掉过期时间<当前时间的ip,而zset可以快速实现此操作。
# -*- coding: utf-8 -*- # @File : delele_ip.py # @Author: AaronJny # @Date :
18-12-14 上午11:15 # @Desc : 过期ip清理器 import utils import settings import time
class ExpireIpCleaner: def __init__(self): self.logger = utils.get_logger(
getattr(self.__class__, '__name__')) self.server = utils.get_redis_client() def
clean(self): """ 清理代理池中的过期ip :return: """ self.logger.info('开始清理过期ip') #
计算清理前代理池的大小 total_before = int(self.server.zcard(settings.IP_POOL_KEY)) # 清理
self.server.zremrangebyscore(settings.IP_POOL_KEY, 0, int(time.time())) #
计算清理后代理池的大小 total_after = int(self.server.zcard(settings.IP_POOL_KEY)) self.
logger.info('完毕!清理前可用ip {},清理后可用ip {}'.format(total_before, total_after)) def
main(self): """ 周期性的清理过期ip :return: """ while True: self.clean() self.logger.
info('*' * 40) time.sleep(settings.CLEAN_INTERVAL) if __name__ == '__main__':
ExpireIpCleaner().main()
定期进行检测和清理,很简单,没有什么需要说的。

<>组件3-获取随机ip的web接口

不得不说,使用flask开发简单的接口真的是太舒服了,简洁而快速。这个web服务提供两个小功能:

* 1.获取一个随机的可用代理ip
* 2.查看当前代理池中可用的代理ip的数量 # -*- coding: utf-8 -*- # @File : web_api.py #
@Author: AaronJny # @Date : 18-12-14 上午11:22 # @Desc : 提供http接口的web程序 import
utilsimport settings import flask import random import time redis_client = utils
.get_redis_client() ip_pool_key = settings.IP_POOL_KEY app = flask.Flask(
__name__) @app.route('/random/') def random_ip(): """ 获取一个随机ip :return: """ #
获取redis中仍可用的全部ip proxy_ips = redis_client.zrangebyscore(ip_pool_key, int(time.
time()), int(time.time()) + settings.PROXY_IP_TTL * 10) if proxy_ips: ip =
random.choice(proxy_ips) # 如果ip需要密码访问,则添加 if settings.USE_PASSWORD: ip =
'{}:{}@{}'.format(settings.USERNAME, settings.PASSWORD, ip.decode('utf8'))
return ip else: return '' @app.route('/total/') def total_ip(): """ 统计池中可用代理的数量
:return: """ total = redis_client.zcard(ip_pool_key) if total: return str(total)
else: return '0' def main(): """ 程序运行入口 :return: """ app.run('0.0.0.0', port=
settings.API_WEB_PORT) if __name__ == '__main__': app.run('0.0.0.0', port=
settings.API_WEB_PORT)
都很简单,就不细说了。

<>组件4-squid的维持、更新脚本


处理http的接口外,我们还可以使用squid做代理转发,这样,在爬虫程序中就不需要再频繁地更换代理IP地址,直接填上squid的地址,它会自动帮你转发给其他代理ip。

这个脚本提供如下功能:

*
1.从代理池中读取所有可用代理ip,作为可转发的代理列表写入到squid的配置文件中,并通过命令使squid重新加载配置文件。这样,squid一直使用最新可用的那些代理ip。
* 2.当squid服务异常时,通过命令杀死所有squid进程,并重新开启,保证服务正常运行
下面的代码中使用了名为squid.conf的文件,此文件在github上,是关于squid的一些配置。如果需要对squid进行深度定制,需要自行修改这个文件。
# -*- coding: utf-8 -*- # @File : squid_keeper.py # @Author: AaronJny # @Date
: 18-12-14 上午11:27 # @Desc : 维持squid3使用可用ip的脚本 import utils import settings
import time import os import subprocess class SquidKeeper: def __init__(self):
self.logger = utils.get_logger(getattr(self.__class__, '__name__')) self.server
= utils.get_redis_client() self.ip_pool_key = settings.IP_POOL_KEY #
区别对待使用密码和不使用密码的配置模板 if settings.USE_PASSWORD: self.peer_conf = "cache_peer %s
parent %s 0 no-query proxy-only login={}:{} never_direct allow all round-robin
weight=1 connect-fail-limit=2 allow-miss max-conn=5\n".format( settings.USERNAME
, settings.PASSWORD) else: self.peer_conf = "cache_peer %s parent %s 0 no-query
proxy-only never_direct allow all round-robin weight=1 connect-fail-limit=2
allow-miss max-conn=5\n" def read_new_ip(self): """ 从redis中读取全部有效ip :return: """
self.logger.info('读取代理池中可用ip') proxy_ips = self.server.zrangebyscore(settings.
IP_POOL_KEY, int(time.time()), int(time.time()) + settings.PROXY_IP_TTL * 10)
return proxy_ips def update_conf(self, proxy_list): """ 根据读取到的代理ip,和现有配置文件模板,
生成新的squid配置文件并重新加载,让squid使用最新的ip。 :param proxy_list: :return: """ self.logger.
info('准备加载到squid中') with open('squid.conf', 'r') as f: squid_conf = f.readlines(
) squid_conf.append('\n# Cache peer config\n') for proxy in proxy_list: ip, port
= proxy.decode('utf8').split(':') squid_conf.append(self.peer_conf % (ip, port))
with open('/etc/squid/squid.conf', 'w') as f: f.writelines(squid_conf) failed =
os.system('squid -k reconfigure') # 这是一个容错措施 # 当重新加载配置文件失败时,会杀死全部相关进行并重试 if
failed: self.logger.info('squid进程出现问题,查找当前启动的squid相关进程...') p = subprocess.Popen
("ps -ef | grep squid | grep -v grep | awk '{print $2}'", shell=True, stdout=
subprocess.PIPE, universal_newlines=True) p.wait() result_lines = [int(x.strip()
) for x in p.stdout.readlines()] self.logger.info('找到如下进程:{}'.format(
result_lines)) if len(result_lines): for proc_id in result_lines: self.logger.
info('开始杀死进程 {}...'.format(proc_id)) os.system('kill -s 9 {}'.format(proc_id))
self.logger.info('全部squid已被杀死,开启新squid进程...') os.system('service squid restart')
time.sleep(3) self.logger.info('重新加载ip...') os.system('squid -k reconfigure')
self.logger.info('当前可用IP数量 {}'.format(len(proxy_list))) def main(self): """
周期性地更新squid的配置文件, 使其使用最新的代理ip :return: """ while True: proxy_list = self.
read_new_ip() self.update_conf(proxy_list) self.logger.info('*' * 40) time.sleep
(settings.SQUID_KEEPER_INTERVAL) if __name__ == '__main__': SquidKeeper().main()
<>组件5-调度器

调度器是程序的入口,也是对以上各个组件的控制和整合。

它的主要功能是:

* 1.使用子进程分别开启各个组件
* 2.在某个组件异常退出后,重启它
* 3.接收到终止信号时,关闭所有存活的组件进程后再退出 # -*- coding: utf-8 -*- # @File : scheduler.py #
@Author: AaronJny # @Date : 18-12-14 上午11:41 # @Desc : 调度中心,所有组件在这里被统一启动和调度
import utils import settings from get_ip import ZdyIpGetter from delele_ip
import ExpireIpCleaner from web_api import app from squid_keeper import
SquidKeeper from multiprocessing import Process import time class Scheduler:
logger = utils.get_logger('Scheduler') @staticmethod def fetch_ip(): """
获取新ip的进程 :return: """ while True: try: ZdyIpGetter().main() except Exception as
e: print(e.args) @staticmethod def clean_ip(): """ 定期清理过期ip的进程 :return: """
while True: try: ExpireIpCleaner().main() except Exception as e: print(e.args)
@staticmethod def squid_keep(): """ 维持squid使用最新ip的进程 :return: """ while True:
try: SquidKeeper().main() except Exception as e: print(e.args) @staticmethod
def api(): """ 提供web接口的进程 :return: """ app.run('0.0.0.0',
settings.API_WEB_PORT) def run(self): process_list = [] try: # 只启动打开了开关的组件 if
settings.IP_GETTER_OPENED: # 创建进程对象 fetch_ip_process =
Process(target=Scheduler.fetch_ip) # 并将组件进程加入到列表中,方便在手动退出的时候杀死
process_list.append(fetch_ip_process) # 开启进程 fetch_ip_process.start() if
settings.EXPIRE_IP_CLEANER_OPENED: clean_ip_process =
Process(target=Scheduler.clean_ip) process_list.append(clean_ip_process)
clean_ip_process.start() if settings.SQUID_KEEPER_OPENED: squid_keep_process =
Process(target=Scheduler.squid_keep) process_list.append(squid_keep_process)
squid_keep_process.start() if settings.WEB_API_OPENED: api_process =
Process(target=Scheduler.api) process_list.append(api_process)
api_process.start() # 一直执行,直到收到终止信号 while True: time.sleep(1) except
KeyboardInterrupt: # 收到终止信号时,关闭所有进程后再退出 self.logger.info('收到终止信号,正在关闭所有进程...')
for process in process_list: if process.is_alive(): process.terminate()
self.logger.info('关闭完成!结束程序!') if __name__ == '__main__': Scheduler().run()
<>公用方法和配置

将各组件公用的方法和配置抽取出来,做了集中。
# -*- coding: utf-8 -*- # @File : utils.py # @Author: AaronJny # @Date :
18-12-14 上午11:07 # @Desc : from redis import StrictRedis, ConnectionPool import
settingsimport logging def get_redis_client(): """ 获取一个redis连接 :return: """
server_url= settings.REDIS_SERVER_URL return StrictRedis(connection_pool=
ConnectionPool.from_url(server_url)) def get_logger(name=__name__): """
获取一个logger,用以格式化输出信息 :param name: :return: """ logger = logging.getLogger(name)
logger.handlers.clear() logger.setLevel(logging.INFO) formatter = logging.
Formatter( '%(asctime)s - %(name)s - %(levelname)s: - %(message)s', datefmt=
'%Y-%m-%d %H:%M:%S') # 使用StreamHandler输出到屏幕 ch = logging.StreamHandler() ch.
setLevel(logging.INFO) ch.setFormatter(formatter) logger.addHandler(ch) return
logger
涉及到的所有配置,可以根据情况进行修改:
# -*- coding: utf-8 -*- # @File : settings.py # @Author: AaronJny # @Date :
18-12-14 上午11:13 # @Desc : # 代理池redis键名 IP_POOL_KEY = 'open_proxy_pool' #
redis连接,根据实际情况进行配置 REDIS_SERVER_URL =
'redis://:your_password@your_host:port/db_name' # api对外端口 API_WEB_PORT = 9102 #
代理是否需要通过密码访问,当此项为False时可无视USERNAME和PASSWORD的配置 USE_PASSWORD = True # 用户名 #
注意:用户名密码是指代理服务方提供给你,用以验证访问授权的凭证。 # 无密码限制时可无视此项,并将USE_PASSWORD改为False USERNAME =
'your_username' # 密码 PASSWORD = 'your_password' # ***********功能组件开关************
# 打开web api功能,不使用web api的话可以关闭 WEB_API_OPENED = True #
打开squid代理转发服务的维持脚本,不使用squid的话可以关闭 SQUID_KEEPER_OPENED = True #
打开清理过期ip的脚本,如果池内的代理ip永远不会失效的话可以关闭 EXPIRE_IP_CLEANER_OPENED = True #
打开定时获取ip并检查的脚本,如果不需要获取新ip的话可以关闭 IP_GETTER_OPENED = True #
*********************************** # 清理代理ip的频率,如下配置代表每两次之间间隔6秒 CLEAN_INTERVAL =
6 # 获取代理ip的频率,根据api的请求频率限制进行设置 # 比如`站大爷`的频率限制是10秒一次,我就设置成了12秒 FETCH_INTERVAL =
12 # squid从redis中加载新ip的频率 SQUID_KEEPER_INTERVAL = 12 #
代理ip的生命周期,即一个新ip在多久后将被删除,单位:秒 PROXY_IP_TTL = 60
<>运行

到这里,编码就完成了。打开终端,切换到项目根目录,输入python3 scheduler.py运行即可。建议使用screen后台运行。

给出一个运行的截图(有机器在调用接口,我把ip隐藏了):



<>结尾

感谢支持,有问题欢迎拍砖~

友情链接
KaDraw流程图
API参考文档
OK工具箱
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:ixiaoyang8@qq.com
QQ群:637538335
关注微信