Monocle Advanced Patterns Guide¶
This guide covers advanced instrumentation patterns for production-ready GenAI applications. You'll learn how to instrument third-party libraries, create sophisticated output processors, and implement comprehensive testing strategies.
Leveraging Monocle's Extensibility¶
When the out-of-the-box features of an app framework are not sufficient, app developers have to add custom code. For example, if you extend an LLM class in LlamaIndex to use a model hosted on NVIDIA Triton, Monocle does not know about the new class. You can register the new class's method through the wrapper_methods argument of setup_monocle_telemetry, and Monocle will then trace it.
Default Configuration of Instrumented Methods¶
The following files hold the default configuration of instrumented methods and their corresponding span names, one file per framework:
- src/monocle_apptrace/instrumentation/metamodel/langchain/__init__.py
- src/monocle_apptrace/instrumentation/metamodel/llamaindex/__init__.py
- src/monocle_apptrace/instrumentation/metamodel/haystack/__init__.py
The following configuration instruments invoke(..) of RunnableSequence (a chain or workflow in LangChain parlance) so that it emits a span:
{
    "package": "langchain.schema.runnable",
    "object": "RunnableSequence",
    "method": "invoke",
    "span_name": "langchain.workflow",
    "wrapper": task_wrapper
}
Example - Monitoring Custom Methods with Monocle¶
from monocle_apptrace import setup_monocle_telemetry
from monocle_apptrace.instrumentation.common.wrapper_method import WrapperMethod
from monocle_apptrace.instrumentation.common.wrapper import task_wrapper, atask_wrapper
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter

# Extend the default wrapped methods list as follows
app_name = "simple_math_app"
setup_monocle_telemetry(
    workflow_name=app_name,
    span_processors=[BatchSpanProcessor(ConsoleSpanExporter())],
    wrapper_methods=[
        # Synchronous entry point
        WrapperMethod(
            package="langchain.schema.runnable",
            object_name="RunnableParallel",
            method="invoke",
            span_name="langchain.workflow",
            wrapper_method=task_wrapper
        ),
        # Asynchronous entry point, wrapped with the async wrapper
        WrapperMethod(
            package="langchain.schema.runnable",
            object_name="RunnableParallel",
            method="ainvoke",
            span_name="langchain.workflow",
            wrapper_method=atask_wrapper
        )
    ]
)
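Registering both the synchronous and asynchronous variant of a method, as above, repeats most fields. One way to reduce the duplication is a small helper that builds the keyword arguments for both entries. This is only a sketch: wrapper_pair is a hypothetical helper, not part of Monocle, and it assumes the async method is named by prefixing "a" to the sync method, as with invoke and ainvoke.

```python
from typing import Any, Callable, Dict, List


def wrapper_pair(
    package: str,
    object_name: str,
    span_name: str,
    sync_wrapper: Callable[..., Any],
    async_wrapper: Callable[..., Any],
    method: str = "invoke",
) -> List[Dict[str, Any]]:
    """Build keyword arguments for a sync entry and its async twin.

    Hypothetical helper (not a Monocle API). Assumes the async method is
    the sync name prefixed with "a", e.g. invoke -> ainvoke.
    """
    base = {"package": package, "object_name": object_name, "span_name": span_name}
    return [
        {**base, "method": method, "wrapper_method": sync_wrapper},
        {**base, "method": "a" + method, "wrapper_method": async_wrapper},
    ]


# Stand-ins so the sketch runs without Monocle installed; in an application
# you would pass task_wrapper and atask_wrapper instead.
def fake_task_wrapper(*args, **kwargs): ...
def fake_atask_wrapper(*args, **kwargs): ...


entries = wrapper_pair(
    "langchain.schema.runnable", "RunnableParallel", "langchain.workflow",
    fake_task_wrapper, fake_atask_wrapper,
)
# With Monocle available: wrapper_methods=[WrapperMethod(**kw) for kw in entries]
```

The keys of each dict match the WrapperMethod keyword arguments used in the example above, so the entries can be unpacked directly into the constructor.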
Going Beyond Supported GenAI Components¶
- If you are using an application framework, model hosting service, or infrastructure that Monocle does not currently support, please submit a GitHub issue requesting that support.
- The Monocle community is working on an SDK that lets applications generate their own traces.
Real-World Output Processor Examples¶
Custom OpenAI Client Instrumentation¶
Here's how to instrument a custom OpenAI client with detailed output processing:
# custom_openai_client.py
import openai
from typing import Dict, Any, List


class CustomOpenAIClient:
    def __init__(self, api_key: str, model: str = "gpt-3.5-turbo"):
        self.client = openai.OpenAI(api_key=api_key)
        self.model = model
        self.request_count = 0

    def chat_completion(self, messages: List[Dict[str, str]], **kwargs) -> Dict[str, Any]:
        """
        Create a chat completion with custom processing.

        Args:
            messages: List of message dictionaries
            **kwargs: Additional parameters for the API call

        Returns:
            Dict containing the completion response
        """
        self.request_count += 1
        # Add custom processing
        processed_messages = self._preprocess_messages(messages)
        response = self.client.chat.completions.create(
            model=self.model,
            messages=processed_messages,
            **kwargs
        )
        return {
            "content": response.choices[0].message.content,
            # model_dump() replaces the deprecated pydantic .dict()
            "usage": response.usage.model_dump() if response.usage else None,
            "model": response.model,
            "request_id": response.id
        }

    def _preprocess_messages(self, messages: List[Dict[str, str]]) -> List[Dict[str, str]]:
        """Preprocess messages before sending to API"""
        return [{"role": msg["role"], "content": msg["content"].strip()} for msg in messages]
OpenAI Output Processor¶
# openai_output_processor.py
# Each accessor receives one dict with the keys 'instance', 'args', 'kwargs' and 'output'.
OPENAI_OUTPUT_PROCESSOR = {
    "type": "inference",
    "attributes": [
        {
            "attribute": "openai.model",
            "accessor": lambda arguments: arguments['instance'].model
        },
        {
            "attribute": "openai.request_count",
            "accessor": lambda arguments: arguments['instance'].request_count
        },
        {
            "attribute": "openai.temperature",
            "accessor": lambda arguments: arguments['kwargs'].get('temperature', 1.0)
        },
        {
            "attribute": "openai.max_tokens",
            "accessor": lambda arguments: arguments['kwargs'].get('max_tokens', None)
        }
    ],
    "events": [
        {
            "name": "data.input",
            "attributes": [
                {
                    "attribute": "messages",
                    "accessor": lambda arguments: [msg.get('content', '') for msg in arguments['args'][0]] if arguments['args'] else []
                },
                {
                    "attribute": "message_count",
                    "accessor": lambda arguments: len(arguments['args'][0]) if arguments['args'] else 0
                }
            ]
        },
        {
            "name": "data.output",
            "attributes": [
                {
                    # 'content' can be None, so fall back to '' explicitly
                    "attribute": "response",
                    "accessor": lambda arguments: (arguments['output'].get('content') or '') if arguments['output'] else ''
                },
                {
                    "attribute": "response_length",
                    "accessor": lambda arguments: len(arguments['output'].get('content') or '') if arguments['output'] else 0
                }
            ]
        },
        {
            "name": "metadata",
            "attributes": [
                {
                    # 'usage' is None when the API returns no usage data, so use `or {}`
                    "attribute": "prompt_tokens",
                    "accessor": lambda arguments: (arguments['output'].get('usage') or {}).get('prompt_tokens', 0) if arguments['output'] else 0
                },
                {
                    "attribute": "completion_tokens",
                    "accessor": lambda arguments: (arguments['output'].get('usage') or {}).get('completion_tokens', 0) if arguments['output'] else 0
                },
                {
                    "attribute": "total_tokens",
                    "accessor": lambda arguments: (arguments['output'].get('usage') or {}).get('total_tokens', 0) if arguments['output'] else 0
                },
                {
                    "attribute": "request_id",
                    "accessor": lambda arguments: arguments['output'].get('request_id', '') if arguments['output'] else ''
                }
            ]
        }
    ]
}
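Every accessor in the processor above takes a single arguments dict with the keys instance, args, kwargs and output. To check accessors before wiring them into Monocle, you can evaluate a processor against a hand-built arguments dict. The sketch below assumes only the dict layout used in this guide; evaluate_processor is a hypothetical local-testing helper and does not reproduce how Monocle evaluates processors internally.

```python
from types import SimpleNamespace
from typing import Any, Dict


def evaluate_processor(processor: Dict[str, Any], arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Run every accessor in an output-processor dict against `arguments`.

    Hypothetical helper for local testing; it mirrors the dict layout used
    in this guide, not Monocle's own evaluation logic.
    """
    span_attrs = {
        a["attribute"]: a["accessor"](arguments)
        for a in processor.get("attributes", [])
    }
    events = {
        ev["name"]: {
            a["attribute"]: a["accessor"](arguments)
            for a in ev.get("attributes", [])
        }
        for ev in processor.get("events", [])
    }
    return {"type": processor.get("type"), "attributes": span_attrs, "events": events}


# A trimmed-down processor in the same shape as OPENAI_OUTPUT_PROCESSOR.
processor = {
    "type": "inference",
    "attributes": [
        {"attribute": "openai.model",
         "accessor": lambda arguments: arguments["instance"].model},
    ],
    "events": [
        {
            "name": "data.input",
            "attributes": [
                {"attribute": "message_count",
                 "accessor": lambda arguments: len(arguments["args"][0]) if arguments["args"] else 0},
            ],
        },
    ],
}

# Hand-built stand-in for what the wrapped call would have seen.
arguments = {
    "instance": SimpleNamespace(model="gpt-3.5-turbo"),
    "args": ([{"role": "user", "content": "Hi"}],),
    "kwargs": {},
    "output": {"content": "Hello!"},
}

result = evaluate_processor(processor, arguments)
print(result["attributes"])            # {'openai.model': 'gpt-3.5-turbo'}
print(result["events"]["data.input"])  # {'message_count': 1}
```

Running the accessors this way surfaces KeyError or AttributeError problems in a unit test rather than in a live trace.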
Vector Database Instrumentation¶
# custom_vector_db.py
import numpy as np
from typing import List, Dict, Any
from sentence_transformers import SentenceTransformer


class CustomVectorDB:
    def __init__(self, model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
        self.embedding_model = SentenceTransformer(model_name)
        self.model_name = model_name
        self.documents = []
        self.embeddings = []
        self.search_count = 0

    def add_documents(self, documents: List[str]) -> None:
        """Add documents to the vector database"""
        embeddings = self.embedding_model.encode(documents)
        self.documents.extend(documents)
        self.embeddings.extend(embeddings)

    def search(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
        """
        Search for similar documents.

        Args:
            query: Search query
            top_k: Number of results to return

        Returns:
            List of search results with scores
        """
        self.search_count += 1
        # Nothing to rank until documents have been added
        if not self.documents:
            return []
        # Generate query embedding
        query_embedding = self.embedding_model.encode([query])
        # Calculate similarities as dot products; this equals cosine
        # similarity only when the embeddings are unit-normalized
        similarities = np.dot(self.embeddings, query_embedding.T).flatten()
        # Get top results, highest score first
        top_indices = np.argsort(similarities)[::-1][:top_k]
        results = []
        for idx in top_indices:
            results.append({
                "document": self.documents[idx],
                "score": float(similarities[idx]),
                "index": int(idx)
            })
        return results
Vector DB Output Processor¶
# vector_db_output_processor.py
VECTOR_DB_OUTPUT_PROCESSOR = {
    "type": "retrieval",
    "attributes": [
        {
            "attribute": "vector_db.model",
            "accessor": lambda arguments: arguments['instance'].model_name
        },
        {
            "attribute": "vector_db.document_count",
            "accessor": lambda arguments: len(arguments['instance'].documents)
        },
        {
            "attribute": "vector_db.search_count",
            "accessor": lambda arguments: arguments['instance'].search_count
        },
        {
            "attribute": "vector_db.top_k",
            "accessor": lambda arguments: arguments['kwargs'].get('top_k', 5)
        }
    ],
    "events": [
        {
            "name": "data.input",
            "attributes": [
                {
                    "attribute": "query",
                    "accessor": lambda arguments: arguments['args'][0] if arguments['args'] else ''
                },
                {
                    "attribute": "query_length",
                    "accessor": lambda arguments: len(arguments['args'][0]) if arguments['args'] else 0
                }
            ]
        },
        {
            "name": "data.output",
            "attributes": [
                {
                    "attribute": "results",
                    "accessor": lambda arguments: [r['document'] for r in arguments['output']] if arguments['output'] else []
                },
                {
                    "attribute": "scores",
                    "accessor": lambda arguments: [r['score'] for r in arguments['output']] if arguments['output'] else []
                },
                {
                    "attribute": "result_count",
                    "accessor": lambda arguments: len(arguments['output']) if arguments['output'] else 0
                }
            ]
        }
    ]
}
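The vector_db.top_k accessor above reads only kwargs, so a call such as db.search("query", 3) would be recorded with the default of 5 rather than 3. The sketch below shows one way to cover both calling styles. arg_or_kwarg is a hypothetical helper, and it assumes that arguments['args'] does not include self, which is what the query accessor's use of arguments['args'][0] implies.

```python
from typing import Any, Callable, Dict


def arg_or_kwarg(position: int, name: str, default: Any = None) -> Callable[[Dict[str, Any]], Any]:
    """Return an accessor that reads a parameter passed by keyword or position.

    Hypothetical helper. `position` is the index in arguments['args'],
    which is assumed not to include `self`.
    """
    def accessor(arguments: Dict[str, Any]) -> Any:
        kwargs = arguments.get("kwargs") or {}
        if name in kwargs:
            return kwargs[name]
        args = arguments.get("args") or ()
        return args[position] if len(args) > position else default
    return accessor


# search(query, top_k=5): query is position 0, top_k is position 1.
top_k_attribute = {
    "attribute": "vector_db.top_k",
    "accessor": arg_or_kwarg(1, "top_k", 5),
}

get_top_k = top_k_attribute["accessor"]
print(get_top_k({"args": ("ml",), "kwargs": {"top_k": 3}}))  # 3
print(get_top_k({"args": ("ml", 10), "kwargs": {}}))         # 10
print(get_top_k({"args": ("ml",), "kwargs": {}}))            # 5
```

The same factory can be reused for the query parameter with arg_or_kwarg(0, "query", "").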
Third-Party Library Instrumentation¶
Instrumenting Custom Components¶
# custom_instrumentation.py
from monocle_apptrace import setup_monocle_telemetry
from monocle_apptrace.instrumentation.common.wrapper_method import WrapperMethod
from monocle_apptrace.instrumentation.common.wrapper import task_wrapper, atask_wrapper
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter

# Output processors defined in the files shown earlier in this guide
from openai_output_processor import OPENAI_OUTPUT_PROCESSOR
from vector_db_output_processor import VECTOR_DB_OUTPUT_PROCESSOR

# Instrument custom components
setup_monocle_telemetry(
    workflow_name="custom_app",
    span_processors=[BatchSpanProcessor(ConsoleSpanExporter())],
    wrapper_methods=[
        # Custom OpenAI client
        WrapperMethod(
            package="custom_openai_client",
            object_name="CustomOpenAIClient",
            method="chat_completion",
            span_name="openai.chat_completion",
            wrapper_method=task_wrapper,
            output_processor=OPENAI_OUTPUT_PROCESSOR
        ),
        # Custom vector database
        WrapperMethod(
            package="custom_vector_db",
            object_name="CustomVectorDB",
            method="search",
            span_name="vector_db.search",
            wrapper_method=task_wrapper,
            output_processor=VECTOR_DB_OUTPUT_PROCESSOR
        )
    ]
)
Complete Trace Examples¶
OpenAI Client Trace Output¶
With the custom OpenAI client and output processor, you'll get traces like:
{
    "name": "openai.chat_completion",
    "attributes": {
        "span.type": "inference",
        "openai.model": "gpt-3.5-turbo",
        "openai.request_count": 1,
        "openai.temperature": 0.7,
        "openai.max_tokens": 150
    },
    "events": [
        {
            "name": "data.input",
            "attributes": {
                "messages": ["What is machine learning?", "Can you explain it simply?"],
                "message_count": 2
            }
        },
        {
            "name": "data.output",
            "attributes": {
                "response": "Machine learning is a subset of artificial intelligence that enables computers to learn and make decisions from data without being explicitly programmed for every task.",
                "response_length": 168
            }
        },
        {
            "name": "metadata",
            "attributes": {
                "prompt_tokens": 15,
                "completion_tokens": 25,
                "total_tokens": 40,
                "request_id": "chatcmpl-1234567890"
            }
        }
    ]
}
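Because the token counts land in the metadata event, they can be totalled across spans once the traces are exported. The sketch below assumes spans have been loaded as dicts in the shape shown above; real exporter output may carry additional fields or a different layout, and sum_token_usage is a hypothetical helper.

```python
from typing import Any, Dict, Iterable


def sum_token_usage(spans: Iterable[Dict[str, Any]]) -> Dict[str, int]:
    """Total the token counts found in each span's "metadata" event.

    Hypothetical helper; assumes the span layout shown in this guide.
    Spans without a metadata event contribute nothing.
    """
    totals = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}
    for span in spans:
        for event in span.get("events", []):
            if event.get("name") != "metadata":
                continue
            attrs = event.get("attributes", {})
            for key in totals:
                totals[key] += int(attrs.get(key) or 0)
    return totals


spans = [
    {"name": "openai.chat_completion", "events": [
        {"name": "metadata", "attributes": {
            "prompt_tokens": 15, "completion_tokens": 25, "total_tokens": 40}},
    ]},
    {"name": "openai.chat_completion", "events": [
        {"name": "metadata", "attributes": {
            "prompt_tokens": 10, "completion_tokens": 5, "total_tokens": 15}},
    ]},
    # A retrieval span has no metadata event and is skipped.
    {"name": "vector_db.search", "events": [
        {"name": "data.input", "attributes": {"query": "ml"}},
    ]},
]

print(sum_token_usage(spans))
# {'prompt_tokens': 25, 'completion_tokens': 30, 'total_tokens': 55}
```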
Vector Database Trace Output¶
With the custom vector database and output processor, you'll get traces like:
{
    "name": "vector_db.search",
    "attributes": {
        "span.type": "retrieval",
        "vector_db.model": "sentence-transformers/all-MiniLM-L6-v2",
        "vector_db.document_count": 1000,
        "vector_db.search_count": 5,
        "vector_db.top_k": 3
    },
    "events": [
        {
            "name": "data.input",
            "attributes": {
                "query": "machine learning algorithms",
                "query_length": 27
            }
        },
        {
            "name": "data.output",
            "attributes": {
                "results": [
                    "Machine learning is a subset of AI that focuses on algorithms...",
                    "There are three main types of machine learning: supervised...",
                    "Deep learning uses neural networks with multiple layers..."
                ],
                "scores": [0.95, 0.87, 0.82],
                "result_count": 3
            }
        }
    ]
}
Adding Custom Attributes and Events¶
To add custom attributes and events, you need to define an output processor:
# Custom output processor for calculator
CALCULATOR_OUTPUT_PROCESSOR = {
    "type": "computation",
    "attributes": [
        {
            "attribute": "calculator.precision",
            "accessor": lambda arguments: arguments['instance'].precision
        },
        {
            "attribute": "calculator.operation_count",
            "accessor": lambda arguments: arguments['instance'].operation_count
        }
    ],
    "events": [
        {
            "name": "data.input",
            "attributes": [
                {
                    "attribute": "operand_a",
                    "accessor": lambda arguments: arguments['args'][0] if arguments['args'] else None
                },
                {
                    "attribute": "operand_b",
                    "accessor": lambda arguments: arguments['args'][1] if len(arguments['args']) > 1 else None
                }
            ]
        },
        {
            "name": "data.output",
            "attributes": [
                {
                    "attribute": "result",
                    "accessor": lambda arguments: arguments['output']
                }
            ]
        }
    ]
}

# Use the custom output processor
setup_monocle_telemetry(
    workflow_name="calculator_app",
    span_processors=[BatchSpanProcessor(ConsoleSpanExporter())],
    wrapper_methods=[
        WrapperMethod(
            package="simple_calculator",
            object_name="SimpleCalculator",
            method="add",
            span_name="calculator.add",
            wrapper_method=task_wrapper,
            output_processor=CALCULATOR_OUTPUT_PROCESSOR
        )
    ]
)
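The calculator processor reads precision and operation_count from the instrumented instance, and the operands from the positional arguments. SimpleCalculator is not defined in this guide, so the class below is only an assumed minimal shape, written to show which values the accessors would pick up.

```python
class SimpleCalculator:
    """Assumed minimal shape of the class instrumented above (illustrative only)."""

    def __init__(self, precision: int = 2):
        self.precision = precision      # read by calculator.precision
        self.operation_count = 0        # read by calculator.operation_count

    def add(self, a: float, b: float) -> float:
        self.operation_count += 1
        return round(a + b, self.precision)


calc = SimpleCalculator(precision=2)
output = calc.add(1.25, 2.5)

# What the accessors would receive for the call above (built by hand here).
arguments = {"instance": calc, "args": (1.25, 2.5), "kwargs": {}, "output": output}

print(arguments["instance"].precision)        # 2
print(arguments["instance"].operation_count)  # 1
print(arguments["args"][1])                   # 2.5
print(arguments["output"])                    # 3.75
```

Note that operand_a and operand_b are read from args, so a call such as calc.add(a=1.25, b=2.5) would record both operands as None.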
Production Best Practices¶
- Robust Accessor Functions: Write accessor functions that handle missing or malformed data gracefully
- Error Handling: Add proper error handling in accessors to avoid instrumentation failures
- Performance Considerations: Avoid expensive operations in accessor functions
- Attribute Organization: Use consistent naming patterns and group related attributes
- Event Standardization: Use standard event names like data.input, data.output, and metadata
- Documentation: Document your custom output processors and their expected data structures
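The first two practices can be combined in a small wrapper that turns any accessor failure into a default value, so a malformed response never breaks instrumentation. This is a sketch under stated assumptions: safe is a hypothetical helper rather than a Monocle API, and it treats a None result the same as a failure.

```python
import logging
from typing import Any, Callable, Dict

logger = logging.getLogger("accessors")


def safe(accessor: Callable[[Dict[str, Any]], Any], default: Any = None) -> Callable[[Dict[str, Any]], Any]:
    """Wrap an accessor so exceptions and None results yield `default`.

    Hypothetical helper, not part of Monocle.
    """
    def wrapped(arguments: Dict[str, Any]) -> Any:
        try:
            value = accessor(arguments)
        except Exception:
            # Log at debug level so failures stay visible without raising.
            logger.debug("accessor failed", exc_info=True)
            return default
        return default if value is None else value
    return wrapped


# The inner lambda can stay simple because `safe` absorbs the failure cases.
prompt_tokens = safe(
    lambda arguments: arguments["output"]["usage"]["prompt_tokens"],
    default=0,
)

print(prompt_tokens({"output": {"usage": {"prompt_tokens": 15}}}))  # 15
print(prompt_tokens({"output": {"usage": None}}))                   # 0
print(prompt_tokens({}))                                            # 0
```

Catching Exception broadly is a deliberate choice here: for telemetry code, losing one attribute is usually preferable to failing the instrumented call.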
Benefits of Advanced Instrumentation¶
- Complete Visibility: Capture detailed telemetry from all GenAI components
- Performance Monitoring: Track execution times and identify bottlenecks
- Error Analysis: Detailed error information for debugging and optimization
- Usage Analytics: Understand how your GenAI components are being used
- Cost Tracking: Monitor token usage and API costs across different components
- Quality Assurance: Validate that your GenAI components are working as expected
Next Steps¶
- Basic Instrumentation: See Monocle Custom Instrumentation Guide for getting started
- Trace Analysis: See Monocle Trace Analysis Guide to understand the generated traces
- Testing: See Monocle Testing Guide for testing your custom instrumentation