Monocle Advanced Patterns Guide

This guide covers advanced instrumentation patterns for production-ready GenAI applications. You'll learn how to instrument third-party libraries, create sophisticated output processors, and implement comprehensive testing strategies.

Leveraging Monocle's Extensibility

When the out-of-the-box features of an app framework are not sufficient, app developers have to add custom code. For example, if you extend an LLM class in LlamaIndex to use a model hosted in NVIDIA Triton, this new class is not known to Monocle. You can register this new class method through Monocle's setup API, and Monocle will then be able to trace it.

Default Configuration of Instrumented Methods

The following files contain the default configuration of instrumented methods and the span names corresponding to them, one per framework:

  • src/monocle_apptrace/instrumentation/metamodel/langchain/__init__.py
  • src/monocle_apptrace/instrumentation/metamodel/llamaindex/__init__.py
  • src/monocle_apptrace/instrumentation/metamodel/haystack/__init__.py

The following configuration instruments the invoke(..) method of RunnableSequence (a chain, or workflow, in LangChain parlance) to emit a span:

{
    "package": "langchain.schema.runnable",
    "object": "RunnableSequence",
    "method": "invoke",
    "span_name": "langchain.workflow",
    "wrapper": task_wrapper
}

Example - Monitoring Custom Methods with Monocle

from monocle_apptrace import setup_monocle_telemetry
from monocle_apptrace.instrumentation.common.wrapper_method import WrapperMethod
from monocle_apptrace.instrumentation.common.wrapper import task_wrapper, atask_wrapper
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter

# Extend the default wrapped methods list as follows
app_name = "simple_math_app"
setup_monocle_telemetry(
    workflow_name=app_name,
    span_processors=[BatchSpanProcessor(ConsoleSpanExporter())],
    wrapper_methods=[
        WrapperMethod(
            package="langchain.schema.runnable",
            object_name="RunnableParallel",
            method="invoke",
            span_name="langchain.workflow",
            wrapper_method=task_wrapper
        ),
        WrapperMethod(
            package="langchain.schema.runnable",
            object_name="RunnableParallel",
            method="ainvoke",
            span_name="langchain.workflow",
            wrapper_method=atask_wrapper
        )
    ]
)
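
With this setup in place, every invocation of a RunnableParallel is traced automatically. A minimal usage sketch (the two lambda branches are illustrative and not part of Monocle):

from langchain.schema.runnable import RunnableLambda, RunnableParallel

# Two trivial branches that run in parallel
chain = RunnableParallel(
    double=RunnableLambda(lambda x: x * 2),
    square=RunnableLambda(lambda x: x ** 2),
)

# This call is intercepted by task_wrapper and emits a "langchain.workflow" span
result = chain.invoke(3)
print(result)  # {'double': 6, 'square': 9}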

Going Beyond Supported GenAI Components

  • If you are using an application framework, model-hosting service, or infrastructure that's not currently supported by Monocle, please submit a GitHub issue so that support can be added.
  • The Monocle community is working on an SDK that will let applications generate their own traces.

Real-World Output Processor Examples

Custom OpenAI Client Instrumentation

Here's how to instrument a custom OpenAI client with detailed output processing:

# custom_openai_client.py
import openai
from typing import Dict, Any, List

class CustomOpenAIClient:
    def __init__(self, api_key: str, model: str = "gpt-3.5-turbo"):
        self.client = openai.OpenAI(api_key=api_key)
        self.model = model
        self.request_count = 0

    def chat_completion(self, messages: List[Dict[str, str]], **kwargs) -> Dict[str, Any]:
        """
        Create a chat completion with custom processing.

        Args:
            messages: List of message dictionaries
            **kwargs: Additional parameters for the API call

        Returns:
            Dict containing the completion response
        """
        self.request_count += 1

        # Add custom processing
        processed_messages = self._preprocess_messages(messages)

        response = self.client.chat.completions.create(
            model=self.model,
            messages=processed_messages,
            **kwargs
        )

        return {
            "content": response.choices[0].message.content,
            "usage": response.usage.dict() if response.usage else None,
            "model": response.model,
            "request_id": response.id
        }

    def _preprocess_messages(self, messages: List[Dict[str, str]]) -> List[Dict[str, str]]:
        """Preprocess messages before sending to API"""
        return [{"role": msg["role"], "content": msg["content"].strip()} for msg in messages]

OpenAI Output Processor

# openai_output_processor.py
OPENAI_OUTPUT_PROCESSOR = {
    "type": "inference",
    "attributes": [
        {
            "attribute": "openai.model",
            "accessor": lambda arguments: arguments['instance'].model
        },
        {
            "attribute": "openai.request_count",
            "accessor": lambda arguments: arguments['instance'].request_count
        },
        {
            "attribute": "openai.temperature",
            "accessor": lambda arguments: arguments['kwargs'].get('temperature', 1.0)
        },
        {
            "attribute": "openai.max_tokens",
            "accessor": lambda arguments: arguments['kwargs'].get('max_tokens', None)
        }
    ],
    "events": [
        {
            "name": "data.input",
            "attributes": [
                {
                    "attribute": "messages",
                    "accessor": lambda arguments: [msg.get('content', '') for msg in arguments['args'][0]] if arguments['args'] else []
                },
                {
                    "attribute": "message_count",
                    "accessor": lambda arguments: len(arguments['args'][0]) if arguments['args'] else 0
                }
            ]
        },
        {
            "name": "data.output",
            "attributes": [
                {
                    "attribute": "response",
                    "accessor": lambda arguments: arguments['output'].get('content', '') if arguments['output'] else ''
                },
                {
                    "attribute": "response_length",
                    "accessor": lambda arguments: len(arguments['output'].get('content', '')) if arguments['output'] else 0
                }
            ]
        },
        {
            "name": "metadata",
            "attributes": [
                {
                    "attribute": "prompt_tokens",
                    "accessor": lambda arguments: arguments['output'].get('usage', {}).get('prompt_tokens', 0) if arguments['output'] else 0
                },
                {
                    "attribute": "completion_tokens",
                    "accessor": lambda arguments: arguments['output'].get('usage', {}).get('completion_tokens', 0) if arguments['output'] else 0
                },
                {
                    "attribute": "total_tokens",
                    "accessor": lambda arguments: arguments['output'].get('usage', {}).get('total_tokens', 0) if arguments['output'] else 0
                },
                {
                    "attribute": "request_id",
                    "accessor": lambda arguments: arguments['output'].get('request_id', '') if arguments['output'] else ''
                }
            ]
        }
    ]
}

Vector Database Instrumentation

# custom_vector_db.py
import numpy as np
from typing import List, Dict, Any, Tuple
from sentence_transformers import SentenceTransformer

class CustomVectorDB:
    def __init__(self, model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
        self.embedding_model = SentenceTransformer(model_name)
        self.model_name = model_name
        self.documents = []
        self.embeddings = []
        self.search_count = 0

    def add_documents(self, documents: List[str]) -> None:
        """Add documents to the vector database"""
        # Normalize so the dot product in search() equals cosine similarity
        embeddings = self.embedding_model.encode(documents, normalize_embeddings=True)
        self.documents.extend(documents)
        self.embeddings.extend(embeddings)

    def search(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
        """
        Search for similar documents.

        Args:
            query: Search query
            top_k: Number of results to return

        Returns:
            List of search results with scores
        """
        self.search_count += 1

        # Generate a normalized query embedding (matching the stored vectors)
        query_embedding = self.embedding_model.encode([query], normalize_embeddings=True)

        # Dot product of unit vectors == cosine similarity
        similarities = np.dot(self.embeddings, query_embedding.T).flatten()

        # Get top results
        top_indices = np.argsort(similarities)[::-1][:top_k]

        results = []
        for idx in top_indices:
            results.append({
                "document": self.documents[idx],
                "score": float(similarities[idx]),
                "index": int(idx)
            })

        return results
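
A quick usage sketch (the documents and query are illustrative):

db = CustomVectorDB()
db.add_documents([
    "Machine learning is a subset of AI that focuses on algorithms.",
    "Deep learning uses neural networks with multiple layers.",
])
for hit in db.search("machine learning algorithms", top_k=2):
    print(f"{hit['score']:.2f}  {hit['document']}")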

Vector DB Output Processor

# vector_db_output_processor.py
VECTOR_DB_OUTPUT_PROCESSOR = {
    "type": "retrieval",
    "attributes": [
        {
            "attribute": "vector_db.model",
            "accessor": lambda arguments: arguments['instance'].model_name
        },
        {
            "attribute": "vector_db.document_count",
            "accessor": lambda arguments: len(arguments['instance'].documents)
        },
        {
            "attribute": "vector_db.search_count",
            "accessor": lambda arguments: arguments['instance'].search_count
        },
        {
            "attribute": "vector_db.top_k",
            "accessor": lambda arguments: arguments['kwargs'].get('top_k', 5)
        }
    ],
    "events": [
        {
            "name": "data.input",
            "attributes": [
                {
                    "attribute": "query",
                    "accessor": lambda arguments: arguments['args'][0] if arguments['args'] else ''
                },
                {
                    "attribute": "query_length",
                    "accessor": lambda arguments: len(arguments['args'][0]) if arguments['args'] else 0
                }
            ]
        },
        {
            "name": "data.output",
            "attributes": [
                {
                    "attribute": "results",
                    "accessor": lambda arguments: [r['document'] for r in arguments['output']] if arguments['output'] else []
                },
                {
                    "attribute": "scores",
                    "accessor": lambda arguments: [r['score'] for r in arguments['output']] if arguments['output'] else []
                },
                {
                    "attribute": "result_count",
                    "accessor": lambda arguments: len(arguments['output']) if arguments['output'] else 0
                }
            ]
        }
    ]
}

Third-Party Library Instrumentation

Instrumenting Custom Components

# custom_instrumentation.py
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from monocle_apptrace import setup_monocle_telemetry
from monocle_apptrace.instrumentation.common.wrapper_method import WrapperMethod
from monocle_apptrace.instrumentation.common.wrapper import task_wrapper
from openai_output_processor import OPENAI_OUTPUT_PROCESSOR
from vector_db_output_processor import VECTOR_DB_OUTPUT_PROCESSOR

# Instrument custom components
setup_monocle_telemetry(
    workflow_name="custom_app",
    span_processors=[BatchSpanProcessor(ConsoleSpanExporter())],
    wrapper_methods=[
        # Custom OpenAI client
        WrapperMethod(
            package="custom_openai_client",
            object_name="CustomOpenAIClient",
            method="chat_completion",
            span_name="openai.chat_completion",
            wrapper_method=task_wrapper,
            output_processor=OPENAI_OUTPUT_PROCESSOR
        ),
        # Custom vector database
        WrapperMethod(
            package="custom_vector_db",
            object_name="CustomVectorDB",
            method="search",
            span_name="vector_db.search",
            wrapper_method=task_wrapper,
            output_processor=VECTOR_DB_OUTPUT_PROCESSOR
        )
    ]
)
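
Once setup_monocle_telemetry(...) has run, calls on the instrumented classes emit spans with no further code changes. A sketch of an end-to-end run (API key from the environment):

import os
from custom_openai_client import CustomOpenAIClient
from custom_vector_db import CustomVectorDB

# Emits an "openai.chat_completion" span shaped by OPENAI_OUTPUT_PROCESSOR
llm = CustomOpenAIClient(api_key=os.environ["OPENAI_API_KEY"])
llm.chat_completion([{"role": "user", "content": "What is machine learning?"}])

# Emits a "vector_db.search" span shaped by VECTOR_DB_OUTPUT_PROCESSOR
db = CustomVectorDB()
db.add_documents(["Machine learning is a subset of AI."])
db.search("machine learning", top_k=1)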

Complete Trace Examples

OpenAI Client Trace Output

With the custom OpenAI client and output processor, you'll get traces like:

{
  "name": "openai.chat_completion",
  "attributes": {
    "span.type": "inference",
    "openai.model": "gpt-3.5-turbo",
    "openai.request_count": 1,
    "openai.temperature": 0.7,
    "openai.max_tokens": 150
  },
  "events": [
    {
      "name": "data.input",
      "attributes": {
        "messages": ["What is machine learning?", "Can you explain it simply?"],
        "message_count": 2
      }
    },
    {
      "name": "data.output",
      "attributes": {
        "response": "Machine learning is a subset of artificial intelligence that enables computers to learn and make decisions from data without being explicitly programmed for every task.",
        "response_length": 142
      }
    },
    {
      "name": "metadata",
      "attributes": {
        "prompt_tokens": 15,
        "completion_tokens": 25,
        "total_tokens": 40,
        "request_id": "chatcmpl-1234567890"
      }
    }
  ]
}

Vector Database Trace Output

With the custom vector database and output processor, you'll get traces like:

{
  "name": "vector_db.search",
  "attributes": {
    "span.type": "retrieval",
    "vector_db.model": "sentence-transformers/all-MiniLM-L6-v2",
    "vector_db.document_count": 1000,
    "vector_db.search_count": 5,
    "vector_db.top_k": 3
  },
  "events": [
    {
      "name": "data.input",
      "attributes": {
        "query": "machine learning algorithms",
        "query_length": 25
      }
    },
    {
      "name": "data.output",
      "attributes": {
        "results": [
          "Machine learning is a subset of AI that focuses on algorithms...",
          "There are three main types of machine learning: supervised...",
          "Deep learning uses neural networks with multiple layers..."
        ],
        "scores": [0.95, 0.87, 0.82],
        "result_count": 3
      }
    }
  ]
}

Adding Custom Attributes and Events

To add custom attributes and events, you need to define an output processor:

from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from monocle_apptrace import setup_monocle_telemetry
from monocle_apptrace.instrumentation.common.wrapper_method import WrapperMethod
from monocle_apptrace.instrumentation.common.wrapper import task_wrapper

# Custom output processor for calculator
CALCULATOR_OUTPUT_PROCESSOR = {
    "type": "computation",
    "attributes": [
        {
            "attribute": "calculator.precision",
            "accessor": lambda arguments: arguments['instance'].precision
        },
        {
            "attribute": "calculator.operation_count", 
            "accessor": lambda arguments: arguments['instance'].operation_count
        }
    ],
    "events": [
        {
            "name": "data.input",
            "attributes": [
                {
                    "attribute": "operand_a",
                    "accessor": lambda arguments: arguments['args'][0] if arguments['args'] else None
                },
                {
                    "attribute": "operand_b",
                    "accessor": lambda arguments: arguments['args'][1] if len(arguments['args']) > 1 else None
                }
            ]
        },
        {
            "name": "data.output",
            "attributes": [
                {
                    "attribute": "result",
                    "accessor": lambda arguments: arguments['output']
                }
            ]
        }
    ]
}

# Use the custom output processor
setup_monocle_telemetry(
    workflow_name="calculator_app",
    span_processors=[BatchSpanProcessor(ConsoleSpanExporter())],
    wrapper_methods=[
        WrapperMethod(
            package="simple_calculator",
            object_name="SimpleCalculator",
            method="add",
            span_name="calculator.add",
            wrapper_method=task_wrapper,
            output_processor=CALCULATOR_OUTPUT_PROCESSOR
        )
    ]
)
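
The accessors above assume a SimpleCalculator class that exposes precision and operation_count attributes. A minimal sketch of such a class (hypothetical, shown only to make the example self-contained):

# simple_calculator.py (hypothetical)
class SimpleCalculator:
    def __init__(self, precision: int = 2):
        self.precision = precision       # read by the calculator.precision accessor
        self.operation_count = 0         # read by the calculator.operation_count accessor

    def add(self, a: float, b: float) -> float:
        """Instrumented as the "calculator.add" span."""
        self.operation_count += 1
        return round(a + b, self.precision)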

Production Best Practices

  1. Robust Accessor Functions: Write accessor functions that handle missing or malformed data gracefully
  2. Error Handling: Add proper error handling in accessors so a failed lookup never breaks instrumentation (see the sketch after this list)
  3. Performance Considerations: Avoid expensive operations in accessor functions
  4. Attribute Organization: Use consistent naming patterns and group related attributes
  5. Event Standardization: Use standard event names like data.input, data.output, and metadata
  6. Documentation: Document your custom output processors and their expected data structures
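
For points 1 and 2, one option is to wrap each accessor so a lookup failure degrades to a default value instead of raising. A sketch (safe_accessor is not part of Monocle; it is a pattern you would define yourself):

def safe_accessor(fn, default=None):
    """Wrap an accessor so failures return a default instead of raising."""
    def wrapped(arguments):
        try:
            return fn(arguments)
        except (KeyError, IndexError, AttributeError, TypeError):
            return default
    return wrapped

# Usage inside an output processor definition
ROBUST_MODEL_ATTRIBUTE = {
    "attribute": "openai.model",
    "accessor": safe_accessor(lambda arguments: arguments['instance'].model, default="unknown"),
}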

Benefits of Advanced Instrumentation

  1. Complete Visibility: Capture detailed telemetry from all GenAI components
  2. Performance Monitoring: Track execution times and identify bottlenecks
  3. Error Analysis: Detailed error information for debugging and optimization
  4. Usage Analytics: Understand how your GenAI components are being used
  5. Cost Tracking: Monitor token usage and API costs across different components
  6. Quality Assurance: Validate that your GenAI components are working as expected

Next Steps