Spaces:
Paused
Paused
| #### What this does #### | |
| # On success, logs events to Langfuse | |
| import dotenv, os | |
| import requests | |
| import requests | |
| from datetime import datetime | |
| dotenv.load_dotenv() # Loading env variables using dotenv | |
| import traceback | |
| from packaging.version import Version | |
class LangFuseLogger:
    """Log completed LLM calls to Langfuse as a trace with one generation.

    The client is configured from the ``LANGFUSE_SECRET_KEY``,
    ``LANGFUSE_PUBLIC_KEY``, ``LANGFUSE_HOST``, ``LANGFUSE_RELEASE`` and
    ``LANGFUSE_DEBUG`` environment variables. Logging is best-effort:
    ``log_event`` reports failures through ``print_verbose`` and the
    traceback module instead of raising to the caller.
    """

    # Key names dropped from the model parameters before logging.
    _DROPPED_PARAMS = ("functions", "tools")
    # Spellings of LANGFUSE_DEBUG that turn debug mode on (case-insensitive).
    _TRUTHY_ENV_VALUES = ("1", "true", "yes", "on")

    def __init__(self):
        """Create the Langfuse client from environment variables.

        Raises:
            Exception: if the ``langfuse`` package cannot be imported.
        """
        try:
            from langfuse import Langfuse
        except Exception as e:
            # Keep the original exception type/message; chain the cause so the
            # real import failure stays visible in the traceback.
            raise Exception(
                f"\033[91mLangfuse not installed, try running 'pip install langfuse' to fix this error: {e}\033[0m"
            ) from e
        # Instance variables
        self.secret_key = os.getenv("LANGFUSE_SECRET_KEY")
        self.public_key = os.getenv("LANGFUSE_PUBLIC_KEY")
        self.langfuse_host = os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com")
        self.langfuse_release = os.getenv("LANGFUSE_RELEASE")
        # Raw env value is kept on the instance for backward compatibility.
        self.langfuse_debug = os.getenv("LANGFUSE_DEBUG")
        self.Langfuse = Langfuse(
            public_key=self.public_key,
            secret_key=self.secret_key,
            host=self.langfuse_host,
            release=self.langfuse_release,
            # Pass a real boolean: the raw string "False" would be truthy.
            # NOTE(review): assumes the client expects a bool here - confirm
            # against the installed langfuse version.
            debug=self._parse_env_flag(self.langfuse_debug),
        )

    @classmethod
    def _parse_env_flag(cls, raw_value) -> bool:
        """Return True only for an explicit truthy spelling of an env flag."""
        if raw_value is None:
            return False
        return str(raw_value).strip().lower() in cls._TRUTHY_ENV_VALUES

    @classmethod
    def _sanitize_optional_params(cls, optional_params) -> dict:
        """Return a loggable copy of ``optional_params``.

        ``functions`` and ``tools`` are dropped, and any value that is not a
        ``str``, ``int``, ``bool`` or ``float`` is converted with ``str()``.
        The input mapping is never modified, so the caller's request
        parameters stay intact.
        """
        sanitized = {}
        for param, value in (optional_params or {}).items():
            if param in cls._DROPPED_PARAMS:
                continue
            # langfuse only accepts str, int, bool, float for logging
            if not isinstance(value, (str, int, bool, float)):
                try:
                    value = str(value)
                except Exception:
                    # if casting value to str fails don't block logging
                    pass
            sanitized[param] = value
        return sanitized

    def log_event(
        self, kwargs, response_obj, start_time, end_time, user_id, print_verbose
    ):
        """Send one completion call to Langfuse and flush the client.

        Args:
            kwargs: call arguments; reads ``litellm_params`` (for
                ``metadata``), ``messages``, ``optional_params`` and ``model``.
            response_obj: response indexed as
                ``response_obj["choices"][0]["message"]`` (which must offer
                ``.json()``) and ``response_obj["usage"]``.
            start_time: start timestamp forwarded to Langfuse.
            end_time: end timestamp forwarded to Langfuse.
            user_id: user identifier attached to the trace.
            print_verbose: callable taking one string, used for diagnostics.

        Any ``Exception`` raised while logging is printed and swallowed so a
        logging failure never breaks the caller.
        """
        try:
            print_verbose(
                f"Langfuse Logging - Enters logging function for model {kwargs}"
            )
            litellm_params = kwargs.get("litellm_params", {}) or {}
            metadata = (
                litellm_params.get("metadata", {}) or {}
            )  # if litellm_params['metadata'] == None
            prompt = [kwargs.get("messages")]
            # Work on a copy so the caller's optional_params are not mutated.
            optional_params = self._sanitize_optional_params(
                kwargs.get("optional_params")
            )

            message = response_obj["choices"][0]["message"]
            output = message.json()
            print_verbose(f"OUTPUT IN LANGFUSE: {output}; original: {message}")

            if self._is_langfuse_v2():
                log_to_langfuse = self._log_langfuse_v2
            else:
                log_to_langfuse = self._log_langfuse_v1
            log_to_langfuse(
                user_id,
                metadata,
                output,
                start_time,
                end_time,
                kwargs,
                optional_params,
                prompt,
                response_obj,
            )

            self.Langfuse.flush()
            print_verbose(
                f"Langfuse Layer Logging - final response object: {response_obj}"
            )
        except Exception:
            # Best-effort logging: report, never propagate. KeyboardInterrupt
            # and SystemExit are deliberately not caught.
            traceback.print_exc()
            print_verbose(f"Langfuse Layer Error - {traceback.format_exc()}")

    async def _async_log_event(
        self, kwargs, response_obj, start_time, end_time, user_id, print_verbose
    ):
        """Async entry point; delegates to the synchronous ``log_event``."""
        self.log_event(
            kwargs, response_obj, start_time, end_time, user_id, print_verbose
        )

    def _is_langfuse_v2(self):
        """Return True when the installed langfuse version is >= 2.0.0."""
        import langfuse

        return Version(langfuse.version.__version__) >= Version("2.0.0")

    def _log_langfuse_v1(
        self,
        user_id,
        metadata,
        output,
        start_time,
        end_time,
        kwargs,
        optional_params,
        input,
        response_obj,
    ):
        """Log a trace and generation using the pre-2.0 model objects.

        Prints an upgrade notice on every call, then creates a trace and
        attaches one generation carrying the model, parameters and token
        usage taken from ``response_obj["usage"]``.
        """
        from langfuse.model import CreateTrace, CreateGeneration

        print(
            "Please upgrade langfuse to v2.0.0 or higher: https://github.com/langfuse/langfuse-python/releases/tag/v2.0.1"
        )

        generation_name = metadata.get("generation_name", "litellm-completion")
        trace = self.Langfuse.trace(
            CreateTrace(
                name=generation_name,
                input=input,
                output=output,
                userId=user_id,
            )
        )

        trace.generation(
            CreateGeneration(
                name=generation_name,
                startTime=start_time,
                endTime=end_time,
                model=kwargs["model"],
                modelParameters=optional_params,
                input=input,
                output=output,
                usage={
                    "prompt_tokens": response_obj["usage"]["prompt_tokens"],
                    "completion_tokens": response_obj["usage"]["completion_tokens"],
                },
                metadata=metadata,
            )
        )

    def _log_langfuse_v2(
        self,
        user_id,
        metadata,
        output,
        start_time,
        end_time,
        kwargs,
        optional_params,
        input,
        response_obj,
    ):
        """Log a trace and generation using keyword arguments (langfuse >= 2).

        ``metadata`` may override the trace user (``trace_user_id``), the
        trace id (``trace_id``), the generation id (``generation_id``) and
        the name used for both objects (``generation_name``).
        """
        generation_name = metadata.get("generation_name", "litellm-completion")
        trace = self.Langfuse.trace(
            name=generation_name,
            input=input,
            output=output,
            user_id=metadata.get("trace_user_id", user_id),
            id=metadata.get("trace_id", None),
        )

        trace.generation(
            name=generation_name,
            id=metadata.get("generation_id", None),
            startTime=start_time,
            endTime=end_time,
            model=kwargs["model"],
            modelParameters=optional_params,
            input=input,
            output=output,
            usage={
                "prompt_tokens": response_obj["usage"]["prompt_tokens"],
                "completion_tokens": response_obj["usage"]["completion_tokens"],
            },
            metadata=metadata,
        )