import abc
import asyncio
import logging
from copy import deepcopy
from graph.data.key_value_store import KeyValueStore
[docs]
class INode(abc.ABC):
[docs]
@abc.abstractmethod
async def execute(self, shared_storage: KeyValueStore) -> KeyValueStore:
pass
@abc.abstractmethod
async def _execute_node(self, shared_storage: KeyValueStore) -> KeyValueStore:
pass
[docs]
class ExecutableNode(INode, abc.ABC):
[docs]
def __init__(self, parents: list[INode]):
self._parents = parents
self._logger = logging.getLogger(self.__class__.__name__)
self._has_execution_started = False
self._state: KeyValueStore | None = None
[docs]
async def execute(self, input_state: KeyValueStore = None) -> KeyValueStore:
input_state = input_state or KeyValueStore()
self._logger.debug("Execute Function called")
while self._has_execution_started and self._state is None:
await asyncio.sleep(0.1)
if self._state is not None:
self._logger.debug("Already Executed, Returning Changed State")
return self._state
self._logger.debug(f"{self.__class__.__name__} Executing")
self._has_execution_started = True
parent_node_tasks = [parent.execute(deepcopy(input_state)) for parent in self._parents]
parent_storages = list(await asyncio.gather(*parent_node_tasks))
input_state.merge(parent_storages)
new_state = await self._execute_node(input_state)
self._state = deepcopy(new_state)
return new_state
@abc.abstractmethod
async def _execute_node(self, shared_storage: KeyValueStore) -> KeyValueStore:
pass