python 微信机器人脚本添加监听端口线程

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python 微信机器人脚本添加监听端口线程相关的知识,希望对你有一定的参考价值。

#!/usr/bin/env python3
# coding: utf-8


#1安装python3
#2命令行 执行 pip3 install -i https://pypi.doubanio.com/simple/ -U wxpy
#3在终端中显示登陆二维码,需要安装 pillow 模块 (pip3 install pillow)。

import datetime
import logging
import re
import sys
import time
from functools import wraps
from pprint import pformat
from threading import Thread
from shelper import SocketHelper
from wxpy import *
import psutil
from wxpy.utils.misc import get_text_without_at_bot

sys.path.insert(0, '..')
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()

bot = Bot('bot.pkl',console_qr=1)
#管理
remote_admin = ensure_one(bot.friends().search(remark_name='IF!'))

#群
group_receiver = ensure_one(bot.groups().search('wyzx'))

# 验证入群口令
def valid(msg):
    return 'wyzx' in msg.text.lower()

# 判断消息是否为支持回复的消息类型
def supported_msg_type(msg, reply_unsupported=False):
    supported = (TEXT,)
    ignored = (SYSTEM, NOTE, FRIENDS)

    fallback_replies = {
        RECORDING: '',
        PICTURE: '',
        VIDEO: '',
    }

    if msg.type in supported:
        return True
    elif msg.type not in ignored and reply_unsupported:
        msg.reply(fallback_replies.get(msg.type, ''))


# 自动回答关键词
kw_replies = {
    '主页:\n url': (
        '项目', '主页', '官网', '网站'
    ),
    '必看: 常见问题 FAQ:\n url': (
        'faq', '常见', '问题', '问答', '什么'
    )
}

# 新人入群的欢迎语
welcome_text = '''\U0001F389 欢迎 @{} 的加入!'''

# 新人入群通知的匹配正则
rp_new_member_name = (
    re.compile(r'^"(.+)"通过'),
    re.compile(r'邀请"(.+)"加入'),
)

# 远程踢人命令: @<机器人> 移出 @<需要被移出的人>
rp_kick = re.compile(r'^@.+移出\s*@(.+?)(?:\u2005?\s*$)')

#
tuling = Tuling(api_key='')
#个人
#my_friend = ensure_one(bot.friends().search('昵称'))



# 新人欢迎消息
@bot.register(group_receiver, NOTE)
def welcome(msg):
    name = get_new_member_name(msg)
    if name:
        return welcome_text.format(name)


# 邀请入群
def invite(user):
    joined = list()

    if user in group_receiver:
        joined.append(group_receiver)
    if joined:
        joined_nick_names = '\n'.join(map(lambda x: x.nick_name, joined))
        logger.info('{} is already in\n{}'.format(user, joined_nick_names))
        user.send('你已加入了\n{}'.format(joined_nick_names))
    else:
        logger.info('inviting {} to {}'.format(user, group_receiver))
        user.send('口令验证通过,开启群入口 [机智]')
        group_receiver.add_members(user, use_invitation=True)




process = psutil.Process()

def status_text():
    uptime = datetime.datetime.now() - datetime.datetime.fromtimestamp(process.create_time())
    memory_usage = process.memory_info().rss
    return '{uptime}\n{memory}'.format(
        uptime=str(uptime).split('.')[0],
        memory='{:.2f} MB'.format(memory_usage / 1024 ** 2)
    )

def send_status_text():
    return remote_admin.send(status_text())

def heartbeat():
    while True:
        time.sleep(600)
        # noinspection PyBroadException
        try:
            send_status_text()
        except:
            logger.exception('failed to report heartbeat:')


def remote_eval(source):
    try:
        ret = eval(source, globals())
    except (SyntaxError, NameError):
        return
    except Exception as e:
        ret = e

    remote_admin.send(pformat(ret))
    return True


# 远程命令 (单独发给机器人的消息)
remote_orders = {
    'status': send_status_text,
}

# 限制频率: 指定周期内超过消息条数,直接回复"[奸笑]"
def freq_limit(period_secs=10, limit_msgs=3):
    def decorator(func):
        @wraps(func)
        def wrapped(msg):
            now = datetime.datetime.now()
            period = datetime.timedelta(seconds=period_secs)
            recent_received = 0
            for m in msg.bot.messages[::-1]:
                if m.sender == msg.sender:
                    if now - m.create_time > period:
                        break
                    recent_received += 1

            if recent_received > limit_msgs:
                if not isinstance(msg.chat, Group) or msg.is_at:
                    return '[奸笑]'
            return func(msg)

        return wrapped

    return decorator


