Source code for graph.nodes.core.executable_node

import abc
import asyncio
import logging
from copy import deepcopy

from graph.data.key_value_store import KeyValueStore


[docs] class INode(abc.ABC):
[docs] @abc.abstractmethod async def execute(self, shared_storage: KeyValueStore) -> KeyValueStore: pass
@abc.abstractmethod async def _execute_node(self, shared_storage: KeyValueStore) -> KeyValueStore: pass
[docs] class ExecutableNode(INode, abc.ABC):
[docs] def __init__(self, parents: list[INode]): self._parents = parents self._logger = logging.getLogger(self.__class__.__name__) self._has_execution_started = False self._state: KeyValueStore | None = None
[docs] async def execute(self, input_state: KeyValueStore = None) -> KeyValueStore: input_state = input_state or KeyValueStore() self._logger.debug("Execute Function called") while self._has_execution_started and self._state is None: await asyncio.sleep(0.1) if self._state is not None: self._logger.debug("Already Executed, Returning Changed State") return self._state self._logger.debug(f"{self.__class__.__name__} Executing") self._has_execution_started = True parent_node_tasks = [parent.execute(deepcopy(input_state)) for parent in self._parents] parent_storages = list(await asyncio.gather(*parent_node_tasks)) input_state.merge(parent_storages) new_state = await self._execute_node(input_state) self._state = deepcopy(new_state) return new_state
@abc.abstractmethod async def _execute_node(self, shared_storage: KeyValueStore) -> KeyValueStore: pass