模拟骑士序列之旅

Posted

技术标签:

【中文标题】模拟骑士序列之旅【英文标题】:Simulating the Knight Sequence Tour 【发布时间】:2014-02-11 12:26:33 【问题描述】:

我目前正在尝试使用 Python 编写一个简单的多线程程序。但是我遇到了一个我认为我错过的错误。我正在尝试简单地编写一个使用蛮力解决以下问题的程序:

从图片中可以看出,有一个棋盘,骑士在各个方格中穿行。

我的方法是简单地尝试每一种可能的方式,其中每一种可能的方式都是一个新线程。如果在线程结束时没有可能的移动,则计算已访问了多少个正方形,如果它等于 63,则在简单文本文件上写入解决方案...

代码如下:

from thread import start_new_thread
import sys

i=1

coor_x = raw_input("Please enter x[0-7]: ")
coor_y = raw_input("Please enter y[0-7]: ")

coordinate = int(coor_x), int(coor_y)



def checker(coordinates, previous_moves):

    possible_moves = [(coordinates[0]+1, coordinates[1]+2), (coordinates[0]+1, coordinates[1]-2),
                      (coordinates[0]-1, coordinates[1]+2), (coordinates[0]-1, coordinates[1]-2),
                      (coordinates[0]+2, coordinates[1]+1), (coordinates[0]+2, coordinates[1]-1),
                      (coordinates[0]-2, coordinates[1]+1), (coordinates[0]-2, coordinates[1]-1)]

    to_be_removed = []

    for index in possible_moves:
        (index_x, index_y) = index
        if index_x < 0 or index_x > 7 or index_y < 0 or index_y > 7:
            to_be_removed.append(index)

    for index in previous_moves:
        if index in possible_moves:
            to_be_removed.append(index)



    if not to_be_removed:
        for index in to_be_removed:
            possible_moves.remove(index)


    if len(possible_moves) == 0:
        if not end_checker(previous_moves):
            print "This solution is not correct"
    else:
        return possible_moves

def end_checker(previous_moves):
    if len(previous_moves) == 63:
        writer = open("knightstour.txt", "w")
        writer.write(previous_moves)
        writer.close()
        return True
    else:
        return False


def runner(previous_moves, coordinates, i):
    if not end_checker(previous_moves):
        process_que = checker(coordinates, previous_moves)
        for processing in process_que:
            previous_moves.append(processing)
            i = i+1
            print "Thread number:"+str(i)
            start_new_thread(runner, (previous_moves, processing, i))
    else:
        sys.exit()



previous_move = []
previous_move.append(coordinate)

runner(previous_move, coordinate, i)
c = raw_input("Type something to exit !")

我愿意接受所有建议... 我的示例输出如下:

Please enter x[0-7]: 4
Please enter y[0-7]: 0
Thread number:2
Thread number:3
Thread number:4
Thread number:5Thread number:4
Thread number:5

Thread number:6Thread number:3Thread number:6Thread number:5Thread number:6
Thread number:7
Thread number:6Thread number:8

Thread number:7

Thread number:8Thread number:7
 Thread number:8



Thread number:4
Thread number:5
Thread number:6Thread number:9Thread number:7Thread number:9
Thread number:10
Thread number:11
Thread number:7
Thread number:8
Thread number:9
Thread number:10
Thread number:11
Thread number:12
Thread number:5Thread number:5
 Thread number:6
Thread number:7
Thread number:8
Thread number:9

Thread number:6
Thread number:7
Thread number:8
Thread number:9

如果似乎由于某种原因线程数停留在 12... 任何帮助都将受到欢迎......

谢谢

【问题讨论】:

图片是你的代码创建的吗? 您为此使用线程是否有原因?如果 Python 程序受 CPU 限制(而不是 IO 限制),它们通常不会从多线程中受益,因为全局解释器锁会阻止它们实际并行运行 Python 代码。 如果您想从多线程中的多个处理器中受益,您可能需要切换到multiprocessing 模块。 @GrijeshChauhan:图片看起来像***上的this one。 @JohnY:我怀疑你是对的,这可能是一个计划——请参阅我刚刚添加的“answer”。 【参考方案1】:

