File size: 12,990 Bytes
0819f4e 8039f1f 0819f4e 8039f1f 0819f4e 8039f1f 0819f4e 8039f1f 0819f4e 8039f1f 0819f4e 8039f1f 0819f4e 8039f1f 0819f4e 8039f1f 0819f4e 8039f1f 0819f4e 8039f1f 0819f4e 8039f1f 0819f4e 8039f1f 0819f4e 8039f1f 0819f4e 8039f1f 0819f4e 8039f1f 0819f4e 8039f1f 0819f4e 8039f1f 0819f4e 8039f1f 0819f4e 8039f1f 0819f4e 8039f1f 0819f4e 8039f1f 0819f4e 8039f1f 0819f4e 8039f1f 0819f4e 8039f1f 0819f4e 8039f1f 0819f4e 8039f1f 0819f4e 8039f1f 0819f4e 8039f1f 0819f4e 8039f1f 0819f4e 8039f1f 0819f4e 8039f1f 0819f4e 8039f1f 0819f4e 8039f1f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 |
import numpy as np
import torch
import json
import os
import logging
from flask import Flask, request, jsonify
from flask_cors import CORS
import time
# 导入你的游戏和 AI 模块
from game import OthelloGame #
from alphazero import NNetWrapper, MCTS, dotdict #
# 配置日志
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)
app = Flask(__name__)
# 启用 CORS,允许前端(通常在不同端口)访问
CORS(app)
# --- 全局状态和 AI 初始化 ---
# 默认参数 (与 alphazero.py 中的 args 保持一致)
args = dotdict({
'lr': 0.001,
'dropout': 0.1,
'epochs': 10,
'batch_size': 64,
'cuda': torch.cuda.is_available(),
'num_channels': 512,
'numIters': 200,
'numEps': 100,
'tempThreshold': 15,
'updateThreshold': 0.6,
'maxlenOfQueue': 200000,
'numItersForTrainExamplesHistory': 20,
'numMCTSSims': 25, # 训练时的 MCTS 模拟次数
'arenaCompare': 40,
'cpuct': 1,
'checkpoint': './temp/',
'load_model': True,
'load_folder_file': ('./temp/','best.pth.tar'),
'board_size': 8 # 默认 8x8
})
# 游戏和 AI 实例
game = None
nnet = None
mcts = None
# 游戏状态
current_board = None
current_player = 1 # 1: Human (White), -1: AI (Black)
last_move_coords = None
board_size = 8
history_stack = []
first_player_game = 1
# 【新增】辅助函数:保存当前状态到历史栈
def save_state():
"""保存当前棋盘的副本和当前轮到的玩家到历史栈。"""
global current_board, current_player, history_stack
# 保存当前棋盘的副本和当前轮到的玩家
# 注意:这里保存的是未翻转的内部游戏状态
history_stack.append((np.copy(current_board), current_player))
def init_game_and_ai(n):
"""根据板子大小初始化游戏和 AI 模块"""
global game, nnet, mcts, board_size, history_stack, current_board
board_size = n
log.info(f"Initializing game and AI for {n}x{n} board.")
game = OthelloGame(n) #
# 重新配置 MCTS 参数用于 Play 模式
play_args = dotdict({
'numMCTSSims': 200, # 对战时使用更多的模拟次数
'cpuct': 1.0,
'cuda': args.cuda # 继承 CUDA 设置
})
nnet = NNetWrapper(game, args) #
# 假设你的模型文件已保存到 './checkpoint/best.pth.tar'
try:
load_folder = args.load_folder_file[0]
load_file = args.load_folder_file[1]
nnet.load_checkpoint(folder=load_folder, filename=load_file)
log.info(f"Successfully loaded model from {load_folder}{load_file}")
except ValueError as e:
log.error(f"Failed to load model: {e}. AI will likely perform poorly.")
mcts = MCTS(game, nnet, play_args) #
# 【新增】清空历史栈并保存初始状态
history_stack = []
# 获取初始棋盘并保存
current_board = game.getInitBoard()
save_state()
def get_api_moves(board, player):
"""将 getValidMoves 结果从向量转换为 {x, y} 列表"""
if game is None: return []
valids = game.getValidMoves(board, player) #
moves_list = []
# 排除最后一个动作(Pass动作)
for i in range(len(valids) - 1): #
if valids[i] == 1:
x = i // game.n
y = i % game.n
moves_list.append({'x': int(x), 'y': int(y)})
return moves_list
def check_game_end(board, player):
"""检查游戏是否结束,并返回状态信息,基于绝对的棋子数量差异。"""
result = game.getGameEnded(board, player) #
status = 'Ongoing'
score_diff = 0
if result is not None:
white_count = np.sum(board == 1)
black_count = np.sum(board == -1)
score_diff = int(white_count - black_count)
if result == 0 or score_diff == 0:
status = f"Game Over: Draw. Score: {white_count} vs {black_count}"
elif score_diff > 0:
status = f"Game Over: Human (O) Wins! Score: {white_count} vs {black_count}"
elif score_diff < 0:
status = f"Game Over: AI (X) Wins! Score: {white_count} vs {black_count}"
else:
status = f"Game Over: Draw. Score: {white_count} vs {black_count}"
return status
@app.route('/api/game/new', methods=['POST'])
def new_game():
global current_board, current_player, last_move_coords, board_size, history_stack, first_player_game
data = request.json
size = data.get('size', 8)
first_player = data.get('first_player', 1)
first_player_game = first_player
# 1. 初始化游戏和 AI
# 只有在尺寸变化时才重新初始化 AI,否则只重置游戏
if game is None or size != board_size:
init_game_and_ai(size)
# 如果尺寸不变,只重置历史栈和棋盘
history_stack = []
current_board = game.getInitBoard()
save_state() # 保存初始状态 (栈长度 = 1)
current_player = first_player
last_move_coords = None
# 2. 处理 AI 先手逻辑
if current_player == -1:
# 【修复】删除 history_stack.pop(),因为初始状态 S_init 必须保留。
# S_init 已经是 current_player = 1 的状态,我们只需要立即触发 AI 移动。
# AI 移动逻辑 (ai_move_logic) 会执行 AI 动作并保存 S_AI_1 状态。
history_stack = []
# 立即触发 AI 移动
current_board = np.flip(current_board, 0)
status = check_game_end(current_board, current_player)
if status == 'Ongoing':
# is_init_move=True 确保 AI 逻辑中不再重复 save_state()
# 因为 S_init 已经被保存,AI 下完后保存 S_AI_1
return ai_move_logic(is_init_move=False) # 【修正】这里应该是 False,让 ai_move_logic 保存 S_AI_1
# 3. 如果是 Human 先手或 AI 先手但游戏结束,则返回当前状态 (S_init)
status = check_game_end(current_board, current_player)
current_board = np.flip(current_board, 0)
# 确保返回给前端的棋盘是垂直翻转的,以匹配前端的坐标系
return jsonify({
'board': current_board.tolist(),
'legal_moves': get_api_moves(current_board, current_player),
'current_player': current_player,
'last_move': last_move_coords,
'status': status,
'history_length': len(history_stack) # 返回历史记录长度
})
@app.route('/api/game/human_move', methods=['POST'])
def human_move():
"""处理人类玩家移动,并保存状态,返回给 AI 的中间状态"""
global current_board, current_player, last_move_coords
if current_player != 1 or check_game_end(current_board, current_player) != 'Ongoing':
return jsonify({'error': 'Not your turn or game is over', 'history_length': len(history_stack)}), 400
data = request.json
x = data.get('x')
y = data.get('y')
if x is None or y is None:
# 检查是否是 Pass 动作
if data.get('action') == 'pass':
action = game.n * game.n # Pass action is the last index
else:
return jsonify({'error': 'Invalid move coordinates', 'history_length': len(history_stack)}), 400
else:
action = game.n * x + y
valids = game.getValidMoves(current_board, 1)
if valids[action] == 0:
return jsonify({'error': 'Illegal move', 'history_length': len(history_stack)}), 400
# 1. 执行人类移动
current_board, current_player = game.getNextState(current_board, 1, action)
# 2. 【核心修改】保存人类移动后的状态 (State S_H: 轮到 AI 移动)
save_state()
if action != game.n * game.n:
last_move_coords = {'x': x, 'y': y}
else:
last_move_coords = None # Human Pass
status = check_game_end(current_board, current_player)
# current_board = np.flip(current_board, 0)
# 确保返回给前端的棋盘是垂直翻转的,以匹配前端的坐标系
return jsonify({
'board': current_board.tolist(),
'legal_moves': get_api_moves(current_board, current_player),
'current_player': current_player,
'last_move': last_move_coords,
'status': status,
'history_length': len(history_stack) # 【新增】返回历史记录长度
})
def ai_move_logic(is_init_move=False):
"""AI 移动的逻辑封装,在 new_game 中调用"""
global current_board, current_player, last_move_coords
canonical_board = game.getCanonicalForm(current_board, -1) #
# 获取 AI 的最佳动作 (temp=0)
ai_action = np.argmax(mcts.getActionProb(canonical_board, temp=0)) #
# 更新游戏状态
current_board, next_player = game.getNextState(current_board, -1, ai_action) #
current_player = next_player
# 记录 AI 的移动坐标
if ai_action != game.n * game.n: # 如果不是 Pass 动作
ai_x = ai_action // game.n
ai_y = ai_action % game.n
last_move_coords = {'x': int(ai_x), 'y': int(ai_y)}
else:
last_move_coords = None # AI Pass
status = check_game_end(current_board, current_player)
# 【核心修改】保存 AI 移动后的状态 (State S_A: 轮到 Human 移动)
# is_init_move 标记不再用于控制 save_state,因为 new_game 中只需要 AI 正常执行并保存状态
save_state()
# 确保返回给前端的棋盘是垂直翻转的,以匹配前端的坐标系
return jsonify({
'board': current_board.tolist(),
'legal_moves': get_api_moves(current_board, current_player),
'current_player': current_player,
'last_move': last_move_coords,
'status': status,
'history_length': len(history_stack) # 【新增】返回历史记录长度
})
# B. 新增 `ai_move` 路由
@app.route('/api/game/ai_move', methods=['POST'])
def ai_move():
start_time = time.time()
"""触发 AI 移动,并返回最终状态"""
global current_board, current_player, last_move_coords
if current_player != -1:
return jsonify({'error': 'Not AI turn', 'history_length': len(history_stack)}), 400
response = ai_move_logic(is_init_move=False)
# 控制 AI 最少思考时间为 0.5 秒
end_time = time.time()
used_time = end_time - start_time
if used_time < 0.5:
time.sleep(0.5 - used_time) # 确保至少等待0.5秒
return response
# app.py (新增路由)
@app.route('/api/game/undo_move', methods=['POST'])
def undo_move():
"""执行悔棋操作:回退到历史栈中的前一个人类落子完成前的状态(即撤销 Human move + AI move)。"""
global current_board, current_player, last_move_coords, history_stack
# 栈长度至少需要为 3 才能安全地回退一个完整的 (Human + AI) 步骤
# 3 = S_init + S_Human_move + S_AI_move
if len(history_stack) < 2:
# 【修正】如果只有 S_init (长度=1) 或 S_AI_1 (长度=2, 发生在AI先手的第一步),则不能再悔棋了
return jsonify({
'error': 'Cannot undo further. Only initial state remains or insufficient moves made.',
'history_length': len(history_stack)
}), 400
# 场景 1: S_init -> S_AI_1 (AI 先手的第一步,长度为 2)
# 此时只需要 pop() 一次,回到 S_init
if len(history_stack) == 2:
# 弹出 S_AI_1 状态
history_stack.pop()
# 场景 2: S_init -> S_H1 -> S_AI_1 (长度 >= 3)
# 此时需要 pop() 两次,回到 S_H1 之前,即 S_init 或 S_AI_last
elif len(history_stack) >= 3:
# 1. 弹出 S_AI 状态 (AI move done, Human turn)
history_stack.pop()
# 2. 弹出 S_Human 状态 (Human move done, AI turn)
history_stack.pop()
# 3. 恢复到栈顶状态
current_board_restored, current_player_restored = history_stack[-1]
if len(history_stack) == 1 and first_player_game == 1:
current_board_restored = np.flip(current_board_restored, 0)
# 恢复状态
current_board = np.copy(current_board_restored)
current_player = current_player_restored
# 重置 last_move
last_move_coords = None
status = check_game_end(current_board, current_player)
# 确保返回给前端的棋盘是垂直翻转的,以匹配前端的坐标系
return jsonify({
'board': current_board.tolist(),
'legal_moves': get_api_moves(current_board, current_player),
'current_player': current_player,
'last_move': last_move_coords,
'status': status,
'history_length': len(history_stack) # 【新增】返回新的历史记录长度
})
if __name__ == '__main__':
# 初始化一个默认的 8x8 游戏实例
init_game_and_ai(8)
log.info("Starting Flask server on port 7860...")
port = int(os.environ.get('PORT', 7860))
# ... (日志) ...
app.run(host='0.0.0.0', port=port)
|