python 响应好友请求/自动聊天/限制频率/邀请入群/远程群管理/新人欢迎消息/关键词问答/发心跳/远程命令/远程执行代码

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python 响应好友请求/自动聊天/限制频率/邀请入群/远程群管理/新人欢迎消息/关键词问答/发心跳/远程命令/远程执行代码相关的知识,希望对你有一定的参考价值。

import threading
import time


class TimedList(object):
    def __init__(self):
        """
        计时列表,每个项目都有时效性
        """

        # 格式: {item: (timestamp, limit_secs)}
        self.data = dict()
        self._lock = threading.Lock()

    def set(self, item, limit_secs):
        with self._lock:
            self.data[item] = time.time(), limit_secs

    def secs_left(self, item):
        if item in self.data:
            timestamp, limit_secs = self.data[item]
            if limit_secs > 0:
                return limit_secs - (time.time() - timestamp)
            return 999
        return 0

    def remove(self, item):
        if item in self.data:
            del self.data[item]

    def __contains__(self, item):
        return bool(self.secs_left(item) > 0)


if __name__ == '__main__':
    tl = TimedList()
    a = 1
    assert a not in tl
    tl.set(a, 2)
    assert a in tl
    time.sleep(1)
    assert a in tl
    assert round(tl.secs_left(a)) == 1
    time.sleep(1)
    assert a not in tl
    tl.set(a, -1)
    assert tl.secs_left(a) == 999
    tl.remove(a)
    assert tl.secs_left(a) == 0
    assert a not in tl
    print('all tests pass')
import threading
import time


class KickVotes(object):
    def __init__(self, limit_secs=0):
        """
        对于每个项目在限定时间内增加的数量
        超出限定时间后再增加,则将该项目数量重新设为 1

        :param limit_secs: 限定的秒数
        """

        # 格式: {to_kick: ({voter, ...}, timestamp)}
        self.votes = dict()

        self.limit_secs = limit_secs
        self._lock = threading.Lock()

    def vote(self, voter, to_kick):
        """投票, 返回 最新票数,剩余秒数"""
        with self._lock:
            if self.secs_left(to_kick) < 0:
                self.votes[to_kick] = {voter}, time.time()
            else:
                self.votes[to_kick][0].add(voter)
            return len(self.votes[to_kick][0]), self.secs_left(to_kick)

    def secs_left(self, to_kick):
        """剩余秒数,不存在时为 -1"""
        if to_kick in self.votes:
            return self.limit_secs - (time.time() - self.votes[to_kick][1])
        else:
            return -1

    def get(self, to_kick, default=(None, None)):
        return self.votes.get(to_kick, default=default)

    def __contains__(self, to_kick):
        return to_kick in self.votes

    def __getitem__(self, to_kick):
        return self.votes[to_kick]

    def __delitem__(self, to_kick):
        if to_kick in self.votes:
            del self.votes[to_kick]

    def __repr__(self):
        return repr(self.votes)
#!/usr/bin/env python3
# coding: utf-8

"""
wxpy 机器人正在使用的代码

** 这些代码无法独立运行,但可用于参考 **

需要安装新内核分支 new-core
pip3 install -U git+https://github.com/youfou/wxpy.git@new-core
"""

import datetime
import os
import re
import subprocess
import time
from collections import Counter
from functools import wraps
from pprint import pformat

# 该模块可以用 pip 安装
import psutil

from wxpy import *
from wxpy.utils import ensure_list, start_new_thread

# 以下两个模块可在 Gist 中获取 (自备梯子):
# http://gist.github.com/youfou/03c1e0204ac092f873730f51671ce0a8
from kick_votes import KickVotes
from timed_list import TimedList

# 以下两个模块分别用于远程管理和发送短信提醒,但并未公开
from remote import run_flask_app
from sms import send_sms

# ---------------- 配置开始 ----------------

# Bot 对象初始化时的 console_qr 参数值
console_qr = True

# 机器人昵称 (防止登错账号)
bot_name = 'wxpy 机器人'

# 入群口令
group_code = 'wxpy'

# 自动回答关键词
kw_replies = {
    'wxpy 项目主页:\ngithub.com/youfou/wxpy': (
        '项目', '主页', '官网', '网站', 'github', '地址', 'repo', '版本'
    ),
    'wxpy 在线文档:\nwxpy.readthedocs.io': (
        '请问', '文档', '帮助', '怎么', '如何', '请教', '安装', '说明', '运行'
    ),
    '必看: 常见问题 FAQ:\nwxpy.readthedocs.io/faq.html': (
        'faq', '常见', '问题', '问答', '什么'
    ),
    (__file__, FILE): (
        '源码', '代码'
    )
}

# 新人入群的欢迎语
welcome_text = '''

以上是关于python 响应好友请求/自动聊天/限制频率/邀请入群/远程群管理/新人欢迎消息/关键词问答/发心跳/远程命令/远程执行代码的主要内容,如果未能解决你的问题,请参考以下文章

是否有 HTTP 响应代码轮询频率限制?

ASP.NET Core中如何限制响应发送速率(不是调用频率)

Laravel最佳实践--API请求频率限制(Throttle中间件)

nginx 限制ip的请求频率

Python自动扫描出微信不是好友名单

Python给指定微信好友自动发送信息和图片