您所谓的Knights Who Say Ni 问题的探索,虽然是对 Python 问题的巧妙改写,但更广为人知的是 Knights Tour 数学问题。鉴于这一点以及您是 math teacher 的事实,我怀疑您的问题可能是愚蠢的差事(又名 snipe hunt),并且您完全了解以下事实:

根据***关于骑士巡回问题的文章section:

5.1 蛮力算法 除了之外,暴力搜索骑士之旅是不切实际的 最小的板;例如,在 8x8 板上大约有 4x1051 个可能的移动序列,远远超出了容量 执行操作的现代计算机(或计算机网络) 在这么大的一组。

正好 3,926,356,053,343,005,839,641,342,729,308,535,057,127,083,875,101,072 根据footnote 链接。

【讨论】:

【参考方案2】:

您当前的代码存在几个问题。

我看到的第一个问题是你的检查器永远不会确定任何潜在的移动是无效的。您在此块中的条件中有一个错误:

if not to_be_removed:
    for index in to_be_removed:
        possible_moves.remove(index)

只有在to_be_removed 为空时循环才会运行。由于循环一个空列表会立即终止,它什么也不做。我认为您希望if 成为if to_be_removed,它会测试其中包含某些内容的列表。但是那个测试不是必须的。您可以始终运行循环,如果列表为空,则让它不执行任何操作。

或者更好的是,根本不要使用to_be_removed 列表,而是使用列表理解直接过滤您的possible_moves

def checker(coordinates, previous_moves):
    possible_moves = [(coordinates[0]+1, coordinates[1]+2),
                      (coordinates[0]+1, coordinates[1]-2),
                      (coordinates[0]-1, coordinates[1]+2),
                      (coordinates[0]-1, coordinates[1]-2),
                      (coordinates[0]+2, coordinates[1]+1),
                      (coordinates[0]+2, coordinates[1]-1),
                      (coordinates[0]-2, coordinates[1]+1),
                      (coordinates[0]-2, coordinates[1]-1)]

    valid_moves = [(x, y) for x, y in possible_moves
                   if 0 <= x <= 7 and 0 <= y <=7 and (x,y) not in previous_moves]

    return valid_moves # always return the list, even if it is empty

我看到的第二个问题是关于您的previously_seen 列表。您的线程都在使用对同一个列表实例的引用,并且当它们对其进行变异时(通过在runner 中调用append),它们将相互混淆它的值。也就是说,在第一个线程运行并启动它的八个子线程之后,它们每个都会看到相同的情况,所有八个点都已经访问过。您可以通过复制列表而不是改变它来解决这个问题(例如previously_seen + [processing])。

第三个问题是您的线程编号系统无法按您希望的方式工作。原因是每个线程用紧跟在它自己编号后面的值来编号它的八个子线程。所以线程 1 产生线程 2-9,但线程 2 产生线程 3-10,重复使用一堆数字。

您可以通过多种方式得出更好的数字,但它们并非完全无关紧要。您可以使用每次启动新线程时递增的全局变量,但这需要锁定同步以确保两个线程不会同时尝试递增它。或者您可以使用某种数学方案使子线程编号唯一(例如线程i 的子线程是i*8 加上0-8 之间的数字),但这可能需要跳过一些线程编号,因为您无法提前知道哪些线程由于无效的动作而不需要。

第四个问题是,即使找到许多解决方案,您的输出代码也只会让您看到数据文件中的最后一个结果。这是因为您使用"w" 模式打开输出文件,这会擦除文件的先前内容。您可能想使用"a"(追加)或"r+"(读写,不截断)。

我的最终建议不是代码中的特定错误,而是更笼统的一点。在这个程序中使用线程似乎没有任何好处。由于Global Interpreter Lock,即使您的 CPU 中有多个内核,线程化的 Python 代码也永远不会同时运行。线程对于 IO 受限的代码是有意义的,但对于像你这样 CPU 受限的代码,它会增加开销和调试难度,而没有任何收益。

