使用Python监控邮件服务器公网IP是否被列入黑名单并及时告警
Posted 微风ATBJ
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用Python监控邮件服务器公网IP是否被列入黑名单并及时告警相关的知识,希望对你有一定的参考价值。
前言
如果大家有在使用混合部署
或完全本地化
的 邮件服务器
(也包含 Linux 等其他平台的邮件服务器),或许很多人会经常遇到和我一样的问题:邮件服务器的公网 IP 被列入Spamhaus和类似的反垃圾邮件联盟的黑名单 导致业务部门发出去的邮件被对方邮件网关
拦截或直接拒收。
背景
假设你有运维这样的一个场景,这里以Exchange
平台为例(相信很多企业都是使用该场景):
使用的是混合部署环境,并且:发信邮件流
是本地 Exchange ⇒O365 Exchange Online;收信邮件流
是本地邮件网关 ⇒本地 Exchange 服务器⇒O365用户邮箱
因此,如果管理员不能及时发现 IP 被列入黑名单,你的业务、销售部门此时还在不停的外发邮件。一般在 12-24小时左右,你的邮件公网 IP 会被微软列入黑名单,这个时候即便你在Exchange Online
上将本地的公网 IP 添加到连接器的白名单,你的用户收发邮件仍然会收到影响。
解决方法
一、监控 Exchange 传输日志
可以通过ELK
,Splunk
这类的工具监控 Exchange 的传输日志,当出现关键词*550 5.7.1 Message rejected as spam by Content Filtering*
或550 5.7.1 Service unavailable;Client host[IP] blocked using Spamhaus
就发出告警通知邮件管理员
上述方法有个缺点,即当你通过该日志发现了问题,说明你的邮件服务器公网 IP 已经被列入黑名单有一段时间了,并且已经影响了你的生产环境。
二、通过Python的第三方Pydnsbl模块进行实时监控
优点 当你的 IP 被列入 Spamhaus 黑名单后,会立刻被查询到,及时通知管理员并进行相应处理,就不会出现长时间未处理,导致微软同步 Spamhaus策略后整个公司的邮件都受到收发影响
使用方法:
通过官方介绍,其使用方法非常简单,只要Python
版本大于等于3.5版本即可。接下来我会基于这个方法做个详细的使用介绍,便于后期参考,也方便和我一样有类似需求的人查阅。
优化
有了这款工具和使用方法,接下来就要结合使用场景,来实现自动检查、自动触发告警。
由于我本人对 Python 不是很熟悉,只能根据自己的需求编写符合需求的脚本,如有不妥或可以优化、改进的地方,可留言赐教,谢谢!
#!/usr/bin/python3.7
# encoding: utf-8
import urllib.request
import json
import pydnsbl
import ssl
ssl._create_default_https_context = ssl._create_unverified_context
TOKEN_URL = "https://qyapi.weixin.qq.com/cgi-bin/gettoken"
# 企业的id,在管理端->"我的企业" 可以看到
# CORP_ID = "CORP_ID"
CORP_ID = "CORP_ID****"
# 某个自建应用的id及secret, 在管理端 -> 企业应用 -> 自建应用, 点进相应应用可以看到
APP_ID = "APP_ID*****"
CORP_SECRET = "**********"
class Wechat(object):
"send monitor message by wechat"
def __init__(self):
self.CORP_ID = CORP_ID
self.CORP_SECRET = CORP_SECRET
self.APP_ID = APP_ID
self.BASEURL = https://qyapi.weixin.qq.com/cgi-bin/
self.TOKEN_URL = gettoken?corpid=0&corpsecret=1.format(
self.CORP_ID, self.CORP_SECRET)
# 获取认证 token
def Get_Token(self):
try:
response = urllib.request.urlopen(
01.format(self.BASEURL, self.TOKEN_URL))
access_token = json.loads(
response.read().decode(utf-8))[access_token]
with open(token, w) as f:
f.write(access_token)
except KeyError:
raise KeyError
return access_token
def checker(self):
Result = []
ip_checker = pydnsbl.DNSBLIpChecker()
Result1 = ip_checker.check(68.128.212.240) # Exchange Server 01
Result2 = ip_checker.check(68.128.212.241) # Exchange Server 02
Result3 = ip_checker.check(68.128.212.242) # Exchange Server 03
Result.append(Result1)
Result.append(Result2)
Result.append(Result3)
# domain_checker = pydnsbl.DNSBLDomainChecker()
# Result4 = domain_checker.check(luxiu2.com) # Domain Name 01
# Result5 = domain_checker.check(videour.com) # Domain Name 02
# Result6 = domain_checker.check(jesdoit.com) # Domain Name 03
# Result.append(Result4)
# Result.append(Result5)
# Result.append(Result6)
return Result
# 本地 token
def Local_Token(self):
try:
with open(token, r) as f:
token = f.readline().strip()
if token == :
token = self.Get_Token()
return token
else:
return token
except IOError:
token = self.Get_Token()
return token
# 获取报警人员名单
def Get_User(self, dep_id=1, fchild=1):
#token = self.Get_Token()
token = self.Local_Token()
send_url = 0user/list?access_token=1&department_id=2&fetch_child3.format(
self.BASEURL, token, dep_id, fchild,)
respone = urllib.request.urlopen(url=send_url).read()
stat = json.loads(respone)[userlist]
user =
for k in stat:
user += 0 .format(k[mobile])
mobile = ,.join(user.split())
with open(user.txt, w) as f:
f.write(mobile)
# 发送报警信息
def Send_Message(self, content):
self.content =
# "touser": "User01|User02|User03", # 成员, @all及所有人 "UserID1|UserID2|UserID3",//企业微信的唯一userid,非必输
"touser": jasonhuang,
# "toparty": 1, # 部门,@all 及所有部门 "PartyID1|PartyID2",//部门id,非必输,如果输入了就只给指定部门发送消息
"msgtype": text, # 消息类型,文本,图片
"agentid": self.APP_ID, # 企业应用 id
"safe": "0", #
"text":
"content": content # 报警内容
token = self.Local_Token()
# 构建告警信息,必须是 json 格式
msg = messages_content = json.dumps(self.content)
send_url = 0message/send?access_token=1.format(
self.BASEURL, token)
respone = urllib.request.urlopen(
url=send_url, data=msg.encode("utf-8")).read()
stat = json.loads(respone.decode())[errcode]
if stat == 0:
print(Succesfully Send To Wechat)
else:
token = self.Get_Token()
send_url = 0message/send?access_token=1.format(
self.BASEURL, token)
respone = urllib.request.urlopen(url=send_url, data=msg).read()
return respone
if __name__ == __main__:
msgs = []
msg = Wechat().checker()
for i in msg:
j = str(i)
if j.find(BLACKLISTED) != -1:
msgs.append(j)
if msgs:
msgsend = str(msgs)
wechat = Wechat()
wechat.Send_Message(邮件服务器公网IP状态告警:+msgsend)
print(Error+msgsend)
else:
print(All Exchange Server Internet IP Status are Normal)
我在脚本第46-48
行中监控了 3 台服务器的公网 IP 在Spamhaus
的状态([font color="#B22222"]为测试效果,这些 IP 为网上找的垃圾邮件服务器公网 IP[/font])
验证
这里我们可以看到这 3 个 IP 均被提示异常,接下来我们去Spamhaus
官网手动查询下结果看是否一致。
访问Lookup - Reputation Checker - Spamhaus并输入68.128.212.240
进行查询,结果如下:
其他两个 IP 也是一样的结果,所以可以判断其运行正常。
最后,通过上述脚本,制作个计划任务,每半小时或每小时执行一次,这样能很好的监控你的邮件服务器公网 IP 的运行状态了。 也可以启用脚本的 52-56
行,对一个或多个域名进行监控,按需操作即可。
Enjoy~~
以上是关于使用Python监控邮件服务器公网IP是否被列入黑名单并及时告警的主要内容,如果未能解决你的问题,请参考以下文章
Powershell检测邮箱IP/出口IP是否被列入spamhaus等黑名单