LangChain Integration¶
Headroom provides seamless integration with LangChain, enabling automatic context optimization across all LangChain patterns: chat models, memory, retrievers, agents, and observability.
Installation¶
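pip install headroom-ai[langchain]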
This installs Headroom with LangChain dependencies (langchain-core).
Quick Start¶
Wrap Any Chat Model (1 Line)¶
from langchain_openai import ChatOpenAI
from headroom.integrations import HeadroomChatModel
# Wrap your model - that's it!
llm = HeadroomChatModel(ChatOpenAI(model="gpt-4o"))
# Use exactly like before
response = llm.invoke("Hello!")
Headroom automatically:
- Detects the provider (OpenAI, Anthropic, Google)
- Compresses tool outputs in conversation history
- Optimizes for provider caching
- Tracks token savings
Check Your Savings¶
# After some usage
print(llm.get_metrics())
# {'tokens_saved': 12500, 'savings_percent': 45.2, 'requests': 50}
Integration Patterns¶
1. Chat Model Wrapper¶
The HeadroomChatModel wraps any LangChain BaseChatModel:
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
from headroom.integrations import HeadroomChatModel
# OpenAI
llm = HeadroomChatModel(ChatOpenAI(model="gpt-4o"))
# Anthropic (auto-detected)
llm = HeadroomChatModel(ChatAnthropic(model="claude-3-5-sonnet-20241022"))
# Custom configuration
from headroom import HeadroomConfig, HeadroomMode
config = HeadroomConfig(
default_mode=HeadroomMode.OPTIMIZE,
smart_crusher_target_ratio=0.3, # Target 70% compression
)
llm = HeadroomChatModel(
ChatOpenAI(model="gpt-4o"),
headroom_config=config,
)
Async Support¶
Full async support for ainvoke and astream:
# Async invoke
response = await llm.ainvoke("Hello!")
# Async streaming
async for chunk in llm.astream("Tell me a story"):
print(chunk.content, end="", flush=True)
Tool Calling¶
Works seamlessly with LangChain tool calling:
import json
from langchain_core.tools import tool
@tool
def search(query: str) -> str:
"""Search the web."""
return {"results": [...]} # Large JSON response
llm_with_tools = llm.bind_tools([search])
response = llm_with_tools.invoke("Search for Python tutorials")
# Tool outputs are automatically compressed in subsequent turns
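Continuing the example above, a follow-up turn could look like the sketch below. It assumes the model actually requested the search tool; the message classes are standard LangChain, and the compression point follows the behavior described above.

```python
# Illustrative follow-up turn: the tool result goes back into the history as a
# ToolMessage, and that large content is what Headroom compresses on later calls.
from langchain_core.messages import HumanMessage, ToolMessage

tool_call = response.tool_calls[0]          # assumes the model made a tool call
messages = [
    HumanMessage(content="Search for Python tutorials"),
    response,                               # the AIMessage containing the tool call
    ToolMessage(content=search.invoke(tool_call["args"]), tool_call_id=tool_call["id"]),
]
final = llm_with_tools.invoke(messages)
```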
2. Memory Integration¶
HeadroomChatMessageHistory wraps any chat history with automatic compression:
from langchain.chains import ConversationChain
from langchain.memory import ConversationBufferMemory
from langchain_community.chat_message_histories import ChatMessageHistory
from headroom.integrations import HeadroomChatMessageHistory
# Wrap any history
base_history = ChatMessageHistory()
compressed_history = HeadroomChatMessageHistory(
base_history,
compress_threshold_tokens=4000, # Compress when over 4K tokens
keep_recent_turns=5, # Always keep last 5 turns
)
# Use with any memory class
memory = ConversationBufferMemory(chat_memory=compressed_history)
# Zero changes to your chain!
chain = ConversationChain(llm=llm, memory=memory)
Why this matters: Long conversations can blow up to 50K+ tokens. HeadroomChatMessageHistory automatically compresses older turns while preserving recent context.
# Check compression stats
print(compressed_history.get_compression_stats())
# {'compression_count': 12, 'total_tokens_saved': 28000}
3. Retriever Integration¶
HeadroomDocumentCompressor filters retrieved documents by relevance:
from langchain.retrievers import ContextualCompressionRetriever
from langchain_community.vectorstores import FAISS
from headroom.integrations import HeadroomDocumentCompressor
# Create vector store retriever (retrieve many for recall)
vectorstore = FAISS.from_documents(documents, embeddings)
base_retriever = vectorstore.as_retriever(search_kwargs={"k": 50})
# Wrap with Headroom compression (keep best for precision)
compressor = HeadroomDocumentCompressor(
max_documents=10, # Keep top 10
min_relevance=0.3, # Minimum relevance score
prefer_diverse=True, # MMR-style diversity
)
retriever = ContextualCompressionRetriever(
base_compressor=compressor,
base_retriever=base_retriever,
)
# Retrieves 50 docs, returns best 10
docs = retriever.invoke("What is Python?")
Why this matters: Vector search often returns many marginally-relevant documents. HeadroomDocumentCompressor uses BM25-style scoring to keep only the most relevant ones, reducing context size while improving answer quality.
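For intuition, here is a minimal, self-contained sketch of the general idea: score each document against the query with a BM25-style formula, then keep the top-k above a relevance cutoff. It is purely illustrative and is not Headroom's actual scoring code; the whitespace tokenizer, score normalization, and default cutoffs are assumptions.

```python
# Illustrative only -- NOT Headroom's implementation. Scores each document against
# the query with a BM25-style formula, then keeps the top-k above a relevance cutoff.
import math
from collections import Counter

def bm25_filter(query, docs, max_documents=10, min_relevance=0.3, k1=1.5, b=0.75):
    tokenized = [d.lower().split() for d in docs]              # naive tokenizer (assumption)
    avg_len = sum(len(t) for t in tokenized) / len(tokenized)
    df = Counter(term for t in tokenized for term in set(t))   # document frequencies
    n = len(tokenized)

    def score(doc_terms):
        tf = Counter(doc_terms)
        total = 0.0
        for term in query.lower().split():
            if term not in tf:
                continue
            idf = math.log((n - df[term] + 0.5) / (df[term] + 0.5) + 1)
            denom = tf[term] + k1 * (1 - b + b * len(doc_terms) / avg_len)
            total += idf * tf[term] * (k1 + 1) / denom
        return total

    scores = [score(t) for t in tokenized]
    top = max(scores) or 1.0                                   # normalize so the cutoff is 0-1
    ranked = sorted(zip(docs, scores), key=lambda pair: pair[1], reverse=True)
    return [doc for doc, s in ranked[:max_documents] if s / top >= min_relevance]
```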
4. Agent Tool Wrapping¶
wrap_tools_with_headroom compresses tool outputs for agents:
import json
from langchain.agents import create_openai_tools_agent, AgentExecutor
from langchain_core.tools import tool
from headroom.integrations import wrap_tools_with_headroom
@tool
def search_database(query: str) -> str:
"""Search the database."""
# Returns 1000 results as JSON
return json.dumps({"results": [...], "total": 1000})
@tool
def fetch_logs(service: str) -> str:
"""Fetch service logs."""
# Returns 500 log entries
return json.dumps({"logs": [...]})
# Wrap tools with compression
tools = [search_database, fetch_logs]
wrapped_tools = wrap_tools_with_headroom(
tools,
min_chars_to_compress=1000, # Only compress large outputs
)
# Create agent with wrapped tools (prompt is a ChatPromptTemplate; see Example 4 below)
agent = create_openai_tools_agent(llm, wrapped_tools, prompt)
executor = AgentExecutor(agent=agent, tools=wrapped_tools)
# Tool outputs are automatically compressed
result = executor.invoke({"input": "Find users who logged in yesterday"})
Per-tool metrics:
from headroom.integrations import get_tool_metrics
metrics = get_tool_metrics()
print(metrics.get_summary())
# {
# 'total_invocations': 25,
# 'total_compressions': 18,
# 'total_chars_saved': 450000,
# 'by_tool': {
# 'search_database': {'invocations': 15, 'chars_saved': 320000},
# 'fetch_logs': {'invocations': 10, 'chars_saved': 130000},
# }
# }
5. Streaming Metrics¶
Track output tokens during streaming:
from headroom.integrations import StreamingMetricsTracker
tracker = StreamingMetricsTracker(model="gpt-4o")
for chunk in llm.stream("Write a poem about coding"):
tracker.add_chunk(chunk)
print(chunk.content, end="", flush=True)
metrics = tracker.finish()
print(f"\nOutput tokens: {metrics.output_tokens}")
print(f"Duration: {metrics.duration_ms:.0f}ms")
Context manager style:
from headroom.integrations import StreamingMetricsCallback
with StreamingMetricsCallback(model="gpt-4o") as tracker:
for chunk in llm.stream(messages):
tracker.add_chunk(chunk)
print(chunk.content, end="")
print(f"Metrics: {tracker.metrics}")
6. LangSmith Integration¶
Add Headroom metrics to LangSmith traces:
from headroom.integrations import HeadroomLangSmithCallbackHandler
# Create callback handler
langsmith_handler = HeadroomLangSmithCallbackHandler()
# Use with your LLM
llm = HeadroomChatModel(
ChatOpenAI(model="gpt-4o"),
callbacks=[langsmith_handler],
)
# After calls, metrics appear in LangSmith traces:
# - headroom.tokens_before
# - headroom.tokens_after
# - headroom.tokens_saved
# - headroom.compression_ratio
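The handler assumes tracing is already enabled; the environment variables below are standard LangSmith/LangChain configuration rather than Headroom-specific settings, and the values are placeholders.

```python
import os

# Standard LangSmith tracing setup (not Headroom-specific); values are placeholders.
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "..."         # your LangSmith API key
os.environ["LANGCHAIN_PROJECT"] = "my-project"  # optional project name
```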
Real-World Examples¶
Example 1: LangGraph ReAct Agent¶
The ReAct pattern is the most common agent architecture. Here's how to optimize it:
import json
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langgraph.prebuilt import create_react_agent
from headroom.integrations import HeadroomChatModel, wrap_tools_with_headroom
# Define tools that return large outputs
@tool
def search_web(query: str) -> str:
"""Search the web for information."""
# Simulating large search results
return json.dumps({
"results": [
{"title": f"Result {i}", "snippet": "..." * 100, "url": f"https://..."}
for i in range(100)
],
"total": 1000,
})
@tool
def query_database(sql: str) -> str:
"""Execute SQL query."""
return json.dumps({
"rows": [{"id": i, "data": "..." * 50} for i in range(500)],
"total": 500,
})
# Wrap model with Headroom
llm = HeadroomChatModel(ChatOpenAI(model="gpt-4o"))
# Wrap tools with compression
tools = wrap_tools_with_headroom([search_web, query_database])
# Create ReAct agent
agent = create_react_agent(llm, tools)
# Run - tool outputs are automatically compressed between iterations
result = agent.invoke({
"messages": [("user", "Find all users who signed up last week and their activity")]
})
# Check savings
print(f"Tokens saved: {llm.get_metrics()['tokens_saved']}")
Without Headroom, each tool call adds 10-50K tokens to the context. With Headroom, tool outputs are compressed to 1-2K tokens, so the agent runs faster and cheaper.
Example 2: RAG Pipeline with Document Filtering¶
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.chains import RetrievalQA
from langchain.retrievers import ContextualCompressionRetriever
from headroom.integrations import HeadroomChatModel, HeadroomDocumentCompressor
# Setup vector store
embeddings = OpenAIEmbeddings()
vectorstore = Chroma.from_documents(documents, embeddings)
# High-recall retriever (get many candidates)
base_retriever = vectorstore.as_retriever(search_kwargs={"k": 50})
# Headroom compressor for precision
compressor = HeadroomDocumentCompressor(
max_documents=5, # Keep only top 5
min_relevance=0.4, # Must be 40%+ relevant
prefer_diverse=True, # Avoid redundant docs
)
# Combine into compression retriever
retriever = ContextualCompressionRetriever(
base_compressor=compressor,
base_retriever=base_retriever,
)
# Wrap LLM
llm = HeadroomChatModel(ChatOpenAI(model="gpt-4o"))
# Create QA chain
qa_chain = RetrievalQA.from_chain_type(
llm=llm,
retriever=retriever,
return_source_documents=True,
)
# Query - retrieves 50 docs, uses best 5
result = qa_chain.invoke({"query": "How do I configure authentication?"})
print(f"Answer: {result['result']}")
print(f"Sources: {len(result['source_documents'])} docs")
Impact:
- Without filtering: 50 docs × ~500 tokens = 25K context tokens
- With Headroom: 5 docs × ~500 tokens = 2.5K context tokens (90% reduction)
Example 3: Conversational Agent with Memory¶
from langchain_openai import ChatOpenAI
from langchain.memory import ConversationBufferMemory
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain.chains import ConversationChain
from headroom.integrations import HeadroomChatModel, HeadroomChatMessageHistory
# Wrap LLM
llm = HeadroomChatModel(ChatOpenAI(model="gpt-4o"))
# Wrap memory with auto-compression
base_history = ChatMessageHistory()
compressed_history = HeadroomChatMessageHistory(
base_history,
compress_threshold_tokens=8000, # Compress when over 8K
keep_recent_turns=10, # Always keep last 10 turns
)
memory = ConversationBufferMemory(
chat_memory=compressed_history,
return_messages=True,
)
# Create conversation chain
chain = ConversationChain(llm=llm, memory=memory)
# Long conversation - memory auto-compresses
for i in range(100):
response = chain.invoke({"input": f"Tell me about topic {i}"})
print(f"Turn {i}: {len(response['response'])} chars")
# Check memory stats
print(compressed_history.get_compression_stats())
# {'compression_count': 8, 'total_tokens_saved': 45000}
Impact: Without compression, a 100-turn conversation can run to 100K+ tokens. With HeadroomChatMessageHistory, it stays under 8K tokens while preserving recent context.
Example 4: Multi-Tool Research Agent¶
import json
from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.tools import tool
from headroom.integrations import (
HeadroomChatModel,
wrap_tools_with_headroom,
get_tool_metrics,
reset_tool_metrics,
)
@tool
def search_arxiv(query: str) -> str:
"""Search arXiv for papers."""
return json.dumps({"papers": [{"title": f"Paper {i}", "abstract": "..." * 200} for i in range(50)]})
@tool
def search_github(query: str) -> str:
"""Search GitHub repositories."""
return json.dumps({"repos": [{"name": f"repo-{i}", "description": "..." * 100, "stars": i * 100} for i in range(100)]})
@tool
def fetch_documentation(url: str) -> str:
"""Fetch documentation from URL."""
return "..." * 5000 # Large doc content
# Wrap everything
llm = HeadroomChatModel(ChatOpenAI(model="gpt-4o"))
tools = wrap_tools_with_headroom([search_arxiv, search_github, fetch_documentation])
prompt = ChatPromptTemplate.from_messages([
("system", "You are a research assistant. Use tools to gather information."),
("human", "{input}"),
("placeholder", "{agent_scratchpad}"),
])
agent = create_openai_tools_agent(llm, tools, prompt)
executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
# Reset metrics for this session
reset_tool_metrics()
# Run complex research task
result = executor.invoke({
"input": "Research the latest advances in LLM context compression and find relevant GitHub projects"
})
# Check per-tool metrics
metrics = get_tool_metrics().get_summary()
print(f"Total chars saved: {metrics['total_chars_saved']:,}")
print(f"Per-tool breakdown: {metrics['by_tool']}")
Configuration Options¶
HeadroomChatModel¶
HeadroomChatModel(
wrapped_model, # Any LangChain BaseChatModel
headroom_config=HeadroomConfig(), # Headroom configuration
auto_detect_provider=True, # Auto-detect from wrapped model
)
HeadroomChatMessageHistory¶
HeadroomChatMessageHistory(
base_history, # Any BaseChatMessageHistory
compress_threshold_tokens=4000, # Token threshold for compression
keep_recent_turns=5, # Minimum turns to preserve
model="gpt-4o", # Model for token counting
)
HeadroomDocumentCompressor¶
HeadroomDocumentCompressor(
max_documents=10, # Maximum docs to return
min_relevance=0.0, # Minimum relevance score (0-1)
prefer_diverse=False, # Use MMR for diversity
)
wrap_tools_with_headroom¶
wrap_tools_with_headroom(
tools, # List of LangChain tools
min_chars_to_compress=1000, # Minimum output size
smart_crusher_config=None, # SmartCrusher configuration
)
Import Reference¶
from headroom.integrations import (
# Chat Model
HeadroomChatModel,
# Memory
HeadroomChatMessageHistory,
# Retrievers
HeadroomDocumentCompressor,
# Agents
HeadroomToolWrapper,
wrap_tools_with_headroom,
get_tool_metrics,
reset_tool_metrics,
# Streaming
StreamingMetricsTracker,
StreamingMetricsCallback,
track_streaming_response,
# LangSmith
HeadroomLangSmithCallbackHandler,
# Provider Detection
detect_provider,
get_headroom_provider,
)
# Or import from subpackage directly
from headroom.integrations.langchain import HeadroomChatModel
from headroom.integrations.langchain.memory import HeadroomChatMessageHistory
Troubleshooting¶
LangChain not detected¶
from headroom.integrations import langchain_available
if not langchain_available():
print("Install with: pip install headroom-ai[langchain]")
Provider detection failing¶
# Force a specific provider
from headroom.providers import AnthropicProvider
llm = HeadroomChatModel(
ChatAnthropic(model="claude-3-5-sonnet-20241022"),
auto_detect_provider=False,
)
llm._provider = AnthropicProvider()
Memory not compressing¶
Check that your conversation history actually exceeds the token threshold; if it does not, lower the threshold:
history = HeadroomChatMessageHistory(
base_history,
compress_threshold_tokens=1000, # Lower threshold
keep_recent_turns=2, # Fewer preserved turns
)
Performance Tips¶
- Use tool wrapping for agents - Agents with tools benefit most from compression
- Set appropriate thresholds - Don't compress small conversations
- Enable diversity for RAG - prefer_diverse=True improves answer quality
- Monitor with LangSmith - Use the callback handler to track savings over time
- Batch similar requests - Provider caching works better with stable prefixes (see the sketch below)
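A minimal sketch of the last tip, assuming the provider matches cached prompt prefixes verbatim (general prompt-caching advice, not a Headroom API): keep the system message and any tool bindings identical across requests, and put per-request content last. llm here is the HeadroomChatModel wrapper from earlier.

```python
from langchain_core.messages import HumanMessage, SystemMessage

# Keep the prefix byte-identical across requests so provider caching can reuse it.
SYSTEM = SystemMessage(content="You are a support assistant.")  # stable, reused verbatim

def build_messages(user_text: str):
    return [SYSTEM, HumanMessage(content=user_text)]  # volatile content goes last

for question in ("Reset my password", "Where is my invoice?"):
    print(llm.invoke(build_messages(question)).content)
```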