简单地在单个线程中递归的更基本的解决方案,或者使用其他策略(如backtracking)来检查所有搜索空间几乎肯定会更好。

【讨论】:

最后的建议也是我想指出的,python 中的线程并不是你想的那样。您使用它的方式实际上会损失很多 (TM) 的 cpu 时间,只需在始终为 ready 的线程之间切换(从不等待 I/O)。您可以改为研究多处理,这就是您可以充分利用(或接近)多核 CPU 的潜力。 所以最好使用以下库:docs.python.org/2/library/multiprocessing.html我说的对吗? 是的,多处理将比线程更有用,但其中任何一个都比仅使用单个线程并且在没有并发的情况下解决问题更难。虽然您可以从使用多个 CPU 内核中受益,但您必须通过同步和通信代码的开销以及在编程时遇到的困难来抵消这一点。例如,将当前代码中的线程创建简单地替换为进程创建可能不是一个好主意,因为当每个进程启动 8 个子进程时,您将自己“分叉炸弹”。 @Blckknght 这正是发生的事情......最后我得到了一个“叉子炸弹”那么我该如何解决这个问题?我真的被难住了…… 好吧,蛮力算法写起来并不难,但它实际上无法解决 8x8 骑士巡回赛,因为要搜索的可能状态太多。您需要更智能的算法,而不仅仅是工作多处理代码。【参考方案3】:

我可以在这里看到两个问题:

1) 您正在使用变量 i 计算线程数。但是 i 从调用线程传递给所有子线程。所以第一个线程会将 1,2,3 传递给前 3 个子线程。但是标记为 1 的子线程随后会将 2、3、4 传递给它的 3 个子线程(原始线程的孙线程)。或者换句话说,你在不同的线程中复制线程号,这是你没有计算超过 12 个的原因之一。你可以通过几种方式解决这个问题 - 最简单的可能是使用在范围之外声明的变量runner 函数并使用一个锁来确保两个线程不会同时修改它:

runnerLock = threading.Lock()
i=0
def runner(previous_moves, coordinates):
global i
if not end_checker(previous_moves):
    process_que = checker(coordinates, previous_moves)
    for processing in process_que:
        previous_moves.append(processing)
        runnerLock.acquire()
        i = i+1
        print "Thread number:"+str(i)
        runnerLock.release()
        start_new_thread(runner, (previous_moves, processing))
else:
    sys.exit()

2)第二个问题是你正在做的跑步者功能:

    previous_moves.append(processing)

在 for 循环中,您希望为当前位置的每个可能移动启动一个新线程。这样做的问题是,如果你有 4 个可能的移动,你想为第一个启动线程将有当前先前的移动加上一个新的附加(这是你想要的)。但是,第二个将具有先前的动作+您启动线程的第一个新动作+您启动线程的第二个新动作。因此,它先前移动的历史现在已被破坏(它有第一种可能是另一个线程正在尝试以及它打算尝试的那个)。第三个是有 2 个额外的可能性,依此类推。这可以通过执行(未经测试)来纠正:

runnerLock = threading.Lock()
i=0
def runner(previous_moves, coordinates):
global i
if not end_checker(previous_moves):
    process_que = checker(coordinates, previous_moves)
    temp_previous_moves = previous_moves.deepcopy()
    for processing in process_que:
        temp_previous_moves.append(processing)
        runnerLock.acquire()
        i = i+1
        print "Thread number:"+str(i)
        runnerLock.release()
        start_new_thread(runner, (temp_previous_moves, processing))
else:
    sys.exit()

这样做还避免了对 previous_moves 数组的锁定需求(在您执行此操作的同时在所有不同线程中对其进行修改)

【讨论】:

【参考方案4】:

我尝试在 Python 中使用 MultiProcessing 做一件非常相似的事情(探索大型组合搜索树)。我实现了某种work stealing algorithm 你可以在this patch 找到我旨在进入Sagemath 的实验结果。然而,我终于意识到 Python 是一种非常糟糕的语言。我强烈建议尝试Cilk++ langage,它是 C++ 的超集。它特别适合这类问题。例如,您可以找到solution of the 8-queens problem。很抱歉,这只是一个链接答案,但在意识到这不是正确的方法之前,我花了很多时间尝试在 Python 中做到这一点。

