Upload operators.py with huggingface_hub
Browse files- operators.py +17 -8
operators.py
CHANGED
|
@@ -250,15 +250,28 @@ class Apply(StreamInstanceOperator):
|
|
| 250 |
to_field: str = NonPositionalField(required=True)
|
| 251 |
|
| 252 |
def function_to_str(self, function: Callable) -> str:
|
| 253 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 254 |
|
| 255 |
def str_to_function(self, function_str: str) -> Callable:
|
| 256 |
splitted = function_str.split(".", 1)
|
| 257 |
if len(splitted) == 1:
|
| 258 |
-
return
|
| 259 |
else:
|
| 260 |
module_name, function_name = splitted
|
| 261 |
-
if module_name in
|
|
|
|
|
|
|
| 262 |
obj = globals()[module_name]
|
| 263 |
else:
|
| 264 |
obj = importlib.import_module(module_name)
|
|
@@ -270,11 +283,7 @@ class Apply(StreamInstanceOperator):
|
|
| 270 |
super().prepare()
|
| 271 |
if isinstance(self.function, str):
|
| 272 |
self.function = self.str_to_function(self.function)
|
| 273 |
-
|
| 274 |
-
def get_init_dict(self):
|
| 275 |
-
result = super().get_init_dict()
|
| 276 |
-
result["function"] = self.function_to_str(self.function)
|
| 277 |
-
return result
|
| 278 |
|
| 279 |
def process(self, instance: Dict[str, Any], stream_name: str = None) -> Dict[str, Any]:
|
| 280 |
argv = [instance[arg] for arg in self._argv]
|
|
|
|
| 250 |
to_field: str = NonPositionalField(required=True)
|
| 251 |
|
| 252 |
def function_to_str(self, function: Callable) -> str:
|
| 253 |
+
parts = []
|
| 254 |
+
|
| 255 |
+
if hasattr(function, "__module__"):
|
| 256 |
+
parts.append(function.__module__)
|
| 257 |
+
if hasattr(function, "__qualname__"):
|
| 258 |
+
parts.append(function.__qualname__)
|
| 259 |
+
else:
|
| 260 |
+
parts.append(function.__name__)
|
| 261 |
+
|
| 262 |
+
result = ".".join(parts)
|
| 263 |
+
|
| 264 |
+
return result
|
| 265 |
|
| 266 |
def str_to_function(self, function_str: str) -> Callable:
|
| 267 |
splitted = function_str.split(".", 1)
|
| 268 |
if len(splitted) == 1:
|
| 269 |
+
return __builtins__[module_name]
|
| 270 |
else:
|
| 271 |
module_name, function_name = splitted
|
| 272 |
+
if module_name in __builtins__:
|
| 273 |
+
obj = __builtins__[module_name]
|
| 274 |
+
elif module_name in globals():
|
| 275 |
obj = globals()[module_name]
|
| 276 |
else:
|
| 277 |
obj = importlib.import_module(module_name)
|
|
|
|
| 283 |
super().prepare()
|
| 284 |
if isinstance(self.function, str):
|
| 285 |
self.function = self.str_to_function(self.function)
|
| 286 |
+
self._init_dict["function"] = self.function_to_str(self.function)
|
|
|
|
|
|
|
|
|
|
|
|
|
| 287 |
|
| 288 |
def process(self, instance: Dict[str, Any], stream_name: str = None) -> Dict[str, Any]:
|
| 289 |
argv = [instance[arg] for arg in self._argv]
|