使用selenium实现爬取BOSS直聘职位并写入数据库案例
Posted GuanLu
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用selenium实现爬取BOSS直聘职位并写入数据库案例相关的知识,希望对你有一定的参考价值。
本次案例实现selenium控制本地Chrome浏览器(不是selenium新打开的无cookie浏览器)
抓取BOSS直聘相关职位信息,并写入数据库中,代码如下:
"""Readme:
create time: 2021-08-01
1. 请使用pip安装好相应扩展库. lxml、win32api、selenium、pymysql,并配置好ChromeDriver等相关环境
2. 本次使用selenium控制的浏览器为本地Chrome,使用类时请将Chrome安装路径传入参数中
3. 以便更好的抓取相关信息,请提前在Chrome浏览器登录BOSS直聘,如果没有登录会受到反爬限制
4. 执行写入数据库操作时记得提前将 BOSS库和jobs_info表创建好
"""
import os
import time
import random
import pymysql
import win32api
from lxml import etree
from selenium import webdriver
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.chrome.options import Options
class BOSS(object):
def __init__(self, ChromePath, Keyword, QueryQuantity=30):
"""
:param ChromePath: chrome安装路径
:param Keyword: 搜索职位关键字
:param QueryQuantity: 获取职位的数量,默认获取30条
"""
self.Url = "https://www.zhipin.com/"
self.ChromePath = ChromePath
self.Keyword = Keyword
self.jobs_quantity = QueryQuantity
self.Browser = None
self.xpath = {}
self.job_info = {}
self.jobs_list = []
def start_browser(self):
"""启动本地浏览器并打开页面"""
# 判断当前Chrome是否在运行
def is_exe_running(exe="chrome.exe"):
result = os.popen(f\'\'\'tasklist | findstr "{exe}" \'\'\')
return exe in result.read()
# 关闭当前Chrome
def close_exe_program(exe="chrome.exe"):
if is_exe_running(exe):
os.popen(f"""taskkill -F /im {exe}""")
return True
return False
# 启动Chrome
def start_program(path, params=""):
win32api.ShellExecute(0, \'open\', path, params, os.path.split(path)[0], 1)
# 启用Chrome
def start_debugging_chrome(url=""):
if close_exe_program():
time.sleep(1)
path = self.ChromePath
assert path is not None, "获请传入chrome.exe 绝对路径"
if not path.endswith(\'chrome.exe\'):
path = path + \'\\\\chrome.exe\'
start_program(path, f"--remote-debugging-port=9222 {url}")
start_debugging_chrome(url=self.Url)
option = Options()
option.add_experimental_option("debuggerAddress", "127.0.0.1:9222")
self.Browser = webdriver.Chrome(options=option)
self.Browser.maximize_window()
self.Browser.switch_to.window(self.Browser.window_handles[0])
try:
WebDriverWait(driver=self.Browser, timeout=10).until(lambda d: d.find_element_by_id("main"))
except Exception:
print("打开页面失败,请检查网络是否正常后重新运行此程序(如果网络正常尝试改变此方法中的element元素或将try语句注释)")
self.close_browser()
exit(-1)
def close_browser(self):
"""关闭浏览器"""
self.Browser.quit()
def wait_element_loaded(self, xpath: str, timeout=10, close_browser=True):
"""
等待页面元素成功加载完成
:param xpath: xpath表达式
:param timeout: 最长等待超时时间
:param close_browser: 元素等待超时后是否关闭浏览器
:return: Boolean
"""
now_time = int(time.time())
while int(time.time()) - now_time < timeout:
try:
element = self.Browser.find_element_by_xpath(xpath)
if element:
return True
time.sleep(1)
except Exception:
pass
else:
if close_browser:
self.close_browser()
print("查找页面元素失败,如果不存在网络问题请尝试修改xpath表达式")
return False
def get_element_text(self, xpath, single=True):
"""获取页面中指定元素的文本内容,如果页面中找不到该元素则返回空
:param xpath: xpath表达式
:param single: True表示获取单个元素,False表示获取多个元素
:return: 元素的文本内容
"""
try:
if single:
return self.Browser.find_element_by_xpath(xpath).text
else:
return self.Browser.find_elements_by_xpath(xpath)
except Exception:
return \'\'
def add_xpath_element(self):
"""增加页面中相应的xpath元素"""
self.xpath[\'input_box\'] = \'//input[@name="query"]\' # 输入框
self.xpath[\'query\'] = \'//button[@ka="search_box_index"]\' # 查询按钮
self.xpath[\'jobs_div\'] = \'//div[@class="job-list"]\' # 职位展示页面
self.xpath[\'jobs_list\'] = \'//div[@class="job-list"]/ul/li//span[@class="job-name"]/a\' # 职位信息列表
self.xpath[\'job_detail\'] = \'//div[@class="job-banner"]\' # 职位详情页面
self.xpath[\'next_page\'] = \'//a[@ka="page-next"]\' # 下一页按钮
self.xpath[\'company_name\'] = \'//div[@class="company-info"]/a[2]\' # 公司名称
self.xpath[\'job_name\'] = \'//div[@class="name"]/h1\' # 职位名称
self.xpath[\'job_salary\'] = \'//span[@class="salary"]\' # 员工薪水
self.xpath[\'job_banner\'] = \'(//div[@class="info-primary"]/p)[1]\' # 工作信息栏
self.xpath[\'job_content\'] = \'//div[@class="text"]\' # 工作内容
self.xpath[\'job_welfare\'] = \'//div[@class="job-tags"]/span\' # 员工福利
self.xpath[\'company_type\'] = \'//a[@ka="job-detail-brandindustry"]\' # 公司行业
self.xpath[\'update_date\'] = \'(//p[@class="gray"])[1]\' # 发布职位日期
self.xpath[\'update_person\'] = \'//div[@class="detail-op"]/h2\' # 发布职位的人员
self.xpath[\'company_address\'] = \'//div[@class="location-address"]\' # 公司地址
self.xpath[\'Financing_stage\'] = \'//div[@class="sider-company"]/p/i[@class="icon-stage"]/parent::*\' # 公司融资阶段
self.xpath[\'company_size\'] = \'//div[@class="sider-company"]/p/i[@class="icon-scale"]/parent::*\' # 公司规模
def get_jobs_info(self):
"""处理页面内容"""
self.Browser.find_element_by_xpath(self.xpath[\'input_box\']).send_keys(self.Keyword)
time.sleep(1)
self.Browser.find_element_by_xpath(self.xpath[\'query\']).click()
while len(self.jobs_list) < self.jobs_quantity:
if not self.wait_element_loaded(xpath=self.xpath[\'jobs_div\'], timeout=15):
print("查询职位页面加载失败,请检查网络连接是否正常!")
exit(-1)
jobs_links = self.Browser.find_elements_by_xpath(self.xpath[\'jobs_list\'])
for job_link in jobs_links:
# 打开单个职位详情页面并将浏览器对象切换到新窗口
job_link.click()
self.Browser.switch_to.window(self.Browser.window_handles[-1])
if not self.wait_element_loaded(xpath=self.xpath[\'job_detail\']):
print("职位详情页面加载失败,请检查!")
exit(-1)
job_welfare_list = self.Browser.find_elements_by_xpath(self.xpath[\'job_welfare\'])
job_banner_html = self.Browser.find_element_by_xpath(self.xpath[\'job_banner\']).get_attribute(
"outerHTML")
# 使用etree更好的获取text
job_banner = etree.fromstring(job_banner_HTML)
education = job_banner.xpath("./text()")[1]
work_experience = job_banner.xpath("./text()")[0]
job_name = self.get_element_text(self.xpath[\'job_name\'])
job_salary = self.get_element_text(self.xpath[\'job_salary\'])
update_date = self.get_element_text(self.xpath[\'update_date\'])
company_size = self.get_element_text(self.xpath[\'company_size\'])
company_type = self.get_element_text(self.xpath[\'company_type\'])
company_name = self.get_element_text(self.xpath[\'company_name\'])
update_person = self.get_element_text(self.xpath[\'update_person\'])
Financing_stage = self.get_element_text(self.xpath[\'Financing_stage\'])
company_address = self.get_element_text(self.xpath[\'company_address\'])
job_content = self.get_element_text(self.xpath[\'job_content\']).replace(\'\\n\', \'\')
job_welfare = \'、\'.join([welfare.text for welfare in job_welfare_list if welfare.text])
self.job_info = {\'company_name\': company_name, \'job_name\': job_name, \'job_salary\': job_salary,
\'work_experience\': work_experience, \'education\': education,
\'job_content\': job_content, \'Financing_stage\': Financing_stage,
\'company_size\': company_size, \'company_type\': company_type,
\'company_address\': company_address, \'update_date\': update_date,
\'update_person\': update_person, \'job_welfare\': job_welfare,
}
print(self.job_info)
if len(self.jobs_list) < self.jobs_quantity:
self.jobs_list.append(self.job_info)
else:
break
time.sleep(random.randint(2, 5))
self.Browser.close()
# 关闭当前所使用的页面后要及时切换到另一个存在的页面,不然会报错
self.Browser.switch_to.window(self.Browser.window_handles[0])
if len(self.jobs_list) < self.jobs_quantity:
try:
self.Browser.switch_to.window(self.Browser.window_handles[0])
next_page = self.Browser.find_element_by_xpath(self.xpath[\'next_page\'])
if "disabled" not in next_page.get_attribute("class"):
# 点击下一页
next_page.click()
else:
print("已查询到最后一页...")
break
except Exception as e:
print(e)
break
print("已获取职位数量", len(self.jobs_list))
def run(self):
"""执行程序
:return: 获取到的结果 -> list
"""
self.start_browser()
self.add_xpath_element()
self.get_jobs_info()
self.close_browser()
return self.jobs_list
class MysqlPipelines(object):
"""
实现将数据写入到数据库中
提前将BOSS库和jobs_info表创建好,创建表结构如下:
create table BOSS.jobs_info( \\
id int primary key auto_increment, \\
company_name varchar(100) character set utf8, \\
job_name varchar(100) character set utf8, \\
job_salary varchar(100) character set utf8, \\
work_experience varchar(100) character set utf8, \\
education varchar(100) character set utf8, \\
job_content varchar(3000) character set utf8, \\
Financing_stage varchar(100) character set utf8, \\
company_size varchar(100) character set utf8, \\
company_type varchar(100) character set utf8, \\
company_address varchar(100) character set utf8, \\
update_date varchar(100) character set utf8, \\
update_person varchar(100) character set utf8, \\
job_welfare varchar(100) character set utf8);
"""
def __init__(self, Host, Port, DB, User, Password, Charset):
"""初始化.
定义连接远程MySQL的IP、端口、库名、用户名、密码、字符格式.
"""
self.conn = pymysql.connect(
host=Host,
port=Port,
db=DB,
user=User,
password=Password,
charset=Charset,
)
self.cur = self.conn.cursor()
def write_to_mysql(self, values: list or dict):
"""
将values值进行循环遍历,过滤出字典数据后进行依次写入数据库.
:param values: 可以是单个字典也可以将字典包含在列表或二维数组中.
"""
if isinstance(values, list) and len(values) >= 1:
for value in values:
if isinstance(value, list):
for internal_dict in value:
if isinstance(internal_dict, dict):
self.insert_value(internal_dict)
elif isinstance(value, dict):
self.insert_value(value)
elif isinstance(values, dict):
self.insert_value(values)
self.conn.close()
def insert_value(self, dict_value: dict):
"""向MySQL中插入数据
:param dict_value: 要插入的字典值
"""
TableName = "BOSS.jobs_info"
FieldsName = "company_name, job_name, job_salary, work_experience, education, job_content, Financing_stage, " \\
"company_size, company_type, company_address, update_date, update_person, job_welfare "
sql = "insert into {}({}) values(\'{}\',\'{}\',\'{}\',\'{}\',\'{}\',\'{}\',\'{}\',\'{}\',\'{}\',\'{}\',\'{}\',\'{}\',\'{}\')".format(
TableName,
FieldsName,
dict_value.get(\'company_name\'),
dict_value.get(\'job_name\'),
dict_value.get(\'job_salary\'),
dict_value.get(\'work_experience\'),
dict_value.get(\'education\'),
dict_value.get(\'job_content\'),
dict_value.get(\'Financing_stage\'),
dict_value.get(\'company_size\'),
dict_value.get(\'company_type\'),
dict_value.get(\'company_address\'),
dict_value.get(\'update_date\'),
dict_value.get(\'update_person\'),
dict_value.get(\'job_welfare\')
)
try:
self.cur.execute(sql)
self.conn.commit()
except Exception as E:
print(E)
self.conn.rollback()
if __name__ == \'__main__\':
chrome_path = "C:\\\\Program Files\\\\Google\\\\Chrome\\\\Application\\\\chrome.exe"
keyword = "数据分析"
query_quantity = 40
main = BOSS(ChromePath=chrome_path, Keyword=keyword, QueryQuantity=query_quantity)
jobs_info = main.run()
mysql = MysqlPipelines(Host=\'192.168.116.128\',
Port=3306,
DB=\'BOSS\',
User=\'root\',
Password=\'password123\',
Charset=\'utf8\')
mysql.write_to_mysql(values=jobs_info)
以上是关于使用selenium实现爬取BOSS直聘职位并写入数据库案例的主要内容,如果未能解决你的问题,请参考以下文章