非阻塞控制台输入?

Posted

技术标签:

【中文标题】非阻塞控制台输入?【英文标题】:Non-blocking console input? 【发布时间】:2011-01-25 09:34:24 【问题描述】:

我正在尝试用 Python 制作一个简单的 IRC 客户端(作为我学习语言时的一种项目)。

我有一个循环,用于接收和解析 IRC 服务器发送给我的内容,但如果我使用 raw_input 输入内容,它会停止循环,直到我输入内容(显然)。

如何在不停止循环的情况下输入内容?

(我认为我不需要发布代码,我只想输入一些内容而不会停止 while 1: 循环。)

我在 Windows 上。

【问题讨论】:

您使用的是什么网络模块?扭曲、套接字、异步? 执行此操作:非阻塞、多线程示例:***.com/a/53344690/4561887 【参考方案1】:

对于 Windows,仅限控制台,使用 msvcrt 模块:

import msvcrt

num = 0
done = False
while not done:
    print(num)
    num += 1

    if msvcrt.kbhit():
        print "you pressed",msvcrt.getch(),"so now i will quit"
        done = True

对于 Linux,article 描述了以下解决方案,它需要 termios 模块:

import sys
import select
import tty
import termios

def isData():
    return select.select([sys.stdin], [], [], 0) == ([sys.stdin], [], [])

old_settings = termios.tcgetattr(sys.stdin)
try:
    tty.setcbreak(sys.stdin.fileno())

    i = 0
    while 1:
        print(i)
        i += 1

        if isData():
            c = sys.stdin.read(1)
            if c == '\x1b':         # x1b is ESC
                break

finally:
    termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)

对于跨平台,或者如果您还需要 GUI,您可以使用 Pygame:

import pygame
from pygame.locals import *

def display(str):
    text = font.render(str, True, (255, 255, 255), (159, 182, 205))
    textRect = text.get_rect()
    textRect.centerx = screen.get_rect().centerx
    textRect.centery = screen.get_rect().centery

    screen.blit(text, textRect)
    pygame.display.update()

pygame.init()
screen = pygame.display.set_mode( (640,480) )
pygame.display.set_caption('Python numbers')
screen.fill((159, 182, 205))

font = pygame.font.Font(None, 17)

num = 0
done = False
while not done:
    display( str(num) )
    num += 1

    pygame.event.pump()
    keys = pygame.key.get_pressed()
    if keys[K_ESCAPE]:
        done = True

【讨论】:

我已经有了pygame,所以我会试试这个。谢谢。不过,还有其他人有更好的解决方案吗?我想让它成为一个控制台。 当输入通过其他进程传输时,有没有办法使这项工作? 注意:windows解决方案实际上并没有检查stdin中是否有任何东西。它检查是否按下了键盘上的键,因此不会处理管道输入。 我一直在寻找一种正确的方法来中断我在 input() 上阻塞的线程,但是使用 msvcrt 和 getch,我没有得到正确的箭头键历史行为、退格等从输入()获取。有没有办法轻松保持控制台行为,但仍然能够在 input() 上中断线程? Linux 解决方案似乎对我不起作用:termios.error: (25, 'Inappropriate ioctl for device') 在线 old_settings = termios.tcgetattr(sys.stdin)。有什么想法吗?【参考方案2】:

这是我见过的最棒的solution1。粘贴在这里以防链接失效:

#!/usr/bin/env python
'''
A Python class implementing KBHIT, the standard keyboard-interrupt poller.
Works transparently on Windows and Posix (Linux, Mac OS X).  Doesn't work
with IDLE.

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as 
published by the Free Software Foundation, either version 3 of the 
License, or (at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

'''

import os

# Windows
if os.name == 'nt':
    import msvcrt

# Posix (Linux, OS X)
else:
    import sys
    import termios
    import atexit
    from select import select


