|
|
from navsim.agents.abstract_agent import AbstractAgent |
|
|
from omegaconf import DictConfig |
|
|
import hydra |
|
|
from hydra.utils import instantiate |
|
|
import argparse |
|
|
import os |
|
|
from hugsim.visualize import to_video, save_frame |
|
|
from hugsim.dataparser import parse_raw |
|
|
import torch |
|
|
from hugsim_client import HugsimClient |
|
|
|
|
|
# Hydra configuration directory and config name; both are handed to the
# @hydra.main decorator that wraps main().
CONFIG_PATH = "navsim/planning/script/config/HUGSIM"
CONFIG_NAME = "transfuser"
|
|
|
|
|
# Module-level simulator client used by main(). The environment is reset once,
# at import time, i.e. before main() starts querying the simulator state.
hugsim_client = HugsimClient()
hugsim_client.reset_env()
|
|
|
|
|
def get_opts():
    """Parse this script's command-line options.

    Returns:
        The ``argparse.Namespace`` produced from ``sys.argv``; it carries a
        single attribute, ``output`` (str), taken from the required
        ``--output`` flag.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument('--output', required=True, type=str)
    options = cli.parse_args()
    return options
|
|
|
|
|
|
|
|
@hydra.main(config_path=CONFIG_PATH, config_name=CONFIG_NAME, version_base=None)
def main(cfg: DictConfig) -> None:
    """Drive the simulator with trajectories planned by the configured agent.

    The agent is instantiated from ``cfg.agent`` (with the latent flag and the
    checkpoint path overridden here), moved to CUDA, and then queried once per
    simulator step: the current state is parsed, a trajectory is computed, and
    the resulting 2-D way points are sent back through ``hugsim_client``.

    The function returns when the environment reports ``done`` or when no
    trajectory could be produced for the current step.

    Args:
        cfg: Hydra configuration. Must expose ``agent`` (instantiable) and
            ``output`` (base directory under which ``ltf`` is created).
    """
    # Hard-coded overrides applied on top of the loaded config.
    cfg.agent.config.latent = True
    cfg.agent.checkpoint_path = "./ckpts/ltf_seed_0.ckpt"
    print(cfg)

    agent: AbstractAgent = instantiate(cfg.agent)
    agent.initialize()
    agent = agent.cuda()

    print('Ready for recieving')
    steps_sent = 0  # number of actions sent so far (bookkeeping only)
    results_dir = os.path.join(cfg.output, 'ltf')
    os.makedirs(results_dir, exist_ok=True)

    env_state = hugsim_client.get_current_state()
    while not env_state.done:
        observation = env_state.state.obs
        info = env_state.state.info
        data = parse_raw((observation, info))
        # The raw images are dropped before planning; only data['input'] is
        # consumed below.
        del data['raw_imgs']

        try:
            with torch.no_grad():
                trajectory = agent.compute_trajectory(data['input'])
        except RuntimeError as err:
            # Report the failure and fall through to the "no trajectory" exit.
            print(err)
            trajectory = None

        if trajectory is None:
            print('Waiting for visualize tasks...')
            return

        # Keep the first three pose columns, swap the first two and negate the
        # new first column. NOTE(review): presumably this converts from the
        # agent's IMU frame to the simulator's way-point frame -- confirm.
        way_points = trajectory.poses[:, :3][:, [1, 0, 2]]
        way_points[:, 0] *= -1

        # Only the first two columns are sent as the action.
        env_state = hugsim_client.execute_action(way_points[:, :2])
        print('sent')
        steps_sent += 1
        if env_state.cur_scene_done:
            print('Scene done')
|
|
|
|
|
|
|
|
# Script entry point: main() is called without arguments because the
# @hydra.main decorator supplies the cfg parameter.
if __name__ == "__main__":
    main()