Monocle Custom Instrumentation Guide
This guide demonstrates how to extend Monocle instrumentation for custom classes and methods. You'll learn how to instrument both synchronous and asynchronous methods, and understand how data flows into span attributes and events.
Overview
Monocle provides a flexible instrumentation system that allows you to trace custom classes and methods. When you instrument a method, Monocle captures:
- Attributes: Static metadata about the operation (e.g., class name, method name, configuration)
- Events: Dynamic data captured during execution (e.g., input parameters, output results, timestamps)
Data Flow Architecture
- Wrapper: Intercepts the method call (task_wrapper for sync, atask_wrapper for async)
- Span Handler: Manages span lifecycle and processes data
- Output Processor: Defines how to extract data from method arguments and return values
- Arguments Dictionary: {"instance": instance, "args": args, "kwargs": kwargs, "output": return_value}
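The flow above can be sketched in plain Python. This is a simplified stand-in, not Monocle's actual implementation: FakeSpan, apply_output_processor, sketch_task_wrapper, and the Greeter class are hypothetical names invented for illustration, and the real wrappers and span handler do considerably more (span lifecycle, context propagation, error handling). The sketch only shows how the arguments dictionary is assembled and handed to each accessor.

```python
from typing import Any, Callable, Dict, List


class FakeSpan:
    """Hypothetical stand-in for a span: a name, attributes, and named events."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.attributes: Dict[str, Any] = {}
        self.events: List[Dict[str, Any]] = []


def apply_output_processor(span: FakeSpan, processor: Dict[str, Any],
                           arguments: Dict[str, Any]) -> None:
    """Run every accessor in the processor against the arguments dictionary."""
    for spec in processor.get("attributes", []):
        span.attributes[spec["attribute"]] = spec["accessor"](arguments)
    for event in processor.get("events", []):
        values = {spec["attribute"]: spec["accessor"](arguments)
                  for spec in event.get("attributes", [])}
        span.events.append({"name": event["name"], "attributes": values})


def sketch_task_wrapper(method: Callable[..., Any], span_name: str,
                        processor: Dict[str, Any],
                        spans: List[FakeSpan]) -> Callable[..., Any]:
    """Wrap a sync method: call it, then pass inputs and output to accessors."""
    def wrapped(instance, *args, **kwargs):
        span = FakeSpan(span_name)
        output = method(instance, *args, **kwargs)
        # Same four keys as the arguments dictionary described above.
        arguments = {"instance": instance, "args": args,
                     "kwargs": kwargs, "output": output}
        apply_output_processor(span, processor, arguments)
        spans.append(span)
        return output
    return wrapped


class Greeter:
    """Toy class used only for this illustration."""

    def __init__(self, greeting: str) -> None:
        self.greeting = greeting

    def greet(self, name: str) -> str:
        return f"{self.greeting}, {name}!"


GREETER_PROCESSOR = {
    "attributes": [
        {"attribute": "greeter.greeting",
         "accessor": lambda arguments: arguments["instance"].greeting},
    ],
    "events": [
        {"name": "data.input", "attributes": [
            {"attribute": "name",
             "accessor": lambda arguments: arguments["args"][0]},
        ]},
        {"name": "data.output", "attributes": [
            {"attribute": "message",
             "accessor": lambda arguments: arguments["output"]},
        ]},
    ],
}

collected: List[FakeSpan] = []
Greeter.greet = sketch_task_wrapper(Greeter.greet, "greeter.greet",
                                    GREETER_PROCESSOR, collected)
message = Greeter("Hello").greet("Ada")
print(collected[0].attributes)
print(collected[0].events)
```

The point of the sketch is the ordering: the method runs first, so accessors that read arguments["output"] or instance state see the values as they are after the call.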
Example 1: Synchronous Method Instrumentation
Real-World Scenario: Financial Calculation Service
Imagine you're building a financial application that needs to perform various mathematical calculations for loan processing, interest calculations, or investment portfolio analysis. The SimpleCalculator class represents a core calculation engine that handles these financial computations with configurable precision.
Custom Class with Sync Method
# simple_calculator.py
class SimpleCalculator:
    def __init__(self, precision=2):
        self.precision = precision
        self.operation_count = 0
    def add(self, a, b):
        """
        Calculate the sum of two financial values (e.g., principal + interest).
        Args:
            a (float): First financial value (e.g., principal amount)
            b (float): Second financial value (e.g., interest amount)
        Returns:
            float: Sum of the two values with configured precision
        """
        self.operation_count += 1
        result = round(a + b, self.precision)
        return result
    def multiply(self, a, b):
        """
        Calculate the product of two financial values (e.g., principal * interest rate).
        Args:
            a (float): First financial value (e.g., principal amount)
            b (float): Second financial value (e.g., interest rate multiplier)
        Returns:
            float: Product of the two values with configured precision
        """
        self.operation_count += 1
        result = round(a * b, self.precision)
        return result
Custom Output Processor
# calculator_output_processor.py
CALCULATOR_OUTPUT_PROCESSOR = {
    "type": "computation",
    "attributes": [
        {
            "attribute": "calculator.precision",
            "accessor": lambda arguments: arguments['instance'].precision
        },
        {
            "attribute": "calculator.operation_count",
            "accessor": lambda arguments: arguments['instance'].operation_count
        }
    ],
    "events": [
        {
            "name": "data.input",
            "attributes": [
                {
                    "attribute": "operand_a",
                    "accessor": lambda arguments: arguments['args'][0] if arguments['args'] else None
                },
                {
                    "attribute": "operand_b",
                    "accessor": lambda arguments: arguments['args'][1] if len(arguments['args']) > 1 else None
                }
            ]
        },
        {
            "name": "data.output",
            "attributes": [
                {
                    "attribute": "result",
                    "accessor": lambda arguments: arguments['output']
                }
            ]
        }
    ]
}
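Because accessors are ordinary callables that take the arguments dictionary, they can be exercised without any telemetry setup. The following is a minimal sketch of such a dry run; dry_run is a hypothetical helper (not part of Monocle), the processor is a trimmed copy of the one above so the snippet runs on its own, and types.SimpleNamespace stands in for a SimpleCalculator instance.

```python
from types import SimpleNamespace

# Trimmed copy of CALCULATOR_OUTPUT_PROCESSOR so this snippet is self-contained.
PROCESSOR = {
    "type": "computation",
    "attributes": [
        {"attribute": "calculator.precision",
         "accessor": lambda arguments: arguments["instance"].precision},
    ],
    "events": [
        {"name": "data.input",
         "attributes": [
             {"attribute": "operand_a",
              "accessor": lambda arguments: arguments["args"][0] if arguments["args"] else None},
             {"attribute": "operand_b",
              "accessor": lambda arguments: arguments["args"][1] if len(arguments["args"]) > 1 else None},
         ]},
        {"name": "data.output",
         "attributes": [
             {"attribute": "result",
              "accessor": lambda arguments: arguments["output"]},
         ]},
    ],
}


def dry_run(processor, arguments):
    """Hypothetical helper: evaluate every accessor and return plain dicts."""
    attributes = {spec["attribute"]: spec["accessor"](arguments)
                  for spec in processor["attributes"]}
    events = {event["name"]: {spec["attribute"]: spec["accessor"](arguments)
                              for spec in event["attributes"]}
              for event in processor["events"]}
    return attributes, events


# Hand-built arguments dictionary mimicking calc.add(1.5, 2.3) with precision=3.
fake_arguments = {
    "instance": SimpleNamespace(precision=3, operation_count=1),
    "args": (1.5, 2.3),
    "kwargs": {},
    "output": 3.8,
}
attributes, events = dry_run(PROCESSOR, fake_arguments)
print(attributes)
print(events)
```

A check like this catches typos in attribute names and accessors that raise on unexpected input before the processor is wired into setup_monocle_telemetry.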
Instrumentation Setup
# main.py
from monocle_apptrace import setup_monocle_telemetry
from monocle_apptrace.instrumentation.common.wrapper_method import WrapperMethod
from monocle_apptrace.instrumentation.common.wrapper import task_wrapper
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from simple_calculator import SimpleCalculator
from calculator_output_processor import CALCULATOR_OUTPUT_PROCESSOR
# Setup Monocle telemetry with custom instrumentation
setup_monocle_telemetry(
    workflow_name="calculator_app",
    span_processors=[BatchSpanProcessor(ConsoleSpanExporter())],
    wrapper_methods=[
        WrapperMethod(
            package="simple_calculator",
            object_name="SimpleCalculator",
            method="add",
            span_name="calculator.add",
            wrapper_method=task_wrapper,
            output_processor=CALCULATOR_OUTPUT_PROCESSOR
        ),
        WrapperMethod(
            package="simple_calculator",
            object_name="SimpleCalculator",
            method="multiply",
            span_name="calculator.multiply",
            wrapper_method=task_wrapper,
            output_processor=CALCULATOR_OUTPUT_PROCESSOR
        )
    ]
)
# Use the instrumented class
calc = SimpleCalculator(precision=3)
# Simulate a loan calculation: principal + interest (amounts in thousands of dollars)
result1 = calc.add(1.5, 2.3)  # 1.5 + 2.3
# Simulate a scaling calculation: amount * multiplier
result2 = calc.multiply(3.2, 4.1)  # 3.2 * 4.1; a 4.1% rate would be passed as 0.041
print(f"Sum: {result1}")
print(f"Product: {result2}")
Generated Span Structure
The instrumentation will generate spans with:
Attributes:
{
    "monocle_apptrace.version": "0.6.0",
    "monocle_apptrace.language": "python",
    "span_source": "",
    "workflow.name": "calculator_app",
    "span.type": "generic"
}
Events (with custom output processor):
[
    {
        "name": "data.input",
        "timestamp": "2025-10-21T00:50:48.145759Z",
        "attributes": {
            "operand_a": 1.5,
            "operand_b": 2.3
        }
    },
    {
        "name": "data.output",
        "timestamp": "2025-10-21T00:50:48.145805Z",
        "attributes": {
            "result": 3.8
        }
    }
]
Note: With the custom output processor, the instrumentation captures both the input parameters (operands) and the output result in events, providing complete visibility into the method's behavior.
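One caveat about the operand accessors: they read only arguments['args']. Given the arguments dictionary layout described earlier, a call such as calc.add(a=1.5, b=2.3) would place both values under arguments['kwargs'], so operand_a and operand_b would be recorded as None. A minimal sketch of an accessor factory that checks both places follows; operand is a hypothetical helper, not a Monocle API.

```python
from typing import Any, Callable, Dict


def operand(position: int, name: str) -> Callable[[Dict[str, Any]], Any]:
    """Build an accessor that looks in positional args first, then in kwargs."""
    def accessor(arguments: Dict[str, Any]) -> Any:
        args = arguments.get("args") or ()
        if len(args) > position:
            return args[position]
        # Fall back to the keyword form; returns None if it is absent too.
        return (arguments.get("kwargs") or {}).get(name)
    return accessor


operand_a = operand(0, "a")
operand_b = operand(1, "b")

# Three ways the same call might arrive at the accessor.
positional = {"instance": None, "args": (1.5, 2.3), "kwargs": {}, "output": 3.8}
keyword = {"instance": None, "args": (), "kwargs": {"a": 1.5, "b": 2.3}, "output": 3.8}
mixed = {"instance": None, "args": (1.5,), "kwargs": {"b": 2.3}, "output": 3.8}

for label, arguments in (("positional", positional), ("keyword", keyword), ("mixed", mixed)):
    print(label, operand_a(arguments), operand_b(arguments))
```

In the output processor, these would replace the two lambdas under the data.input event, for example "accessor": operand(0, "a").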
Example 2: Asynchronous Method Instrumentation
Real-World Scenario: Document Processing Pipeline
Consider a document management system that processes large batches of documents asynchronously. This could be an AI-powered document analysis service that extracts information, performs OCR, or applies machine learning models to classify and process documents. The AsyncDataProcessor class simulates this real-world scenario where processing happens in batches to optimize performance and resource utilization.
Custom Class with Async Method
# async_data_processor.py
import asyncio
from typing import List, Dict, Any
class AsyncDataProcessor:
    def __init__(self, batch_size=10):
        self.batch_size = batch_size
        self.processed_count = 0
    async def process_items(self, items: List[str]) -> Dict[str, Any]:
        """
        Process a batch of documents asynchronously (e.g., OCR, text extraction, classification).
        Args:
            items: List of document identifiers or file paths to process
        Returns:
            Dict containing processing results with processed documents and metadata
        """
        # Simulate async processing (e.g., calling external APIs, ML models, or file I/O)
        await asyncio.sleep(0.1)
        # Simulate document processing results (e.g., extracted text, classifications, metadata)
        processed_items = [f"processed_{item}" for item in items]
        self.processed_count += len(items)
        return {
            "processed_items": processed_items,
            "count": len(processed_items),
            "batch_size": self.batch_size
        }
    async def batch_process(self, data: List[List[str]]) -> List[Dict[str, Any]]:
        """
        Process multiple batches of documents in sequence (e.g., processing different document types).
        Args:
            data: List of document batches, where each batch contains document identifiers
        Returns:
            List of processing results for each batch
        """
        results = []
        for batch in data:
            result = await self.process_items(batch)
            results.append(result)
        return results
Custom Output Processor
# async_processor_output_processor.py
ASYNC_PROCESSOR_OUTPUT_PROCESSOR = {
    "type": "processing",
    "attributes": [
        {
            "attribute": "processor.batch_size",
            "accessor": lambda arguments: arguments['instance'].batch_size
        },
        {
            "attribute": "processor.processed_count",
            "accessor": lambda arguments: arguments['instance'].processed_count
        }
    ],
    "events": [
        {
            "name": "data.input",
            "attributes": [
                {
                    "attribute": "items",
                    "accessor": lambda arguments: arguments['args'][0] if arguments['args'] else []
                },
                {
                    "attribute": "item_count",
                    "accessor": lambda arguments: len(arguments['args'][0]) if arguments['args'] else 0
                }
            ]
        },
        {
            "name": "data.output",
            "attributes": [
                {
                    "attribute": "processed_items",
                    "accessor": lambda arguments: arguments['output'].get('processed_items', []) if arguments['output'] else []
                },
                {
                    "attribute": "result_count",
                    "accessor": lambda arguments: arguments['output'].get('count', 0) if arguments['output'] else 0
                }
            ]
        }
    ]
}
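One thing to watch when reusing this processor: process_items returns a single dictionary, but batch_process returns a list of dictionaries. The data.output accessors above call .get on arguments['output'], which would raise AttributeError on a non-empty list. How Monocle treats an accessor that raises is not covered in this guide, so accessors that tolerate both shapes are the safer assumption. A minimal sketch, with hypothetical helper names (_as_results, processed_items, result_count):

```python
from typing import Any, Dict, List


def _as_results(output: Any) -> List[Dict[str, Any]]:
    """Normalize a single result dict or a list of result dicts into a list."""
    if isinstance(output, dict):
        return [output]
    if isinstance(output, list):
        return [item for item in output if isinstance(item, dict)]
    return []  # None or any unexpected type


def processed_items(arguments: Dict[str, Any]) -> List[str]:
    """Collect processed item names across one or many result dicts."""
    items: List[str] = []
    for result in _as_results(arguments.get("output")):
        items.extend(result.get("processed_items", []))
    return items


def result_count(arguments: Dict[str, Any]) -> int:
    """Sum the 'count' field across one or many result dicts."""
    return sum(result.get("count", 0)
               for result in _as_results(arguments.get("output")))


# Output shaped like process_items (one dict).
single = {"output": {"processed_items": ["processed_a.pdf"], "count": 1, "batch_size": 5}}
# Output shaped like batch_process (list of dicts).
batched = {"output": [
    {"processed_items": ["processed_a.pdf", "processed_b.pdf"], "count": 2, "batch_size": 5},
    {"processed_items": ["processed_c.pdf"], "count": 1, "batch_size": 5},
]}
# No output at all, e.g. if the accessor runs when nothing was returned.
missing = {"output": None}

print(processed_items(single), result_count(single))
print(processed_items(batched), result_count(batched))
print(processed_items(missing), result_count(missing))
```

An alternative design is to define a second output processor just for batch_process, which keeps each accessor simple at the cost of some duplication.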
Instrumentation Setup
# main_async.py
import asyncio
from monocle_apptrace import setup_monocle_telemetry
from monocle_apptrace.instrumentation.common.wrapper_method import WrapperMethod
from monocle_apptrace.instrumentation.common.wrapper import atask_wrapper
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from async_data_processor import AsyncDataProcessor
from async_processor_output_processor import ASYNC_PROCESSOR_OUTPUT_PROCESSOR
# Setup Monocle telemetry
setup_monocle_telemetry(
    workflow_name="async_processor_app",
    span_processors=[BatchSpanProcessor(ConsoleSpanExporter())],
    wrapper_methods=[
        WrapperMethod(
            package="async_data_processor",
            object_name="AsyncDataProcessor",
            method="process_items",
            span_name="processor.process_items",
            wrapper_method=atask_wrapper,
            output_processor=ASYNC_PROCESSOR_OUTPUT_PROCESSOR
        ),
        WrapperMethod(
            package="async_data_processor",
            object_name="AsyncDataProcessor",
            method="batch_process",
            span_name="processor.batch_process",
            wrapper_method=atask_wrapper,
            output_processor=ASYNC_PROCESSOR_OUTPUT_PROCESSOR
        )
    ]
)
# Use the instrumented async class
async def main():
    processor = AsyncDataProcessor(batch_size=5)
    # Test single batch processing - simulate processing PDF documents
    documents = ["contract_001.pdf", "invoice_002.pdf", "report_003.pdf"]
    result = await processor.process_items(documents)
    print(f"Processed {result['count']} documents")
    # Test batch processing - simulate processing different document types
    document_batches = [["legal_docs.pdf", "contracts.pdf"], ["invoices.pdf", "receipts.pdf", "statements.pdf"]]
    results = await processor.batch_process(document_batches)
    print(f"Processed {len(results)} document batches")
# Run the async main function
asyncio.run(main())
Generated Span Structure
The async instrumentation will generate spans with:
Attributes:
{
    "monocle_apptrace.version": "0.6.0",
    "monocle_apptrace.language": "python",
    "span_source": "/path/to/async_data_processor.py:44",
    "workflow.name": "async_processor_app",
    "span.type": "generic"
}
Events (with custom output processor):
[
    {
        "name": "data.input",
        "timestamp": "2025-10-21T00:51:02.851381Z",
        "attributes": {
            "items": ["item1", "item2", "item3"],
            "item_count": 3
        }
    },
    {
        "name": "data.output",
        "timestamp": "2025-10-21T00:51:02.952581Z",
        "attributes": {
            "processed_items": ["processed_item1", "processed_item2", "processed_item3"],
            "result_count": 3
        }
    }
]
Note: The span_source attribute shows the file path and line number where the instrumented method is called, which is particularly useful for debugging async operations. With the custom output processor, the instrumentation captures both the input items and the processed results in events.
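For async methods, the wrapper has to await the coroutine before any accessor reads arguments['output']; otherwise the accessor would receive a coroutine object rather than the result dictionary. The sketch below illustrates only that ordering. It uses hypothetical names (sketch_atask_wrapper, Echo) and is not Monocle's atask_wrapper implementation.

```python
import asyncio
import functools
from typing import Any, Callable, Dict, List


def sketch_atask_wrapper(method: Callable[..., Any],
                         accessor: Callable[[Dict[str, Any]], Any],
                         captured: List[Any]) -> Callable[..., Any]:
    """Wrap an async method and run one accessor on its resolved output."""
    @functools.wraps(method)
    async def wrapped(instance, *args, **kwargs):
        # Await first, so "output" is the returned value and not a coroutine.
        output = await method(instance, *args, **kwargs)
        arguments = {"instance": instance, "args": args,
                     "kwargs": kwargs, "output": output}
        captured.append(accessor(arguments))
        return output
    return wrapped


class Echo:
    """Toy async class used only for this illustration."""

    async def shout(self, text: str) -> Dict[str, Any]:
        await asyncio.sleep(0)  # yield to the event loop, like real async I/O
        return {"text": text.upper(), "count": len(text)}


captured: List[Any] = []
Echo.shout = sketch_atask_wrapper(
    Echo.shout, lambda arguments: arguments["output"]["count"], captured)

result = asyncio.run(Echo().shout("abc"))
print(result)
print(captured)
```

Using a sync-style wrapper on an async method would skip the await, which is why the guide pairs async methods with atask_wrapper and sync methods with task_wrapper.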
Running the Examples
- Save the class files (simple_calculator.py, async_data_processor.py)
- Save the output processor files (calculator_output_processor.py, async_processor_output_processor.py)
- Save the main files (main.py, main_async.py)
- Install Monocle: pip install monocle_apptrace
- Run the examples: python main.py or python main_async.py
The examples will print spans to the console showing the instrumentation working with captured input parameters and output results in events.
Understanding Data Capture
For detailed information about what data can be captured and how the arguments dictionary works, see the Data Capture Capabilities section in the Trace Analysis Guide.
Going Beyond Supported GenAI Components
- If you use an application framework, model hosting service, or infrastructure component that Monocle does not currently support, please submit a GitHub issue requesting that support.
Next Steps
- Understanding Trace Output: See Monocle Trace Analysis Guide to learn how to interpret the generated traces and understand data capture capabilities
- Advanced Patterns: See Monocle Advanced Patterns Guide for production-ready instrumentation patterns
- Testing: See Monocle Testing Guide for testing your custom instrumentation