【讨论】:

【参考方案5】:

请注意,写入文件不是线程安全的。

import thread
import sys

i=1

coor_x = raw_input("Please enter x[0-7]: ")
coor_y = raw_input("Please enter y[0-7]: ")

coordinate = int(coor_x), int(coor_y)



def checker(coordinates, previous_moves):

    possible_moves = [(coordinates[0]+1, coordinates[1]+2), (coordinates[0]+1, coordinates[1]-2), 
                      (coordinates[0]-1, coordinates[1]+2), (coordinates[0]-1, coordinates[1]-2),    
                      (coordinates[0]+2, coordinates[1]+1), (coordinates[0]+2, coordinates[1]-1),    
                      (coordinates[0]-2, coordinates[1]+1), (coordinates[0]-2, coordinates[1]-1)] 

    possible_moves = [(x,y) for x,y in possible_moves if x >= 0 and x < 8 and y >=0 and y < 8]

    possible_moves = [move for move in possible_moves if move not in previous_moves]

    if len(possible_moves) == 0:
        if not end_checker(previous_moves):
            print "This solution is not correct"
    else:
        return possible_moves

def end_checker(previous_moves):
    if len(previous_moves) == 63:
        writer = open("knightstour.txt", "w")
        writer.write(str(previous_moves) + "\n")
        writer.close()
        return True
    else:
        return False


def runner(previous_moves, coordinates, i):
    if not end_checker(previous_moves):
        process_que = checker(coordinates, previous_moves)
        if not process_que:
            thread.exit()
        for processing in process_que:
            previous_moves.append(processing)
            i = i+1
            print "Thread number:"+str(i)
            thread.start_new_thread(runner, (previous_moves, processing, i))
    else:
        sys.exit()



previous_move = []
previous_move.append(coordinate)

runner(previous_move, coordinate, i)
c = raw_input("Type something to exit !")

【讨论】:

【参考方案6】:

这是我想出的一个解决方案,因为我发现这很有趣(在一分钟内没有解决它......所以可能在某个地方有点偏离......使用Depth-First search,但可以轻松更改):

#!/usr/bin/env python
# you should probably be using threading... python docs suggest thread is 
from threading import Thread
import itertools
import time


def is_legal(move):
    x = move[0]
    y = move[1]
    return 8 > x >= 0 and 8 > y >= 0


def get_moves(current, previous_moves):
    possibilities = []
    possibilities.extend(itertools.product([1,-1],[2,-2]))
    possibilities.extend(itertools.product([2, -2],[1,-1]))
    for mx, my in possibilities:
        move_dest = [current[0] + mx, current[1] + my]
        if is_legal(move_dest) and not move_dest in previous_moves:
            yield (move_dest)


def solve_problem(current, previous_moves):
    # add location to previous moves...
    previous_moves.append(current)
    threads = []
    for move in get_moves(current, previous_moves):
        # start a thread for every legal move
        t = Thread(target=solve_problem, args=(list(move), list(previous_moves)))
        threads.extend([t])
        t.start()
        # dfs prevent resource overflow...
        # - makes threads redundant as mentioned in comments
        # t.join()
    if len(previous_moves) % 20 == 0:
        #   print "got up to %d moves !\n" % len(previous_moves)
        pass

    if len(previous_moves) == 64:
        print " solved !\n" % len(previous_moves)
        # check to see if we're done
        t = int(time.time())
        with open('res_%d' % t, 'w') as f:
            f.write("solution: %r" % previous_moves)
            f.close()

#    for t in threads:
#        t.join()


if "__main__" == __name__:
    print "starting..."
    coor_x = int(raw_input("Please enter x[0-7]:"))
    coor_y = int(raw_input("Please enter y[0-7]:"))
    start = [coor_x, coor_y]
    print "using start co-ordinations: %r" % start
    solve_problem(start, [])

Threadinv vs Thread

通过快速重新检查您的代码,可能会尝试确保将内容实际复制到您的子线程中。 Python 默认共享我读过的内存。

