Spaces:
Runtime error
Runtime error
File size: 6,178 Bytes
7d5289a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 |
"""
core/runner_env.py
Minimal HTTP-based environment client.
- Talks to a single env worker exposing: POST /reset, POST /step
Future hooks (commented below) for:
- episode_id, seed on reset
- request_id on step
- custom headers (auth/trace)
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Any, Dict, Generic, Optional, Type, TYPE_CHECKING, TypeVar
import requests
from .client_types import StepResult
from .containers.runtime import LocalDockerProvider
if TYPE_CHECKING:
from .containers.runtime import ContainerProvider
ActT = TypeVar("ActT")
ObsT = TypeVar("ObsT")
EnvClientT = TypeVar("EnvClientT", bound="HTTPEnvClient")
class HTTPEnvClient(ABC, Generic[ActT, ObsT]):
def __init__(
self,
base_url: str,
request_timeout_s: float = 15.0,
default_headers: Optional[Dict[str, str]] = None,
provider: Optional["ContainerProvider"] = None,
):
self._base = base_url.rstrip("/")
self._timeout = float(request_timeout_s)
self._http = requests.Session()
self._headers = default_headers or {}
self._provider = provider
@classmethod
def from_docker_image(
cls: Type[EnvClientT],
image: str,
provider: Optional["ContainerProvider"] = None,
**kwargs: Any,
) -> EnvClientT:
"""
Create an environment client by spinning up a Docker container locally.
This is a development utility that:
1. Starts a Docker container from the specified image
2. Waits for the server to be ready
3. Creates and returns a client instance connected to the container
Note: The container lifecycle management is left to the user or higher-level
orchestration. The container will keep running until manually stopped.
Args:
image: Docker image name to run (e.g., "echo-env:latest")
provider: Container provider to use (defaults to LocalDockerProvider)
**kwargs: Additional arguments to pass to provider.start_container()
(e.g., env_vars, port)
Returns:
An instance of the client class connected to the running container
Example:
>>> from envs.coding_env.client import CodingEnv
>>> from envs.coding_env.models import CodeAction
>>>
>>> # Create environment from image
>>> env = CodingEnv.from_docker_image("coding-env:latest")
>>>
>>> # Create environment with custom env vars
>>> env = CodingEnv.from_docker_image(
... "coding-env:latest",
... env_vars={"MY_VAR": "value"}
... )
>>>
>>> # Use the environment
>>> result = env.reset()
>>> print(result.observation)
>>>
>>> step_result = env.step(CodeAction(code="print('hello')"))
>>> print(step_result.observation.stdout)
>>>
>>> # Cleanup (optional)
>>> env.close()
"""
# Use default provider if none provided
if provider is None:
provider = LocalDockerProvider()
# 1. Start container with optional kwargs (e.g., env_vars, port)
base_url = provider.start_container(image, **kwargs)
# 2. Wait for server to be ready
provider.wait_for_ready(base_url)
# 3. Create and return client instance with provider reference
return cls(base_url=base_url, provider=provider)
@abstractmethod
def _step_payload(self, action: ActT) -> dict:
"""Convert an Action object to the JSON body expected by the env server."""
raise NotImplementedError
@abstractmethod
def _parse_result(self, payload: dict) -> StepResult[ObsT]:
"""Convert a JSON response from the env server to StepResult[ObsT]."""
raise NotImplementedError
@abstractmethod
def _parse_state(self, payload: dict) -> Any:
"""Convert a JSON response from the state endpoint to a State object."""
raise NotImplementedError
# ---------- Environment Server Interface Methods ----------
def reset(self) -> StepResult[ObsT]:
body: Dict[str, Any] = {}
# TODO: later:
# body["seed"] = seed
# body["episode_id"] = episode_id
r = self._http.post(
f"{self._base}/reset",
json=body,
headers=self._headers,
timeout=self._timeout,
)
r.raise_for_status()
return self._parse_result(r.json())
def step(self, action: ActT) -> StepResult[ObsT]:
body: Dict[str, Any] = {
"action": self._step_payload(action),
"timeout_s": int(self._timeout),
}
# TODO: later:
# body["request_id"] = str(uuid.uuid4())
# body["episode_id"] = current_episode_id
r = self._http.post(
f"{self._base}/step",
json=body,
headers=self._headers,
timeout=self._timeout,
)
r.raise_for_status()
return self._parse_result(r.json())
def state(self) -> Any:
"""
Get the current environment state from the server.
Returns:
State object with environment state information (e.g., episode_id, step_count)
Example:
>>> client = EchoEnv.from_docker_image("echo-env:latest")
>>> result = client.reset()
>>> state = client.state()
>>> print(state.episode_id)
>>> print(state.step_count)
"""
r = self._http.get(
f"{self._base}/state",
headers=self._headers,
timeout=self._timeout,
)
r.raise_for_status()
return self._parse_state(r.json())
def close(self) -> None:
"""
Close the environment and clean up resources.
If this client was created via from_docker_image(), this will stop
and remove the associated container.
"""
if self._provider is not None:
self._provider.stop_container()
|