python---socket自动化交互

Posted -rvy-

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python---socket自动化交互相关的知识,希望对你有一定的参考价值。

socket 的自动化交互

假设服务端运行程序为:

# coding:utf8
# python3

import random
import socket
import sys

# 创建 socket 对象
serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

port = 9999

# 绑定端口号
serversocket.bind((‘‘, port))

# 设置最大连接数,超过后排队
serversocket.listen(5)

while True:
    print(‘等待连接...‘)
    # 建立客户端连接
    clientsocket, addr = serversocket.accept()
    print("连接地址: ", addr)

    first_num = str(random.randint(0, 10))
    second_num = str(random.randint(0, 10))

    clientsocket.send(‘input {}: ‘.format(first_num).encode(‘utf8‘))
    first_input = clientsocket.recv(10).decode(‘utf8‘).strip()
    clientsocket.send(‘input {}: ‘.format(second_num).encode(‘utf8‘))
    second_input = clientsocket.recv(10).decode(‘utf8‘).strip()

    print(first_input)
    print(second_input)

    if first_num == first_input and second_num == second_input:
        clientsocket.send(‘Right!
‘.encode(‘utf8‘))
    else:
        clientsocket.send(‘Wrong!
‘.encode(‘utf8‘))
    clientsocket.shutdown(socket.SHUT_RDWR)
    clientsocket.close()
serversocket.close()

运行如下脚本即可达到自动化交互的目的:

# coding:utf-8
# python3

import sys
import re
import socket
from threading import Thread


class SocketInteraction:
    tcp_socket = None

    def __init__(self, host, port):
        self.tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.tcp_socket.connect((host, port))

    def __del__(self):
        self.tcp_socket.close()

    def recvline(self):
        return self.recvuntil(‘
‘)
    
    def recv_n(self, n):
        return self.tcp_socket.recv(n).decode(‘utf8‘)

    def recvuntil(self, want_end_str):
        current_str = ‘‘
        while True:
            current_str += self.recv_n(1)
            if current_str.endswith(want_end_str):
                return current_str

    def recvuntil_re(self, want_re_str):
        current_str = ‘‘
        while True:
            current_str += self.recv_n(1)
            s = re.search(want_re_str, current_str)
            if s:
                return [current_str, s]

    def send(self, send_str):
        final_bytes = send_str.encode(‘utf8‘)
        self.tcp_socket.send(final_bytes)

    def sendline(self, send_str):
        self.send(send_str+‘
‘)
    
    def interact(self):
        def recv_loop():
            while True:
                c = self.recv_n(1)
                # print 写到控制台会有延时,直接用系统io写
                sys.stdout.write(c)
                sys.stdout.flush()

        def send_loop():
            while True:
                send_str = input()
                self.sendline(send_str)

        recv_thread = Thread(target=recv_loop)
        send_thread = Thread(target=send_loop)

        recv_thread.start()
        send_thread.start()

        recv_thread.join()
        send_thread.join()



si = SocketInteraction(‘127.0.0.1‘, 9999)
for i in range(2):
    content = si.recvuntil_re(r‘input (d+): ‘)
    print(content[0])
    num = content[1].group(1)
    print(repr(num))
    si.sendline(num)
content = si.recvline()
print(content)

以上是关于python---socket自动化交互的主要内容,如果未能解决你的问题,请参考以下文章

python_day socket 交互模拟ssh

python socket 交互通信

python socket编程之客户端和服务端简单交互

如何使用 xcode 将快照划分为多个片段,以便让用户与每个片段进行交互?

如何在不与 MainActivity 交互的情况下从通知中打开片段页面?

前端防扒代码片段