【讨论】:

如果你在启动后立即加入每个线程,你还不如根本不使用线程,只做一个递归调用。相同行为的开销要少得多。【参考方案7】:

请查看this 代码,它解决了特定类型的骑士序列巡回赛问题。它不使用多线程方法,但如果您正在寻找速度(并且它不使用线程),它会高度优化(算法上)来模拟骑士序列巡回赛问题。我相信你可以根据你的用例调整它。只需修改 build_keypad 函数以匹配棋盘拓扑并删除元音约束代码。希望对您有所帮助。

__author__ = 'me'
'''
Created on Jun 5, 2012

@author: Iftikhar Khan
'''
REQD_SEQUENCE_LENGTH = 10
VOWEL_LIMIT = 2
VOWELS = [(0, 0), (4, 0), (3, -1), (4, -2)]


def build_keypad():
    """Generates 2-D mesh representation of keypad."""
    keypad = [(x, y) for x in range(5) for y in range(-3, 1)]
    # adjust topology
    keypad.remove((0, -3))
    keypad.remove((4, -3))
    return keypad


def check_position(position):
    """Determines if the transform is valid. That is, not off-keypad."""
    if position == (0, -3) or position == (4, -3):
        return False

    if (-1 < position[0] < 5) and (-4 < position[1] < 1):
        return True
    else:
        return False


def build_map(keypad):
    """Generates a map of all possible Knight moves for all keys."""
    moves = [(1, -2), (1, 2), (2, -1), (2, 1), (-1, -2), (-1, 2), (-2, -1), (-2, 1)]
    keymap = 
    for key_pos in keypad:
        for move in moves:
            x = key_pos[0] + move[0]
            y = key_pos[1] + move[1]
            if check_position((x, y)):
                keymap.setdefault(key_pos, []).append((x, y))
    return keymap


def build_sequence(k, p, m, v, ks):
    """Generates n-key sequence permutations under m-vowel constraints using
        memoization optimisation technique. A valid permutation is a function
        of a key press, position of a key in a sequence and the current
        vowel count. This memoization data is stored as a 3-tuple, (k,p,v), in
        dictionary m.
    """
    if k in VOWELS:
        v += 1
        if v > VOWEL_LIMIT:
            v -= 1
            return 0

    if p == REQD_SEQUENCE_LENGTH:
        m[(k, p, v)] = 0
        return 1
    else:
        if (k, p, v) in m:
            return m[(k, p, v)]
        else:
            m[(k, p, v)] = 0
            for e in ks[k]:
                m[(k, p, v)] += build_sequence(e, p + 1, m, v, ks)

    return m[(k, p, v)]


def count(keys):
    """Counts all n-key permutations under m-vowel constraints."""
    # initialise counters
    sequence_position = 1
    vowel_count = 0
    memo = 

    return sum(build_sequence(key, sequence_position, memo, vowel_count, keys)
               for key in keys)


if __name__ == '__main__':
    print(count(build_map(build_keypad())))

【讨论】:

【参考方案8】:

我是 John Roach 的实习生,他给了我这个作为家庭作业,我无法解决它。我用他的账号问了这个问题。以下是我的回答; 我通过使用称为Warnsdorff's rule 的heuristic 找到了它的解决方案。 但是我找到online 的代码有这样的输出:

boardsize: 5
Start position: c3

19,12,17, 6,21
 2, 7,20,11,16
13,18, 1,22, 5
 8, 3,24,15,10
25,14, 9, 4,23

因此我对其进行了一些更改,而不是使用标准输出,而是使用 P,因为 P 的格式是元组。我创建了一个名为 move 的元组列表并将其返回。

def knights_tour(start, boardsize=boardsize, _debug=False):
    board = (x,y):0 for x in range(boardsize) for y in range(boardsize)
    move = 1
    P = chess2index(start, boardsize)
    moves.append(P)

    board[P] = move
    move += 1
    if _debug:
        print(boardstring(board, boardsize=boardsize))
    while move <= len(board):
        P = min(accessibility(board, P, boardsize))[1]
        moves.append(P)
        board[P] = move
        move += 1
        if _debug:
            print(boardstring(board, boardsize=boardsize))
            input('\n%2i next: ' % move)
    return moves