class KBHit:

    def __init__(self):
        '''Creates a KBHit object that you can call to do various keyboard things.
        '''

        if os.name == 'nt':
            pass

        else:

            # Save the terminal settings
            self.fd = sys.stdin.fileno()
            self.new_term = termios.tcgetattr(self.fd)
            self.old_term = termios.tcgetattr(self.fd)

            # New terminal setting unbuffered
            self.new_term[3] = (self.new_term[3] & ~termios.ICANON & ~termios.ECHO)
            termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.new_term)

            # Support normal-terminal reset at exit
            atexit.register(self.set_normal_term)


    def set_normal_term(self):
        ''' Resets to normal terminal.  On Windows this is a no-op.
        '''

        if os.name == 'nt':
            pass

        else:
            termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.old_term)


    def getch(self):
        ''' Returns a keyboard character after kbhit() has been called.
            Should not be called in the same program as getarrow().
        '''

        s = ''

        if os.name == 'nt':
            return msvcrt.getch().decode('utf-8')

        else:
            return sys.stdin.read(1)


    def getarrow(self):
        ''' Returns an arrow-key code after kbhit() has been called. Codes are
        0 : up
        1 : right
        2 : down
        3 : left
        Should not be called in the same program as getch().
        '''

        if os.name == 'nt':
            msvcrt.getch() # skip 0xE0
            c = msvcrt.getch()
            vals = [72, 77, 80, 75]

        else:
            c = sys.stdin.read(3)[2]
            vals = [65, 67, 66, 68]

        return vals.index(ord(c.decode('utf-8')))


    def kbhit(self):
        ''' Returns True if keyboard character was hit, False otherwise.
        '''
        if os.name == 'nt':
            return msvcrt.kbhit()

        else:
            dr,dw,de = select([sys.stdin], [], [], 0)
            return dr != []


# Test    
if __name__ == "__main__":

    kb = KBHit()

    print('Hit any key, or ESC to exit')

    while True:

        if kb.kbhit():
            c = kb.getch()
            if ord(c) == 27: # ESC
                break
            print(c)

    kb.set_normal_term()

1 由Simon D. Levy 制作,是compilation of software 的一部分,他在Gnu Lesser General Public License 下编写和发布。

【讨论】:

【参考方案3】:

这是一个使用单独线程在 linux 和 windows 下运行的解决方案:

import sys
import threading
import time
import Queue

def add_input(input_queue):
    while True:
        input_queue.put(sys.stdin.read(1))

def foobar():
    input_queue = Queue.Queue()

    input_thread = threading.Thread(target=add_input, args=(input_queue,))
    input_thread.daemon = True
    input_thread.start()

    last_update = time.time()
    while True:

        if time.time()-last_update>0.5:
            sys.stdout.write(".")
            last_update = time.time()

        if not input_queue.empty():
            print "\ninput:", input_queue.get()

foobar()

【讨论】:

似乎是这里唯一适用于 windows cmd-console 和 eclipse 的解决方案! 您需要在sys.stdout.write(".") 之后添加一个sys.stdout.flush() mac呢? @ColorCodin 也应该可以工作,因为它基于 unix【参考方案4】:

我最喜欢获得非阻塞输入的是在线程中使用 python input():

import threading

class KeyboardThread(threading.Thread):

    def __init__(self, input_cbk = None, name='keyboard-input-thread'):
        self.input_cbk = input_cbk
        super(KeyboardThread, self).__init__(name=name)
        self.start()

    def run(self):
        while True:
            self.input_cbk(input()) #waits to get input + Return

showcounter = 0 #something to demonstrate the change

def my_callback(inp):
    #evaluate the keyboard input
    print('You Entered:', inp, ' Counter is at:', showcounter)

#start the Keyboard thread
kthread = KeyboardThread(my_callback)

while True:
    #the normal program executes without blocking. here just counting up
    showcounter += 1

独立于操作系统,仅内部库,支持多字符输入

【讨论】:

这种方法完全符合我的需要。请参阅下面的答案,了解使用闭包的更简洁的方法。 是的,我觉得这应该是正确的答案,它是最不凌乱和轻量级的。毕竟对于这么简单的需求,我不需要繁重的多线程解决方案!【参考方案5】:

在 Linux 上,这里对 mizipzor 的代码进行了重构,使这变得更容易一些,以防您必须在多个地方使用此代码。

import sys
import select
import tty
import termios

class NonBlockingConsole(object):

    def __enter__(self):
        self.old_settings = termios.tcgetattr(sys.stdin)
        tty.setcbreak(sys.stdin.fileno())
        return self

    def __exit__(self, type, value, traceback):
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self.old_settings)


    def get_data(self):
        if select.select([sys.stdin], [], [], 0) == ([sys.stdin], [], []):
            return sys.stdin.read(1)
        return False

以下是如何使用它:此代码将打印一个不断增长的计数器,直到您按下 ESC。

with NonBlockingConsole() as nbc:
    i = 0
    while 1:
        print i
        i += 1
        if nbc.get_data() == '\x1b':  # x1b is ESC
            break

【讨论】:

使用 GNU/Linux:输入字符后仍然需要按回车,但它可以工作。至少它是非阻塞的,它主要返回普通字符(没有键码,当然除了特殊键,如转义或退格)。谢谢!【参考方案6】:

我认为 curses 库可以提供帮助。

import curses
import datetime

stdscr = curses.initscr()
curses.noecho()
stdscr.nodelay(1) # set getch() non-blocking

stdscr.addstr(0,0,"Press \"p\" to show count, \"q\" to exit...")
line = 1
try:
    while 1:
        c = stdscr.getch()
        if c == ord('p'):
            stdscr.addstr(line,0,"Some text here")
            line += 1
        elif c == ord('q'): break

        """
        Do more things
        """

finally:
    curses.endwin()

【讨论】:

curses 不可移植。 curses 是完全可移植的。它实际上比 Python 本身更便携。我认为这是最好的答案,但这取决于您的用例。例如,curses 会在控制台可用之前清除控制台。 curses 控制屏幕,所有后续屏幕输出必须以自己的方式处理,print() 不再起作用。【参考方案7】:

使用 python3.3 及更高版本,您可以使用此答案中提到的 asyncio 模块。 您必须重新考虑您的代码才能使用asyncio。 Prompt for user input using python asyncio.create_server instance

【讨论】:

【参考方案8】:

我会照 Mickey Chan 所说的去做,但我会使用 unicurses 而不是普通的诅咒。 Unicurses 是通用的(适用于所有或至少几乎所有操作系统)

【讨论】:

【参考方案9】:

由于我发现其中一个answers above 很有帮助,这里有一个类似方法的示例。此代码在输入时创建节拍器效果。

不同之处在于这段代码使用闭包而不是类,这对我来说感觉更直接一些。此示例还包含一个标志以通过my_thread.stop = True 终止线程,但不使用全局变量。我通过(ab)使用 python 函数是对象的事实来做到这一点,因此可以进行猴子修补,甚至从它们自身内部。

注意:停止线程应该小心。如果您的线程有需要某种清理过程的数据,或者如果线程产生了自己的线程,这种方法会毫不客气地杀死这些进程。

# Begin metronome sound while accepting input.
# After pressing enter, turn off the metronome sound.
# Press enter again to restart the process.

import threading
import time
import winsound  # Only on Windows

beat_length = 1  # Metronome speed


def beat_thread():
    beat_thread.stop = False  # Monkey-patched flag
    frequency, duration = 2500, 10
    def run():  # Closure
        while not beat_thread.stop:  # Run until flag is True
            winsound.Beep(frequency, duration)
            time.sleep(beat_length - duration/1000)
    threading.Thread(target=run).start()


while True:
    beat_thread()
    input("Input with metronome. Enter to finish.\n")
    beat_thread.stop = True  # Flip monkey-patched flag
    input("Metronome paused. Enter to continue.\n\n")

