使用Python监控邮件服务器公网IP是否被列入黑名单并及时告警

Posted 微风ATBJ

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用Python监控邮件服务器公网IP是否被列入黑名单并及时告警相关的知识,希望对你有一定的参考价值。

前言

如果大家有在使用混合部署完全本地化邮件服务器(也包含 Linux 等其他平台的邮件服务器),或许很多人会经常遇到和我一样的问题:邮件服务器的公网 IP 被列入Spamhaus和类似的反垃圾邮件联盟的黑名单 导致业务部门发出去的邮件被对方邮件网关拦截或直接拒收。


背景

假设你有运维这样的一个场景,这里以Exchange平台为例(相信很多企业都是使用该场景):

使用的是混合部署环境,并且:
发信邮件流本地 ExchangeO365 Exchange Online;
收信邮件流本地邮件网关本地 Exchange 服务器O365用户邮箱

因此,如果管理员不能及时发现 IP 被列入黑名单,你的业务、销售部门此时还在不停的外发邮件。一般在 12-24小时左右,你的邮件公网 IP 会被微软列入黑名单,这个时候即便你在Exchange Online上将本地的公网 IP 添加到连接器的白名单,你的用户收发邮件仍然会收到影响。


解决方法

一、监控 Exchange 传输日志

可以通过ELKSplunk这类的工具监控 Exchange 的传输日志,当出现关键词*550 5.7.1 Message rejected as spam by Content Filtering*550 5.7.1 Service unavailable;Client host[IP] blocked using Spamhaus 就发出告警通知邮件管理员

上述方法有个缺点,即当你通过该日志发现了问题,说明你的邮件服务器公网 IP 已经被列入黑名单有一段时间了,并且已经影响了你的生产环境。

二、通过Python的第三方Pydnsbl模块进行实时监控

优点 当你的 IP 被列入 Spamhaus 黑名单后,会立刻被查询到,及时通知管理员并进行相应处理,就不会出现长时间未处理,导致微软同步 Spamhaus策略后整个公司的邮件都受到收发影响

官方文档

使用方法:
通过官方介绍,其使用方法非常简单,只要Python版本大于等于3.5版本即可。接下来我会基于这个方法做个详细的使用介绍,便于后期参考,也方便和我一样有类似需求的人查阅。


优化

有了这款工具和使用方法,接下来就要结合使用场景,来实现自动检查、自动触发告警。

由于我本人对 Python 不是很熟悉,只能根据自己的需求编写符合需求的脚本,如有不妥或可以优化、改进的地方,可留言赐教,谢谢!

#!/usr/bin/python3.7
# encoding: utf-8

import urllib.request
import json
import pydnsbl
import ssl
ssl._create_default_https_context = ssl._create_unverified_context

TOKEN_URL = "https://qyapi.weixin.qq.com/cgi-bin/gettoken"
# 企业的id,在管理端->"我的企业" 可以看到
# CORP_ID = "CORP_ID"
CORP_ID = "CORP_ID****"
# 某个自建应用的id及secret, 在管理端 -> 企业应用 -> 自建应用, 点进相应应用可以看到
APP_ID = "APP_ID*****"
CORP_SECRET = "**********"