现在我有了动作列表,我编写了以下程序来创建动画这些动作的 GIF。代码如下;

import sys
import pygame
import knightmove
import os


pygame.init()

square_list = []
line_list = []
i = 0
j = 1


def make_gif():
    os.system("convert   -delay 40   -loop 0   Screenshots/screenshot*.png   knights_tour.gif")

def get_moves(start_move):
    return knightmove.knights_tour(start_move, 8)

def scratch(move):
    move_x, move_y = move
    x = int(move_x) * 50
    y = int(move_y) * 50
    line_list.append([x+25, y+25])
    square_list.append([x, y])
    for index in range(len(square_list)):
        screen.blit(square, square_list[index])

def draw_line():
    for index in range(len(line_list)-1):
        pygame.draw.line(screen, black, (line_list[index]), (line_list[index+1]), 2)

def draw_dot():
    return pygame.draw.circle(screen, red, (line_list[i]), 3, 0)

def screenshot():
    if j <= 9:
        c = "0"+str(j)
    else:
        c = j
    pygame.image.save(screen, "/home/renko/PycharmProjects/pygame_tut1/Screenshots/screenshot"+str(c)+".png")


size = width, height = 400, 400
white = 255, 255, 255
black = 0, 0, 0, 0
red = 255, 0, 0

screen = pygame.display.set_mode(size)
square = pygame.image.load("Untitled.png")

start = raw_input("Enter the start position:")
moves = get_moves(start)


while 1:
    for event in pygame.event.get():
        if event.type == pygame.QUIT:
            sys.exit()
    screen.fill(white)
    pygame.draw.line(screen, black, (0, 50), (400, 50), 3)
    pygame.draw.line(screen, black, (0, 100), (400, 100), 3)
    pygame.draw.line(screen, black, (0, 150), (400, 150), 3)
    pygame.draw.line(screen, black, (0, 200), (400, 200), 3)
    pygame.draw.line(screen, black, (0, 250), (400, 250), 3)
    pygame.draw.line(screen, black, (0, 300), (400, 300), 3)
    pygame.draw.line(screen, black, (0, 350), (400, 350), 3)

    pygame.draw.line(screen, black, (50, 0), (50, 400), 3)
    pygame.draw.line(screen, black, (100, 0), (100, 400), 3)
    pygame.draw.line(screen, black, (150, 0), (150, 400), 3)
    pygame.draw.line(screen, black, (200, 0), (200, 400), 3)
    pygame.draw.line(screen, black, (250, 0), (250, 400), 3)
    pygame.draw.line(screen, black, (300, 0), (300, 400), 3)
    pygame.draw.line(screen, black, (350, 0), (350, 400), 3)



    scratch(moves[i])
    draw_line()
    draw_dot()
    screenshot()
    i += 1
    j += 1
    pygame.display.flip()
    if i == 64:
        make_gif()
        print "GIF was created"
        break

您知道导入的骑士移动库是我使用 rosettacode.org 算法创建的库。

是的...我被派去猎杀... :(

【讨论】:

啊,我明白了,所以你基本上证实了我的大部分怀疑——唯一真正的解决方案不是尝试通过多线程/处理来更快地使用蛮力方法,而是使用更聪明的方法。坦率地说,我认为在这种情况下基本上以不同的用户名接受你自己的答案有点不公平——尤其是,因为它主要包含你在网上和其他不使用您在问题本身中提到的蛮力方法或任何类型的多任务处理。 使用两个账号,然后接受答案,甚至分配赏金??抱歉,这是作弊。

以上是关于模拟骑士序列之旅的主要内容,如果未能解决你的问题,请参考以下文章

骑士之旅中的堆栈实现[关闭]

如何解决具有特殊约束的部分骑士之旅

陷入无限循环(骑士之旅问题)

骑士之旅(回溯)

骑士之旅 - 导致无限循环,我不知道为啥

带回溯的骑士​​之旅