【讨论】:

【参考方案10】:

....回到最初的问题...

我也在学习 python,它花费了我很多文档和示例阅读和头疼...但我认为我找到了一个简单、简单、简短且兼容的解决方案...仅使用输入、列表和线程

'''
what i thought:
- input() in another thread
- that were filling a global strings list
- strings are being popped in the main thread
'''

import threading

consoleBuffer = []

def consoleInput(myBuffer):
  while True:
    myBuffer.append(input())
 
threading.Thread(target=consoleInput, args=(consoleBuffer,)).start() # start the thread

import time # just to demonstrate non blocking parallel processing

while True:
  time.sleep(2) # avoid 100% cpu
  print(time.time()) # just to demonstrate non blocking parallel processing
  while consoleBuffer:
    print(repr(consoleBuffer.pop(0)))

直到这是我找到的最简单且兼容的方式,请注意默认情况下 stdin stdout 和 stderr 共享相同的终端,因此如果您在键入时在控制台上打印某些内容,则输入的“本地回显”可能看起来不一致,但是之后按下 enter 输入的字符串接收良好...如果您不希望/不喜欢这种行为,请找到一种方法来分隔输入/输出区域,如重定向,或尝试其他解决方案,如 curses、tkinter、pygame 等。

奖励:ctrl-c 击键可以轻松处理

try:
  # do whatever
except KeyboardInterrupt:
  print('cancelled by user') or exit() # overload

【讨论】:

【参考方案11】:

以下是上述解决方案之一的类包装器:

#!/usr/bin/env python3

import threading

import queue

class NonBlockingInput:

    def __init__(self, exit_condition):
        self.exit_condition = exit_condition
        self.input_queue = queue.Queue()
        self.input_thread = threading.Thread(target=self.read_kbd_input, args=(), daemon=True)
        self.input_thread.start()

    def read_kbd_input(self):
        done_queueing_input = False
        while not done_queueing_input:
            console_input = input()
            self.input_queue.put(console_input)
            if console_input.strip() == self.exit_condition:
                done_queueing_input = True

    def input_queued(self):
        return_value = False
        if self.input_queue.qsize() > 0:
            return_value = True
        return return_value

    def input_get(self):
        return_value = ""
        if self.input_queue.qsize() > 0:
            return_value = self.input_queue.get()
        return return_value

if __name__ == '__main__':

    NON_BLOCK_INPUT = NonBlockingInput(exit_condition='quit')

    DONE_PROCESSING = False
    INPUT_STR = ""
    while not DONE_PROCESSING:
        if NON_BLOCK_INPUT.input_queued():
            INPUT_STR = NON_BLOCK_INPUT.input_get()
            if INPUT_STR.strip() == "quit":
                DONE_PROCESSING = True
            else:
                print("".format(INPUT_STR))

【讨论】:

【参考方案12】:

如果您只想从循环中“退出”一次,您可以拦截 Ctrl-C 信号。

这是跨平台的,非常简单!

import signal
import sys

def signal_handler(sig, frame):
    print('You pressed Ctrl+C!')
    sys.exit(0)

signal.signal(signal.SIGINT, signal_handler)
while True:
    # do your work here

【讨论】:

【参考方案13】:

我下面的示例确实允许在 Windows(仅在 Windows 10 下测试)和 Linux 下从标准输入进行非阻塞读取,而无需外部依赖项或使用线程。它适用于复制粘贴的文本,它禁用 ECHO,因此它可以用于例如某种自定义 UI 并使用循环,因此很容易处理输入到其中的任何内容。

考虑到上述情况,该示例适用于交互式 TTY,而不是管道输入。

#!/usr/bin/env python3
import sys

