使用python-telegram-bot模块转发telegram机器人消息到钉钉平台
Posted Egu0
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用python-telegram-bot模块转发telegram机器人消息到钉钉平台相关的知识,希望对你有一定的参考价值。
脚本:将电报频道机器人的消息转发到钉钉上
步骤:
- 创建电报机器人,将感兴趣的消息分享给机器人
- 使用本程序监听机器人的消息,收到消息时转发 markdown 到钉钉
# encoding: utf-8
import json
import time
import hmac
import hashlib
import base64
import urllib.parse
from datetime import datetime, timedelta, timezone
import requests
from telegram import Update
from telegram.ext import MessageHandler, filters, Application
"""
环境:python 3.8
依赖:
------------------------ requirements.txt
python-telegram-bot
requests==2.29
------------------------
部署:
nohup python3 tgbot.py > out.log 2>&1 &
"""
# 自定义机器人(基于 webhook)
# 使用 加签 的安全方式
def sign_cos_dingtalk_restrict():
secret = \'钉钉机器人加签秘钥\'
timestamp = str(round(time.time() * 1000))
secret_enc = secret.encode(\'utf-8\')
string_to_sign = \'\\n\'.format(timestamp, secret)
string_to_sign_enc = string_to_sign.encode(\'utf-8\')
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
return timestamp, sign
# 北京时间
def beijing_time():
utc_now = datetime.utcnow().replace(tzinfo=timezone.utc)
sh_timezone = timezone(timedelta(hours=8), name=\'Asia/Shanghai\', )
beijing_now = utc_now.astimezone(sh_timezone)
return beijing_now.strftime(\'%d/%m %H:%M:%S\')
# 发送钉钉消息
def post_dingtalk_msg(markdown_text: str):
if markdown_text is None:
return
ts, sign = sign_cos_dingtalk_restrict()
token = \'钉钉机器人访问的accesstoken\'
url = \'https://oapi.dingtalk.com/robot/send?access_token=&sign=×tamp=\'.format(token, sign, ts)
data =
\'msgtype\': \'markdown\',
\'markdown\':
\'title\': \'一条转发消息\',
\'text\': markdown_text
headers = "Content-Type": "application/json"
rsp = requests.request(url=url, method=\'POST\', headers=headers, data=json.dumps(data))
print(beijing_time(), \'post dingtalk rsp status: \', rsp.status_code, \', rsp text: \', rsp.text)
# 消息处理器回调。参数必须是俩
async def fetch_updates(update: Update, context):
try:
markdown_text = update.message.caption_markdown_v2_urled
post_dingtalk_msg(markdown_text)
except Exception as ex:
print(\'提取消息或发送消息失败\', ex)
# telegram 机器人 access token
bot_access_token = "电报机器人accesstoken"
def main():
# 使用消息处理器持续监听机器人收到的消息
application = Application.builder().token(bot_access_token).build()
application.add_handler(MessageHandler(filters.ALL, callback=fetch_updates))
application.run_polling()
if __name__ == \'__main__\':
main()
如何从事件循环外部(即从 python-telegram-bot 线程)发送带有 discord.py 的消息?
【中文标题】如何从事件循环外部(即从 python-telegram-bot 线程)发送带有 discord.py 的消息?【英文标题】:How to send a message with discord.py from outside the event loop (i.e. from python-telegram-bot thread)? 【发布时间】:2018-12-11 10:44:50 【问题描述】:我想使用库 python-telegram-bot 和 discord.py(版本 1.0.0)制作一个在 discord 和 telegram 之间进行通信的机器人。然而问题是 discord.py 使用异步函数和 python-telegram-bot 线程。使用下面的代码,在不和谐中发布的消息一切正常(机器人将它们正确地发送到电报),但是反过来不起作用(机器人从电报中获取消息并将其发送到不和谐)。我以前在尝试在同步函数中运行不和谐channel.send
函数时遇到语法/运行时错误问题(因此要么只返回一个生成器对象,要么抱怨我不能在同步函数中使用await
)。然而,与此同时,python-telegram-bot 的MessageHandler
需要一个同步函数,所以当给他一个异步函数时,Python 抱怨说,异步函数从未调用过“await”。
我现在尝试使用 asgiref 库中的 async_to_sync 方法从 MessageHandler
运行我的异步 broadcastMsg
,但是代码仍然没有将消息发送到不和谐!它似乎正确调用了该函数,但仅在行 print('I get to here')
之前。没有显示错误,也没有不和谐的消息弹出。我想这与我必须将函数注册为 discord.py 事件循环中的任务这一事实有关,但是注册仅在它发生在 botDiscord.run(TOKENDISCORD)
执行之前才有效,这当然必须在之前发生.
所以把我的问题归结为一个问题:
我如何能够从另一个线程(来自电报MessageHandler
)与 discord.py 事件循环进行交互。或者如果这是不可能的:如何在不进入 discord.py 事件循环的情况下使用 discord.py 发送消息?
感谢您的帮助
import asyncio
from asgiref.sync import async_to_sync
from telegram import Message as TMessage
from telegram.ext import (Updater,Filters,MessageHandler)
from discord.ext import commands
import discord
TChannelID = 'someTelegramChannelID'
DChannel = 'someDiscordChannelObject'
#%% define functions / commands
prefix = "?"
botDiscord = commands.Bot(command_prefix=prefix)
discordChannels =
async def broadcastMsg(medium,channel,message):
'''
Function to broadcast a message to all linked channels.
'''
if isinstance(message,TMessage):
fromMedium = 'Telegram'
author = message.from_user.username
channel = message.chat.title
content = message.text
elif isinstance(message,discord.Message):
fromMedium = 'Discord'
author = message.author
channel = message.channel.name
content = message.content
# check where message comes from
textToSend = '%s wrote on %s %s:\n%s'%(author,fromMedium,channel,content)
# go through channels and send the message
if 'telegram' in medium:
# transform channel to telegram chatID and send
updaterTelegram.bot.send_message(channel,textToSend)
elif 'discord' in medium:
print('I get to here')
await channel.send(textToSend)
print("I do not get there")
@botDiscord.event
async def on_message(message):
await broadcastMsg('telegram',TChannelID,message)
def on_TMessage(bot,update):
# check if this chat is already known, else save it
# get channels to send to and send message
async_to_sync(broadcastMsg)('discord',DChannel,update.message)
#%% initialize telegram and discord bot and run them
messageHandler = MessageHandler(Filters.text, on_TMessage)
updaterTelegram = Updater(token = TOKENTELEGRAM, request_kwargs='read_timeout': 10, 'connect_timeout': 10)
updaterTelegram.dispatcher.add_handler(messageHandler)
updaterTelegram.start_polling()
botDiscord.run(TOKENDISCORD)
【问题讨论】:
【参考方案1】:如何在不进入 discord.py 事件循环的情况下使用 discord.py 发送消息?
要从事件循环线程外部安全地调度协程,请使用asyncio.run_coroutine_threadsafe
:
_loop = asyncio.get_event_loop()
def on_TMessage(bot, update):
asyncio.run_coroutine_threadsafe(
broadcastMsg('discord', DChannel, update.message), _loop)
【讨论】:
所以这行得通,但问题是,如果你在一个不断运行的脚本中有这个。在您“结束”当前脚本之前,这些消息永远不会被发送。然后发送所有消息。就像他们都被扔进一个队列,只有在你完成脚本后才会执行。我怎样才能解决这个问题,因为我有一个永远运行的常量脚本,看着东西。 @Frank 我不确定你的脚本做了什么,但这个答案假设 asyncio 事件循环实际上 running 在不同的线程中。 (在问题的代码中,事件循环在主线程中运行,on_TMessage
在某个后台线程中被 Telegram API 调用)。如果不是这样,asyncio.run_coroutine_threadsafe()
将无用。您的问题可能是一个单独问题的材料,该问题应包括脚本架构等详细信息,如果可能的话,还应包括一个重现问题的最小/可运行示例。
感谢解释(疯狂的用户名)哈哈!请您帮我看一下这个问题,如果没有意义,我可以编辑问题***.com/questions/64368729/…【参考方案2】:
您可以尝试将它们拆分为 2 个单独的 *.py 文件。
t2d.py #telegram 不和谐
import subprocess
from telethon import TelegramClient, events, sync
api_id = '...'
api_hash = '...'
with TelegramClient('name', api_id, api_hash) as client:
@client.on(events.NewMessage()) #inside .NewMessage() you can put specific channel like: chats="test_channel"
async def handler(event):
print('test_channel raw text: ', event.raw_text) #this row is not necessary
msg = event.raw_text
subprocess.call(["python", "s2d.py", msg])
client.run_until_disconnected()
s2d.py #发送到不和谐
import discord, sys
my_secret = '...'
clientdiscord = discord.Client()
@clientdiscord.event
async def on_ready():
#print('We have logged in as 0.user'.format(clientdiscord)) #this row is not necessary
channel = clientdiscord.get_channel(123456789) # num of channel where you want to write message
msg = sys.argv[1] #grab message
msg = 's2d: ' + msg #only for test, you can delete this row
await channel.send(msg)
quit() # very important quit this bot
clientdiscord.run(my_secret)
会慢一点(子进程会造成延迟),但很容易解决
【讨论】:
以上是关于使用python-telegram-bot模块转发telegram机器人消息到钉钉平台的主要内容,如果未能解决你的问题,请参考以下文章
使用 python-telegram-bot 构建菜单的正确方法
python Webhook使用自签名证书和Flask(使用python-telegram-bot库)
python Webhook使用自签名证书和Flask(使用python-telegram-bot库)
在 python-telegram-bot 中如何获取该组的所有参与者?