class Wechat(object):
    "send monitor message by wechat"

    def __init__(self):
        self.CORP_ID = CORP_ID
        self.CORP_SECRET = CORP_SECRET
        self.APP_ID = APP_ID
        self.BASEURL = https://qyapi.weixin.qq.com/cgi-bin/
        self.TOKEN_URL = gettoken?corpid=0&corpsecret=1.format(
            self.CORP_ID, self.CORP_SECRET)

    # 获取认证 token
    def Get_Token(self):
        try:
            response = urllib.request.urlopen(
                01.format(self.BASEURL, self.TOKEN_URL))
            access_token = json.loads(
                response.read().decode(utf-8))[access_token]
            with open(token, w) as f:
                f.write(access_token)
        except KeyError:
            raise KeyError
        return access_token

    def checker(self):
        Result = []
        ip_checker = pydnsbl.DNSBLIpChecker()
        Result1 = ip_checker.check(68.128.212.240)  # Exchange Server 01
        Result2 = ip_checker.check(68.128.212.241)  # Exchange Server 02
        Result3 = ip_checker.check(68.128.212.242)  # Exchange Server 03
        Result.append(Result1)
        Result.append(Result2)
        Result.append(Result3)
        # domain_checker = pydnsbl.DNSBLDomainChecker()
        # Result4 = domain_checker.check(luxiu2.com)  # Domain Name 01
        # Result5 = domain_checker.check(videour.com)  # Domain Name 02
        # Result6 = domain_checker.check(jesdoit.com)  # Domain Name 03
        # Result.append(Result4)
        # Result.append(Result5)
        # Result.append(Result6)
        return Result

    # 本地 token
    def Local_Token(self):
        try:
            with open(token, r) as f:
                token = f.readline().strip()
                if token == :
                    token = self.Get_Token()
                    return token
                else:
                    return token
        except IOError:
            token = self.Get_Token()
            return token

    # 获取报警人员名单
    def Get_User(self, dep_id=1, fchild=1):
        #token = self.Get_Token()
        token = self.Local_Token()
        send_url = 0user/list?access_token=1&department_id=2&fetch_child3.format(
            self.BASEURL, token, dep_id, fchild,)
        respone = urllib.request.urlopen(url=send_url).read()
        stat = json.loads(respone)[userlist]
        user = 
        for k in stat:
            user += 0 .format(k[mobile])
        mobile = ,.join(user.split())
        with open(user.txt, w) as f:
            f.write(mobile)

    # 发送报警信息
    def Send_Message(self, content):
        self.content = 
            # "touser":  "User01|User02|User03",   # 成员, @all及所有人  "UserID1|UserID2|UserID3",//企业微信的唯一userid,非必输
            "touser": jasonhuang,
            # "toparty": 1,                          # 部门,@all 及所有部门  "PartyID1|PartyID2",//部门id,非必输,如果输入了就只给指定部门发送消息
            "msgtype": text,                       # 消息类型,文本,图片
            "agentid": self.APP_ID,                  # 企业应用 id
            "safe": "0",                             #
            "text": 
                    "content": content                   # 报警内容
            
        
        token = self.Local_Token()
        # 构建告警信息,必须是 json 格式
        msg = messages_content = json.dumps(self.content)
        send_url = 0message/send?access_token=1.format(
            self.BASEURL, token)
        respone = urllib.request.urlopen(
            url=send_url, data=msg.encode("utf-8")).read()
        stat = json.loads(respone.decode())[errcode]
        if stat == 0:
            print(Succesfully Send To Wechat)
        else:
            token = self.Get_Token()
            send_url = 0message/send?access_token=1.format(
                self.BASEURL, token)
            respone = urllib.request.urlopen(url=send_url, data=msg).read()
            return respone

if __name__ == __main__:
    msgs = []
    msg = Wechat().checker()
    for i in msg:
        j = str(i)
        if j.find(BLACKLISTED) != -1:
            msgs.append(j)
    if msgs:
        msgsend = str(msgs)
        wechat = Wechat()
        wechat.Send_Message(邮件服务器公网IP状态告警:+msgsend)
        print(Error+msgsend)
    else:
        print(All Exchange Server Internet IP Status are Normal)

我在脚本第46-48行中监控了 3 台服务器的公网 IP 在Spamhaus的状态([font color="#B22222"]为测试效果,这些 IP 为网上找的垃圾邮件服务器公网 IP[/font]


验证

这里我们可以看到这 3 个 IP 均被提示异常,接下来我们去Spamhaus官网手动查询下结果看是否一致。

访问Lookup - Reputation Checker - Spamhaus并输入68.128.212.240进行查询,结果如下:

其他两个 IP 也是一样的结果,所以可以判断其运行正常。

最后,通过上述脚本,制作个计划任务,每半小时或每小时执行一次,这样能很好的监控你的邮件服务器公网 IP 的运行状态了。 也可以启用脚本的 52-56行,对一个或多个域名进行监控,按需操作即可。

Enjoy~~

以上是关于使用Python监控邮件服务器公网IP是否被列入黑名单并及时告警的主要内容,如果未能解决你的问题,请参考以下文章

Powershell检测邮箱IP/出口IP是否被列入spamhaus等黑名单

检查SMTP地址是否被检测网站列入黑名单

Python黑科技,教你学会Django系统错误监控

国内和国际反垃圾邮件组织

监控软件zabbix关联110云告警软件onelert实现短信邮件告警

将网站电子邮件列入白名单,使其不会作为垃圾邮件被拒绝