Source code for ai.analysis.cost_analyzer_decorator

import logging
from collections.abc import Callable

from openai.types.chat import ChatCompletion

from ai.analysis.dataset_usage_analyzer import DatasetUsageAnalyzer
from ai.analysis.money.money import Money
from ai.analysis.run.assistant_run import AssistantRun
from ai.assistant.assistant import Assistant


def cost_analyzer(
    warning_limit: Money = Money(1e-3),
    error_limit: Money = Money(1e-2),
    dataset_usage_analyzer: DatasetUsageAnalyzer | None = None,
) -> Callable[[type[Assistant]], type[Assistant]]:
    """Build a class decorator that adds per-run cost logging to an Assistant.

    The decorated class computes the cost of every completion it receives,
    logs a warning when that cost is above ``warning_limit``, logs an error
    when it is above ``error_limit``, and hands the run to a
    ``DatasetUsageAnalyzer``. The two thresholds are checked independently,
    so a run that exceeds both produces both log records.

    Args:
        warning_limit: Cost above which a warning is logged.
            Defaults to ``Money(1e-3)``.
        error_limit: Cost above which an error is logged.
            Defaults to ``Money(1e-2)``.
        dataset_usage_analyzer: Analyzer shared by every instance of the
            decorated class. May be omitted, in which case one must be
            injected when the Assistant is instantiated.

    Returns:
        A decorator that takes an Assistant subclass and returns a
        cost-instrumented subclass of it.
    """

    def class_decorator(cls: type[Assistant]) -> type[Assistant]:
        """Wrap ``cls`` in a subclass that analyzes the cost of each run.

        Args:
            cls: The Assistant subclass to instrument.

        Returns:
            A new subclass of ``cls`` with cost analysis enabled.
        """

        class CostDecoratedAssistant(cls):
            """Assistant subclass that logs and records cost per run."""

            def __init__(
                self,
                injected_dataset_usage_analyzer: DatasetUsageAnalyzer | None = None,
                **kwargs,
            ) -> None:
                """Initialize the base Assistant, the logger and the analyzer.

                Args:
                    injected_dataset_usage_analyzer: Analyzer for this
                        instance. When given, it takes precedence over the
                        one passed to the decorator factory.
                    **kwargs: Forwarded to the base class constructor.

                Raises:
                    ValueError: If neither the factory nor the constructor
                        received a DatasetUsageAnalyzer.
                """
                super().__init__(**kwargs)
                self._logger = logging.getLogger(__name__)
                # Compare against None explicitly: an analyzer object may be
                # falsy (e.g. if it defines __len__ and holds no runs yet),
                # and an explicitly injected instance must still win.
                self._dataset_usage_analyzer = (
                    injected_dataset_usage_analyzer
                    if injected_dataset_usage_analyzer is not None
                    else dataset_usage_analyzer
                )
                if self._dataset_usage_analyzer is None:
                    raise ValueError("DatasetUsageAnalyzer not provided.")

            def add_completion_result(
                self, completion_result: ChatCompletion
            ) -> None:
                """Compute the cost of a completion, log it and record the run.

                Args:
                    completion_result: The completion to analyze.
                """
                assistant_run = AssistantRun(
                    completion_result=completion_result,
                    assistant=self,
                )
                total_cost = assistant_run.get_cost_analysis().total_cost

                # Independent checks: exceeding the error limit does not
                # suppress the warning record.
                if total_cost > warning_limit:
                    self._logger.warning(
                        f"Cost of run exceeded warning limit: {total_cost}"
                    )
                if total_cost > error_limit:
                    self._logger.error(
                        f"Cost of run exceeded error limit: {total_cost}"
                    )

                self._dataset_usage_analyzer.add_run(assistant_run)

            async def run_openai(self, *args, **kwargs) -> ChatCompletion | None:
                """Call the base ``run_openai`` and instrument its result.

                The result is analyzed only when it is not ``None`` and both
                its ``usage`` and ``model`` attributes are set; otherwise it
                is returned without any logging or recording.

                Args:
                    *args: Positional arguments for the base ``run_openai``.
                    **kwargs: Keyword arguments for the base ``run_openai``.

                Returns:
                    The completion returned by the base implementation,
                    unchanged.
                """
                completion_result = await super().run_openai(*args, **kwargs)
                if (
                    completion_result is not None
                    and completion_result.usage is not None
                    and completion_result.model is not None
                ):
                    self.add_completion_result(completion_result)
                return completion_result

        return CostDecoratedAssistant

    return class_decorator