def get_new_member_name(msg):
    for rp in rp_new_member_name:
        match = rp.search(msg.text)
        if match:
            return match.group(1)


def remote_kick(msg):
    if msg.is_at and msg.type is TEXT:
        match = rp_kick.search(msg.text)
        if match:
            if msg.member != remote_admin:
                raise ValueError('Wrong admin: {}'.format(msg.member))
            name_to_kick = match.group(1)
            try:
                member_to_kick = ensure_one(msg.chat.search(name_to_kick))
            except ValueError as e:
                logger.exception('remote_kick')
                remote_admin.send('remote_kick: {}'.format(e))
                raise
            if member_to_kick in (bot.self, remote_admin):
                remote_admin.send('不能移出 {}'.format(member_to_kick.nick_name))
                raise ValueError('Wrong member to kick: {}'.format(member_to_kick))
            else:
                member_to_kick.remove()
                msg.chat.send('已移出 {}'.format(name_to_kick))
                return True

# 响应好友请求
@bot.register(msg_types=FRIENDS)
def new_friends(msg):
    logger.info('accepting {}'.format(msg.card))
    user = msg.card.accept()
    if valid(msg):
        invite(user)
    else:
        user.send('你好 {},加群请联系管理员'.format(user.name))


#群回复
@bot.register(group_receiver)
def tuling_reply(msg):
    if msg.is_at:
        if remote_kick(msg):
            return
        elif reply_by_keyword(msg):
            return
        if supported_msg_type(msg, reply_unsupported=True):
            tuling.do_reply(msg,at_member=True)

# 响应好友消息,限制频率
@bot.register(Friend, msg_types=TEXT)
@freq_limit()
def exist_friends(msg):
    if valid(msg):
        invite(msg.sender)
        return
    elif reply_by_keyword(msg):
        return

    if supported_msg_type(msg, reply_unsupported=True):
        tuling.do_reply(msg,at_member=True)

def exec_remote_order(msg):
    if msg.sender == remote_admin:

        order = remote_orders.get(msg.text.lower())
        if order:
            order()
            return True
        else:
            return remote_eval(msg.text)


def reply_by_keyword(msg):
    for reply, keywords in kw_replies.items():
        for kw in keywords:
            if kw in msg.text.lower():
                logger.info('reply by keyword: \n{}: "{}"\nreplied: "{}"'.format(
                    (msg.member or msg.chat).name, msg.text, reply))
                msg.reply(reply)
                return reply


def loglisten():
    sockethelper = SocketHelper("localhost",1333)
    while True:
        sockethelper.s_appept()
        msg = sockethelper.read_data()
        if ( len(msg) < 1 ) :
            break
        log = '打扰大家了,但这是一条重要的错误日志:\n\n'+msg.decode('utf-8')
        group_receiver.send(log)
        request = "ok"
        sockethelper.send_data(request.encode('utf-8'))
        sockethelper.close_socket

try:
    loglisten_thread = Thread(target=loglisten, daemon=True, name='loglisten')
    heartbeat_thread = Thread(target=heartbeat, daemon=True, name='heartbeat')
    heartbeat_thread.start()
    loglisten_thread.start()
except Exception as e:
    print('Thread start failed')


remote_admin.send('Bot started!')
@bot.register(remote_admin, msg_types=TEXT)
def reply_remote_admin(msg):
    """
    响应远程管理员

    内容解析方式优先级:
    1. 尝试作为远程命令执行 (额外定义,一条命令对应一个函数)
    2. 若不是远程命令,则尝试作为 Python 代码执行 (可执行大部分 Python 代码)
    3. 若非可执行的 Python 代码,则作为普通聊天内容回复
    """

    # 上述的 1. 和 2.
    if exec_remote_order(msg):
        return

    # 上述的 3.
    return exist_friends(msg)

bot.join()

以上是关于python 微信机器人脚本添加监听端口线程的主要内容,如果未能解决你的问题,请参考以下文章

Python 004- 利用图灵小机器人来搭建微信聊天自动回复机器人

Django如何监听启动,开启另外后台线程

python获取系统下打开的端口

appuim-java,同时连接多台机器,启动微信

python实现从FTP下载文件通过多线程同时分发到多台机器

2023.06 微信抓包方案 · 亲测可用