import numpy as np
import torch
import json
import os
import logging
from flask import Flask, request, jsonify
from flask_cors import CORS
import time

# Import the game and AI modules
from game import OthelloGame
from alphazero import NNetWrapper, MCTS, dotdict

# Configure logging
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

app = Flask(__name__)
# Enable CORS so the frontend (usually served from a different port) can reach the API
CORS(app)
# --- Global state and AI initialization ---
# Default hyperparameters (kept consistent with the args in alphazero.py)
args = dotdict({
    'lr': 0.001,
    'dropout': 0.1,
    'epochs': 10,
    'batch_size': 64,
    'cuda': torch.cuda.is_available(),
    'num_channels': 512,
    'numIters': 200,
    'numEps': 100,
    'tempThreshold': 15,
    'updateThreshold': 0.6,
    'maxlenOfQueue': 200000,
    'numItersForTrainExamplesHistory': 20,
    'numMCTSSims': 25,  # number of MCTS simulations during training
    'arenaCompare': 40,
    'cpuct': 1,
    'checkpoint': './temp/',
    'load_model': True,
    'load_folder_file': ('./temp/', 'best.pth.tar'),
    'board_size': 8  # default 8x8
})
# Game and AI instances
game = None
nnet = None
mcts = None

# Game state
current_board = None
current_player = 1  # 1: Human (White), -1: AI (Black)
last_move_coords = None
board_size = 8
history_stack = []
first_player_game = 1

# Helper: push the current state onto the history stack
def save_state():
    """Save a copy of the current board and the player to move onto the history stack."""
    global current_board, current_player, history_stack
    # Note: this stores the internal (un-flipped) game state
    history_stack.append((np.copy(current_board), current_player))
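
# Illustrative stack contents (a sketch, using the S_* naming from the comments
# further below): for a human-first game the stack grows as
#   [S_init, S_H1, S_A1, S_H2, S_A2, ...]
# i.e. one entry per half-move, so an undo removes one (Human, AI) pair of
# entries and restores whatever is then on top of the stack.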
def init_game_and_ai(n):
    """Initialize the game and AI modules for an n x n board."""
    global game, nnet, mcts, board_size, history_stack, current_board
    board_size = n
    log.info(f"Initializing game and AI for {n}x{n} board.")
    game = OthelloGame(n)
    # Reconfigure the MCTS parameters for play mode
    play_args = dotdict({
        'numMCTSSims': 200,  # use more simulations when playing against a human
        'cpuct': 1.0,
        'cuda': args.cuda  # inherit the CUDA setting
    })
    nnet = NNetWrapper(game, args)
    # The trained model is expected at args.load_folder_file ('./temp/best.pth.tar' by default)
    try:
        load_folder = args.load_folder_file[0]
        load_file = args.load_folder_file[1]
        nnet.load_checkpoint(folder=load_folder, filename=load_file)
        log.info(f"Successfully loaded model from {load_folder}{load_file}")
    except ValueError as e:
        log.error(f"Failed to load model: {e}. AI will likely perform poorly.")
    mcts = MCTS(game, nnet, play_args)
    # Clear the history stack and save the initial state
    history_stack = []
    # Get the initial board and push it onto the stack
    current_board = game.getInitBoard()
    save_state()
def get_api_moves(board, player):
    """Convert the getValidMoves vector into a list of {x, y} dicts."""
    if game is None:
        return []
    valids = game.getValidMoves(board, player)
    moves_list = []
    # Skip the last entry, which is the pass action
    for i in range(len(valids) - 1):
        if valids[i] == 1:
            x = i // game.n
            y = i % game.n
            moves_list.append({'x': int(x), 'y': int(y)})
    return moves_list
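
# Worked example of the index mapping (illustrative, assuming an 8x8 board so
# game.n == 8): the action vector is laid out row-major with one extra slot for
# "pass", so action 19 maps to x = 19 // 8 = 2, y = 19 % 8 = 3, and the pass
# action is index 8 * 8 = 64.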
def check_game_end(board, player):
    """Check whether the game is over and build a status string from the absolute piece-count difference."""
    result = game.getGameEnded(board, player)
    status = 'Ongoing'
    score_diff = 0
    if result is not None:
        white_count = np.sum(board == 1)
        black_count = np.sum(board == -1)
        score_diff = int(white_count - black_count)
        if result == 0 or score_diff == 0:
            status = f"Game Over: Draw. Score: {white_count} vs {black_count}"
        elif score_diff > 0:
            status = f"Game Over: Human (O) Wins! Score: {white_count} vs {black_count}"
        elif score_diff < 0:
            status = f"Game Over: AI (X) Wins! Score: {white_count} vs {black_count}"
        else:
            status = f"Game Over: Draw. Score: {white_count} vs {black_count}"
    return status
# Route: start a new game (route path assumed here; adjust to match the frontend)
@app.route('/api/new_game', methods=['POST'])
def new_game():
    global current_board, current_player, last_move_coords, board_size, history_stack, first_player_game
    data = request.json
    size = data.get('size', 8)
    first_player = data.get('first_player', 1)
    first_player_game = first_player
    # 1. Initialize the game and AI.
    #    Only re-initialize the AI when the board size changes; otherwise just reset the game.
    if game is None or size != board_size:
        init_game_and_ai(size)
    # If the size is unchanged, only reset the history stack and the board
    history_stack = []
    current_board = game.getInitBoard()
    save_state()  # save the initial state (stack length = 1)
    current_player = first_player
    last_move_coords = None
    # 2. Handle the "AI moves first" case
    if current_player == -1:
        # [Fix] history_stack.pop() was removed here, because the initial state S_init must be kept.
        # S_init is already the state with current_player = 1; we only need to trigger the AI
        # move immediately. ai_move_logic performs the AI action and saves the S_AI_1 state.
        history_stack = []
        # Trigger the AI move immediately
        current_board = np.flip(current_board, 0)
        status = check_game_end(current_board, current_player)
        if status == 'Ongoing':
            # Pass is_init_move=False: S_init has already been saved, and ai_move_logic
            # should save S_AI_1 after the AI has moved.
            return ai_move_logic(is_init_move=False)
    # 3. Human moves first, or the AI moves first but the game is already over: return the current state (S_init)
    status = check_game_end(current_board, current_player)
    current_board = np.flip(current_board, 0)
    # The board returned to the frontend is flipped vertically to match the frontend's coordinate system
    return jsonify({
        'board': current_board.tolist(),
        'legal_moves': get_api_moves(current_board, current_player),
        'current_player': current_player,
        'last_move': last_move_coords,
        'status': status,
        'history_length': len(history_stack)  # length of the undo history
    })
# Route: apply a human move (route path assumed here; adjust to match the frontend)
@app.route('/api/human_move', methods=['POST'])
def human_move():
    """Apply the human player's move, save the state, and return the intermediate state for the AI."""
    global current_board, current_player, last_move_coords
    if current_player != 1 or check_game_end(current_board, current_player) != 'Ongoing':
        return jsonify({'error': 'Not your turn or game is over', 'history_length': len(history_stack)}), 400
    data = request.json
    x = data.get('x')
    y = data.get('y')
    if x is None or y is None:
        # Check whether this is a pass action
        if data.get('action') == 'pass':
            action = game.n * game.n  # the pass action is the last index
        else:
            return jsonify({'error': 'Invalid move coordinates', 'history_length': len(history_stack)}), 400
    else:
        action = game.n * x + y
    valids = game.getValidMoves(current_board, 1)
    if valids[action] == 0:
        return jsonify({'error': 'Illegal move', 'history_length': len(history_stack)}), 400
    # 1. Apply the human move
    current_board, current_player = game.getNextState(current_board, 1, action)
    # 2. Save the state after the human move (state S_H: it is now the AI's turn)
    save_state()
    if action != game.n * game.n:
        last_move_coords = {'x': x, 'y': y}
    else:
        last_move_coords = None  # human passed
    status = check_game_end(current_board, current_player)
    # current_board = np.flip(current_board, 0)  # vertical flip for the frontend is currently disabled here
    return jsonify({
        'board': current_board.tolist(),
        'legal_moves': get_api_moves(current_board, current_player),
        'current_player': current_player,
        'last_move': last_move_coords,
        'status': status,
        'history_length': len(history_stack)  # length of the undo history
    })
def ai_move_logic(is_init_move=False):
    """Core AI-move logic; also called from new_game when the AI moves first."""
    global current_board, current_player, last_move_coords
    canonical_board = game.getCanonicalForm(current_board, -1)
    # Pick the AI's best action (temp=0, i.e. greedy with respect to visit counts)
    ai_action = np.argmax(mcts.getActionProb(canonical_board, temp=0))
    # Update the game state
    current_board, next_player = game.getNextState(current_board, -1, ai_action)
    current_player = next_player
    # Record the AI's move coordinates
    if ai_action != game.n * game.n:  # not a pass action
        ai_x = ai_action // game.n
        ai_y = ai_action % game.n
        last_move_coords = {'x': int(ai_x), 'y': int(ai_y)}
    else:
        last_move_coords = None  # AI passed
    status = check_game_end(current_board, current_player)
    # Save the state after the AI move (state S_A: it is now the human's turn).
    # The is_init_move flag no longer controls save_state: new_game only needs the AI
    # to move normally and save its state.
    save_state()
    return jsonify({
        'board': current_board.tolist(),
        'legal_moves': get_api_moves(current_board, current_player),
        'current_player': current_player,
        'last_move': last_move_coords,
        'status': status,
        'history_length': len(history_stack)  # length of the undo history
    })
# B. New `ai_move` route (route path assumed here; adjust to match the frontend)
@app.route('/api/ai_move', methods=['POST'])
def ai_move():
    """Trigger the AI move and return the resulting state."""
    global current_board, current_player, last_move_coords
    start_time = time.time()
    if current_player != -1:
        return jsonify({'error': 'Not AI turn', 'history_length': len(history_stack)}), 400
    response = ai_move_logic(is_init_move=False)
    # Enforce a minimum AI "thinking time" of 0.5 seconds
    end_time = time.time()
    used_time = end_time - start_time
    if used_time < 0.5:
        time.sleep(0.5 - used_time)  # wait so that at least 0.5 s elapses
    return response
# New route: undo (route path assumed here; adjust to match the frontend)
@app.route('/api/undo', methods=['POST'])
def undo_move():
    """Undo: roll back to the state before the previous human move (i.e. revert one Human move + one AI move)."""
    global current_board, current_player, last_move_coords, history_stack
    # At least 2 saved states are needed to undo anything; a full (Human + AI) step
    # corresponds to 3 entries: S_init + S_Human_move + S_AI_move.
    if len(history_stack) < 2:
        # Only S_init remains (length = 1): nothing can be undone.
        return jsonify({
            'error': 'Cannot undo further. Only initial state remains or insufficient moves made.',
            'history_length': len(history_stack)
        }), 400
    # Scenario 1: S_init -> S_AI_1 (the AI's first move when the AI plays first, length 2).
    # Pop once to get back to S_init.
    if len(history_stack) == 2:
        # Pop the S_AI_1 state
        history_stack.pop()
    # Scenario 2: S_init -> S_H1 -> S_AI_1 (length >= 3).
    # Pop twice to get back to the state before S_H1, i.e. S_init or the previous S_AI.
    elif len(history_stack) >= 3:
        # 1. Pop the S_AI state (AI move done, human's turn)
        history_stack.pop()
        # 2. Pop the S_Human state (human move done, AI's turn)
        history_stack.pop()
    # 3. Restore the state at the top of the stack
    current_board_restored, current_player_restored = history_stack[-1]
    if len(history_stack) == 1 and first_player_game == 1:
        current_board_restored = np.flip(current_board_restored, 0)
    # Restore the state
    current_board = np.copy(current_board_restored)
    current_player = current_player_restored
    # Reset the last-move marker
    last_move_coords = None
    status = check_game_end(current_board, current_player)
    return jsonify({
        'board': current_board.tolist(),
        'legal_moves': get_api_moves(current_board, current_player),
        'current_player': current_player,
        'last_move': last_move_coords,
        'status': status,
        'history_length': len(history_stack)  # new length of the undo history
    })
if __name__ == '__main__':
    # Initialize a default 8x8 game instance
    init_game_and_ai(8)
    port = int(os.environ.get('PORT', 7860))
    log.info(f"Starting Flask server on port {port}...")
    # ... (logging) ...
    app.run(host='0.0.0.0', port=port)
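
# Example client usage (a minimal sketch; the '/api/...' route paths used above
# are assumptions and must match whatever the frontend actually calls):
#
#   import requests
#   base = "http://localhost:7860"
#   state = requests.post(f"{base}/api/new_game",
#                         json={"size": 8, "first_player": 1}).json()
#   state = requests.post(f"{base}/api/human_move", json={"x": 2, "y": 3}).json()
#   state = requests.post(f"{base}/api/ai_move").json()
#   state = requests.post(f"{base}/api/undo").json()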