if(sys.platform == "win32"):
    import msvcrt
    import ctypes
    from ctypes import wintypes
    kernel32 = ctypes.windll.kernel32
    oldStdinMode = ctypes.wintypes.DWORD()
    # Windows standard handle -10 refers to stdin
    kernel32.GetConsoleMode(kernel32.GetStdHandle(-10), ctypes.byref(oldStdinMode))
    # Disable ECHO and line-mode
    # https://docs.microsoft.com/en-us/windows/console/setconsolemode
    kernel32.SetConsoleMode(kernel32.GetStdHandle(-10), 0)
else:
    # POSIX uses termios
    import select, termios, tty
    oldStdinMode = termios.tcgetattr(sys.stdin)
    _ = termios.tcgetattr(sys.stdin)
    # Disable ECHO and line-mode
    _[3] = _[3] & ~(termios.ECHO | termios.ICANON)
    # Don't block on stdin.read()
    _[6][termios.VMIN] = 0
    _[6][termios.VTIME] = 0
    termios.tcsetattr(sys.stdin, termios.TCSAFLUSH, _)

def readStdin():
    if(sys.platform == "win32"):
        return msvcrt.getwch() if(msvcrt.kbhit()) else ""
    else:
        return sys.stdin.read(1)

def flushStdin():
    if(sys.platform == "win32"):
        kernel32.FlushConsoleInputBuffer(kernel32.GetStdHandle(-10))
    else:
        termios.tcflush(sys.stdin, termios.TCIFLUSH)

try:
    userInput = ""
    print("Type something: ", end = "", flush = True)
    flushStdin()
    while 1:
        peek = readStdin()
        if(len(peek) > 0):
            # Stop input on NUL, Ctrl+C, ESC, carriage return, newline, backspace, EOF, EOT
            if(peek not in ["\0", "\3", "\x1b", "\r", "\n", "\b", "\x1a", "\4"]):
                userInput += peek
                # This is just to show the user what they typed.
                # Can be skipped, if one doesn't need this.
                sys.stdout.write(peek)
                sys.stdout.flush()
            else:
                break
    flushStdin()
    print(f"\nuserInput length: len(userInput), contents: \"userInput\"")
finally:
    if(sys.platform == "win32"):
        kernel32.SetConsoleMode(kernel32.GetStdHandle(-10), oldStdinMode)
    else:
        termios.tcsetattr(sys.stdin, termios.TCSAFLUSH, oldStdinMode)

【讨论】:

【参考方案14】:

我正在使用 Linux 编写一个程序,该程序具有更大的主循环,需要定期更新,但还需要以非阻塞方式读取字符。但是重置显示,也会丢失输入缓冲区。 这是我想出的解决方案。每次屏幕更新后,它都会将终端设置为非阻塞,等待主循环通过,然后解释标准输入。 之后,终端将重置为原始设置。

#!/usr/bin/python3
import sys, select, os, tty, termios, time

i = 0
l = True
oldtty = termios.tcgetattr(sys.stdin)
stdin_no = sys.stdin.fileno()

while l:
    os.system('clear')
    print("I'm doing stuff. Press a 'q' to stop me!")
    print(i)
    tty.setcbreak(stdin_no)
    time.sleep(0.5)
    if sys.stdin in select.select([sys.stdin], [], [], 0.0)[0]:
        line = sys.stdin.read(1)
        print (line, len(line))
        
        if "q" in line:
            l = False
        else: 
            pass
    termios.tcsetattr(stdin_no, termios.TCSADRAIN, oldtty)
    i += 1


【讨论】:

以上是关于非阻塞控制台输入?的主要内容,如果未能解决你的问题,请参考以下文章

在控制台 C++ 中获取鼠标和键盘输入

从控制台读取输入,无需无条件等待(非等待 scanf)

网络阻塞到非阻塞,只有线程作为选项?

React.js 使用非状态变量获取错误作为组件正在更改要控制的文本类型的不受控制的输入

将 Pyqt GUI 主应用程序作为单独的非阻塞进程运行

C 非阻塞键盘输入