Source code for graph.nodes.core.random_table_node

from enum import Enum

from graph.data.key_value_store import KeyValueStore
from graph.nodes.core.executable_node import ExecutableNode, INode
from src.random_collections.random_collection_table import RandomTable


[docs] class RandomTableNode[K: Enum, V: Enum](ExecutableNode):
[docs] def __init__( self, key_type: type, value_type: type, parents: list[INode], random_generator: RandomTable[K, V], ): self.key_type = key_type self.value_type = value_type self.random_generator = random_generator super().__init__(parents)
async def _execute_node(self, shared_storage: KeyValueStore) -> KeyValueStore: key_value = shared_storage.get(self.key_type) shared_storage.save(self.random_generator.get_random_value(key_value)) return shared_storage