python 响应好友请求/自动聊天/限制频率/邀请入群/远程群管理/新人欢迎消息/关键词问答/发心跳/远程命令/远程执行代码
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python 响应好友请求/自动聊天/限制频率/邀请入群/远程群管理/新人欢迎消息/关键词问答/发心跳/远程命令/远程执行代码相关的知识,希望对你有一定的参考价值。
import threading
import time
class TimedList(object):
def __init__(self):
"""
计时列表,每个项目都有时效性
"""
# 格式: {item: (timestamp, limit_secs)}
self.data = dict()
self._lock = threading.Lock()
def set(self, item, limit_secs):
with self._lock:
self.data[item] = time.time(), limit_secs
def secs_left(self, item):
if item in self.data:
timestamp, limit_secs = self.data[item]
if limit_secs > 0:
return limit_secs - (time.time() - timestamp)
return 999
return 0
def remove(self, item):
if item in self.data:
del self.data[item]
def __contains__(self, item):
return bool(self.secs_left(item) > 0)
if __name__ == '__main__':
tl = TimedList()
a = 1
assert a not in tl
tl.set(a, 2)
assert a in tl
time.sleep(1)
assert a in tl
assert round(tl.secs_left(a)) == 1
time.sleep(1)
assert a not in tl
tl.set(a, -1)
assert tl.secs_left(a) == 999
tl.remove(a)
assert tl.secs_left(a) == 0
assert a not in tl
print('all tests pass')
import threading
import time
class KickVotes(object):
def __init__(self, limit_secs=0):
"""
对于每个项目在限定时间内增加的数量
超出限定时间后再增加,则将该项目数量重新设为 1
:param limit_secs: 限定的秒数
"""
# 格式: {to_kick: ({voter, ...}, timestamp)}
self.votes = dict()
self.limit_secs = limit_secs
self._lock = threading.Lock()
def vote(self, voter, to_kick):
"""投票, 返回 最新票数,剩余秒数"""
with self._lock:
if self.secs_left(to_kick) < 0:
self.votes[to_kick] = {voter}, time.time()
else:
self.votes[to_kick][0].add(voter)
return len(self.votes[to_kick][0]), self.secs_left(to_kick)
def secs_left(self, to_kick):
"""剩余秒数,不存在时为 -1"""
if to_kick in self.votes:
return self.limit_secs - (time.time() - self.votes[to_kick][1])
else:
return -1
def get(self, to_kick, default=(None, None)):
return self.votes.get(to_kick, default=default)
def __contains__(self, to_kick):
return to_kick in self.votes
def __getitem__(self, to_kick):
return self.votes[to_kick]
def __delitem__(self, to_kick):
if to_kick in self.votes:
del self.votes[to_kick]
def __repr__(self):
return repr(self.votes)
#!/usr/bin/env python3
# coding: utf-8
"""
wxpy 机器人正在使用的代码
** 这些代码无法独立运行,但可用于参考 **
需要安装新内核分支 new-core
pip3 install -U git+https://github.com/youfou/wxpy.git@new-core
"""
import datetime
import os
import re
import subprocess
import time
from collections import Counter
from functools import wraps
from pprint import pformat
# 该模块可以用 pip 安装
import psutil
from wxpy import *
from wxpy.utils import ensure_list, start_new_thread
# 以下两个模块可在 Gist 中获取 (自备梯子):
# http://gist.github.com/youfou/03c1e0204ac092f873730f51671ce0a8
from kick_votes import KickVotes
from timed_list import TimedList
# 以下两个模块分别用于远程管理和发送短信提醒,但并未公开
from remote import run_flask_app
from sms import send_sms
# ---------------- 配置开始 ----------------
# Bot 对象初始化时的 console_qr 参数值
console_qr = True
# 机器人昵称 (防止登错账号)
bot_name = 'wxpy 机器人'
# 入群口令
group_code = 'wxpy'
# 自动回答关键词
kw_replies = {
'wxpy 项目主页:\ngithub.com/youfou/wxpy': (
'项目', '主页', '官网', '网站', 'github', '地址', 'repo', '版本'
),
'wxpy 在线文档:\nwxpy.readthedocs.io': (
'请问', '文档', '帮助', '怎么', '如何', '请教', '安装', '说明', '运行'
),
'必看: 常见问题 FAQ:\nwxpy.readthedocs.io/faq.html': (
'faq', '常见', '问题', '问答', '什么'
),
(__file__, FILE): (
'源码', '代码'
)
}
# 新人入群的欢迎语
welcome_text = '''
以上是关于python 响应好友请求/自动聊天/限制频率/邀请入群/远程群管理/新人欢迎消息/关键词问答/发心跳/远程命令/远程执行代码的主要内容,如果未能解决你的问题,请参考以下文章
是否有 HTTP 响应代码轮询频率限制?
ASP.NET Core中如何限制响应发送速率(不是调用频率)
Laravel最佳实践--API请求频率限制(Throttle中间件)
nginx 限制ip的请求频率
Python自动扫描出微信不是好友名单
Python给指定微信好友自动发送信息和图片