如何检测 Python 进程中的按键?
Posted
技术标签:
【中文标题】如何检测 Python 进程中的按键?【英文标题】:How to detect a pressed key within Python Process? 【发布时间】:2021-12-27 17:29:03 【问题描述】:通过一个简单的示例,我尝试演示具有两个进程的典型多处理设置:
-
接收数据(这里模拟随机数组生成)
发送数据
我想通过键盘按键终止第一个进程,这会将None
发送到队列,然后停止程序。
我使用keyboard 包来检测是否按下了某个键。
import multiprocessing
import keyboard
import numpy as np
def getData(queue):
KEY_PRESSED = False
while KEY_PRESSED is False:
if keyboard.is_pressed("a"):
queue.put(None)
print("STOP in getData")
KEY_PRESSED=True
else:
data = np.random.random([8, 250])
queue.put(data)
def processData(queue):
FLAG_STOP = False
while FLAG_STOP is False:
data = queue.get() # # ch, samples
if data is None:
print("STOP in processData")
FLAG_STOP = True
else:
print("Processing Data")
print(str(data[0,0]))
if __name__ == "__main__":
queue = multiprocessing.Queue()
processes = [
multiprocessing.Process(target=getData, args=(queue,)),
multiprocessing.Process(target=processData, args=(queue,))
]
for p in processes:
p.start()
for p in processes:
p.join()
如果我调试代码,实际检测到按下的键,但同时来自 while 循环的随机数据被放入队列。这使得调试代码变得非常困难。
此外,我尝试了pynput 包,它创建了一个线程来检测按下的键。使用这种方法但同样的问题发生了,程序并没有通过发送None
到另一个进程来保存地终止执行。
如果有人能指出所描述方法中的错误,或者提出另一种在进程中保存检测按键的方法,我将非常高兴。
【问题讨论】:
【参考方案1】:我不确定您描述的是什么问题:savely 不是英文单词。您说实际检测到按下的键。如果是这种情况并且如果您在两个函数中都有print("STOP...")
语句,那么如果您只是从命令提示符运行代码并且getData
检测到正在按下a
,那么我看不出两者如何print 语句最终不会被执行,两个进程也会终止。
如果问题是程序在很长一段时间内没有终止,那么我认为你缺少的是,除非对 keyboard.is_pressed("a")
的调用是一个执行起来特别慢的函数,否则当你有时间按下键盘上的a
,函数getData
在写出None
之前已经将数千条记录写入队列。这意味着processData
必须首先读取这数千条记录并打印它们,然后才能最终到达None
记录。由于它还必须打印数字,processData
无法跟上getData
。在getData
写入它的None
记录很久之后,processData
仍然有数千条记录要读取。
这可以在您的代码的变体中得到证明,其中函数 getData
不等待键盘输入,而只是在写入其 None
记录并终止之前将随机数写入输出队列 5 秒(这模拟您的程序在按下 a
之前等待 5 秒钟)。函数processData
打印它在到达None
记录之前读取的记录数以及读取和打印这些记录所用的时间:
import multiprocessing
import numpy as np
import time
def getData(queue):
KEY_PRESSED = False
expiration = time.time() + 5
# Run for 5 seconds:
while KEY_PRESSED is False:
if time.time() > expiration:
queue.put(None)
print("STOP in getData")
KEY_PRESSED=True
else:
data = np.random.random([8, 250])
queue.put(data)
def processData(queue):
FLAG_STOP = False
t = time.time()
cnt = 0
while FLAG_STOP is False:
data = queue.get() # # ch, samples
if data is None:
print("STOP in processData")
print('Number of items read from queue:', cnt, 'elapsed_time:', time.time() - t)
FLAG_STOP = True
else:
cnt += 1
print("Processing Data")
print(str(data[0,0]))
if __name__ == "__main__":
queue = multiprocessing.Queue()
processes = [
multiprocessing.Process(target=getData, args=(queue,)),
multiprocessing.Process(target=processData, args=(queue,))
]
for p in processes:
p.start()
for p in processes:
p.join()
打印:
...
Processing Data
0.21449036510257957
Processing Data
0.27058883046461824
Processing Data
0.5433716680659376
STOP in processData
Number of items read from queue: 128774 elapsed_time: 35.389172077178955
尽管getData
只写了 5 秒的数字,processData
却花了 35 秒来读取和打印它们。
可以通过限制 Queue 实例上的消息数量来解决问题:
queue = multiprocessing.Queue(1)
这将阻止getData
将下一个值放入队列,直到processData
读取该值。
打印:
...
Processing Data
0.02822635996071321
Processing Data
0.05390434023333657
Processing Data
STOP in getData
0.9030600225686444
STOP in processData
Number of items read from queue: 16342 elapsed_time: 5.000030040740967
因此,如果您使用 maxsize 为 1 的队列,您的程序应在按下 a
键后立即终止。
【讨论】:
谢谢@Booboo!这解决了问题!以上是关于如何检测 Python 进程中的按键?的主要内容,如果未能解决你的问题,请参考以下文章