import logging
from collections.abc import Callable
from openai.types.chat import ChatCompletion
from ai.analysis.dataset_usage_analyzer import DatasetUsageAnalyzer
from ai.analysis.money.money import Money
from ai.analysis.run.assistant_run import AssistantRun
from ai.assistant.assistant import Assistant
[docs]
def cost_analyzer(
    warning_limit: Money = Money(1e-3),
    error_limit: Money = Money(1e-2),
    dataset_usage_analyzer: DatasetUsageAnalyzer | None = None,
) -> Callable[[type[Assistant]], type[Assistant]]:
    """Build a class decorator that adds per-run cost logging to an Assistant.

    The decorated class computes the cost of every completion returned by
    ``run_openai``, logs a warning when it exceeds ``warning_limit``, logs an
    error when it exceeds ``error_limit``, and records the run on a
    ``DatasetUsageAnalyzer``. The two limit checks are independent, so a run
    that exceeds both limits produces both log records.

    Args:
        warning_limit: Cost threshold above which a warning is logged.
            Defaults to ``Money(1e-3)``.
        error_limit: Cost threshold above which an error is logged.
            Defaults to ``Money(1e-2)``.
        dataset_usage_analyzer: Analyzer that receives every recorded run.
            When omitted here, one must be injected when the decorated
            Assistant is instantiated.

    Returns:
        A decorator that takes an Assistant subclass and returns a
        cost-instrumented subclass of it.
    """

    def class_decorator(cls: type[Assistant]) -> type[Assistant]:
        """Wrap ``cls`` in a subclass that logs and records run costs.

        Args:
            cls: The Assistant subclass to instrument.

        Returns:
            A new subclass of ``cls`` with cost analysis enabled.
        """

        class CostDecoratedAssistant(cls):
            """Assistant subclass that logs and records the cost of each run."""

            def __init__(
                self,
                injected_dataset_usage_analyzer: DatasetUsageAnalyzer | None = None,
                **kwargs,
            ):
                """Initialise the base Assistant, then the logger and analyzer.

                Args:
                    injected_dataset_usage_analyzer: Analyzer to use for this
                        instance; takes precedence over the one given to the
                        decorator factory.
                    **kwargs: Forwarded unchanged to the base class constructor.

                Raises:
                    ValueError: If neither the factory nor the constructor
                        supplied a DatasetUsageAnalyzer.
                """
                super().__init__(**kwargs)
                self._logger = logging.getLogger(__name__)
                # Select on identity with None, not truthiness: an injected
                # analyzer that happens to evaluate as falsy (for example an
                # empty collection-like object) must still win over the
                # factory-level default instead of being silently discarded.
                if injected_dataset_usage_analyzer is not None:
                    self._dataset_usage_analyzer = injected_dataset_usage_analyzer
                else:
                    self._dataset_usage_analyzer = dataset_usage_analyzer
                if self._dataset_usage_analyzer is None:
                    raise ValueError("DatasetUsageAnalyzer not provided.")

            def add_completion_result(self, completion_result: ChatCompletion) -> None:
                """Compute the cost of a completion, log it, and record the run.

                Args:
                    completion_result: The completion to analyse.
                """
                assistant_run = AssistantRun(
                    completion_result=completion_result,
                    assistant=self,
                )
                total_cost = assistant_run.get_cost_analysis().total_cost
                # Independent checks: exceeding the error limit does not
                # suppress the warning-level record.
                if total_cost > warning_limit:
                    self._logger.warning(
                        f"Cost of run exceeded warning limit: {total_cost}"
                    )
                if total_cost > error_limit:
                    self._logger.error(
                        f"Cost of run exceeded error limit: {total_cost}"
                    )
                # The run is recorded regardless of whether a limit was hit.
                self._dataset_usage_analyzer.add_run(assistant_run)

            async def run_openai(self, *args, **kwargs) -> ChatCompletion | None:
                """Call the base ``run_openai`` and instrument its result.

                The completion is passed to ``add_completion_result`` only when
                it is not ``None`` and carries both ``usage`` and ``model``.

                Args:
                    *args: Positional arguments for the base ``run_openai``.
                    **kwargs: Keyword arguments for the base ``run_openai``.

                Returns:
                    The completion returned by the base implementation,
                    unmodified (possibly ``None``).
                """
                completion_result = await super().run_openai(*args, **kwargs)
                # NOTE(review): presumably the cost analysis needs token usage
                # and the model name; completions lacking either are skipped.
                if (
                    completion_result is not None
                    and completion_result.usage is not None
                    and completion_result.model is not None
                ):
                    self.add_completion_result(completion_result)
                return completion_result

        return CostDecoratedAssistant

    return class_decorator