$ cat node-template.py
Recursive Summarizer
// Recursively summarize large text content using LLM with automatic chunking for content exceeding context limits
Process
LLM
template.py
"""Recursive summarizer node.

Recursively summarizes large text with an LLM behind a LiteLLM proxy.
Content that exceeds the model's safe context window is split into chunks,
the chunks are summarized in parallel, and the combined chunk summaries are
recursively reduced until they fit in a single request.

Protocol: reads a JSON execution input from stdin, writes a JSON result
(``{"summary": ...}``) to stdout; all progress logging goes to stderr.
"""
import os
import sys
import json
import traceback
import asyncio
import httpx
import time
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
    retry_if_result,
)

# Environment variables automatically set by workspace executor
LITELLM_BASE_URL = os.getenv("OPENAI_BASE_URL")
LITELLM_API_KEY = os.getenv("LITELLM_API_KEY")

# Fail fast at startup: without these the node cannot make any LLM call.
if not LITELLM_BASE_URL:
    raise ValueError("OPENAI_BASE_URL environment variable not set")
if not LITELLM_API_KEY:
    raise ValueError("LITELLM_API_KEY environment variable not set")

# Retry configuration (matching background-task/llm.py)
LLM_TIMEOUT = 3600  # 1 hour (increased for long-running summarization tasks)
LLM_MAX_RETRIES = 5
LLM_RETRY_MIN_WAIT = 2  # seconds
LLM_RETRY_MAX_WAIT = 10  # seconds
LLM_RETRY_MULTIPLIER = 2

# Default fan-out for parallel chunk summarization (used by main()).
DEFAULT_MAX_CONCURRENT = 5

# Model context limits and token estimation parameters.
# "safe_input" is the input-token budget we allow per request, kept below
# "context" to leave headroom for the prompt scaffolding and the response.
MODEL_PRESETS = {
    "llama3.1-8b-instruct": {
        "context": 128000,
        "safe_input": 115000,
        "chars_per_token": 4,
    },
    "llama3.3-70b-instruct": {
        "context": 128000,
        "safe_input": 115000,
        "chars_per_token": 4,
    },
    "nemotron-super-49b-1_5-FP8": {
        "context": 32768,
        "safe_input": 28000,
        "chars_per_token": 4,
    },
    "gpt-oss-120b": {
        "context": 65536,
        "safe_input": 57000,
        "chars_per_token": 4,
    },
    "openai/gpt-oss-120b": {
        "context": 65536,
        "safe_input": 57000,
        "chars_per_token": 4,
    },
    "deepseek-chat": {
        "context": 64000,
        "safe_input": 58000,
        "chars_per_token": 4,
    },
    "deepseek-reasoner": {
        "context": 64000,
        "safe_input": 58000,
        "chars_per_token": 4,
    },
    "gemini-2.5-flash": {
        "context": 1000000,
        "safe_input": 950000,
        "chars_per_token": 4,
    },
    "gemini-2.5-pro": {
        "context": 2000000,
        "safe_input": 1950000,
        "chars_per_token": 4,
    },
}


def estimate_tokens(text: str, model: str) -> int:
    """
    Estimate token count from character count.

    Uses model-specific chars_per_token ratio (default 4).
    Conservative approximation for English text.

    Unknown models fall back to the "gpt-oss-120b" preset.
    """
    preset = MODEL_PRESETS.get(model, MODEL_PRESETS["gpt-oss-120b"])
    chars_per_token = preset["chars_per_token"]
    return len(text) // chars_per_token


def estimate_safe_char_count(model: str) -> int:
    """Get safe character count for a model (safe_input tokens * chars/token)."""
    preset = MODEL_PRESETS.get(model, MODEL_PRESETS["gpt-oss-120b"])
    return preset["safe_input"] * preset["chars_per_token"]


async def call_llm_chat_async(
    messages: list,
    model: str,
    temperature: float = 0,
    semaphore: "asyncio.Semaphore | None" = None,
) -> str:
    """
    Call LiteLLM proxy using OpenAI-compatible API (async version).

    Transient network failures are retried with exponential backoff using the
    module-level LLM_MAX_RETRIES / LLM_RETRY_* settings.

    Args:
        messages: List of message dicts with role and content
        model: Model name from litellm config
        temperature: Sampling temperature (0 = deterministic)
        semaphore: Optional semaphore for rate limiting

    Returns:
        LLM response text

    Raises:
        httpx.HTTPStatusError: on a non-2xx response (after retries for
            transport-level failures are exhausted).
    """
    headers = {
        "Authorization": f"Bearer {LITELLM_API_KEY}",
        "Content-Type": "application/json",
    }

    payload = {
        "model": model,
        "messages": messages,
        "temperature": temperature,
    }

    # rstrip avoids a double slash when the base URL ends with "/".
    url = f"{LITELLM_BASE_URL.rstrip('/')}/v1/chat/completions"

    # Retry only transport-level errors (timeouts, connection resets);
    # HTTP status errors are surfaced to the caller. reraise=True keeps the
    # original exception type so main()'s handlers still match.
    @retry(
        stop=stop_after_attempt(LLM_MAX_RETRIES),
        wait=wait_exponential(
            multiplier=LLM_RETRY_MULTIPLIER,
            min=LLM_RETRY_MIN_WAIT,
            max=LLM_RETRY_MAX_WAIT,
        ),
        retry=retry_if_exception_type(httpx.TransportError),
        reraise=True,
    )
    async def _make_request() -> str:
        async with httpx.AsyncClient(timeout=LLM_TIMEOUT) as client:
            response = await client.post(url, headers=headers, json=payload)
            response.raise_for_status()
            result = response.json()
            return result["choices"][0]["message"]["content"].strip()

    # Use semaphore if provided for rate limiting
    if semaphore:
        async with semaphore:
            return await _make_request()
    else:
        return await _make_request()


def split_content_into_chunks(content: str, num_chunks: int, model: str) -> list:
    """
    Split content into roughly equal chunks by character count.

    Tries to split on paragraph boundaries when possible, falling back to
    sentence breaks and then whitespace.

    Args:
        content: Text to split
        num_chunks: Number of chunks to create
        model: Model name for char limit calculation

    Returns:
        List of text chunks
    """
    safe_chars = estimate_safe_char_count(model)
    # max(1, ...) guards against a zero chunk size (content shorter than
    # num_chunks), which would otherwise make the loop below never advance.
    chunk_size = max(1, min(len(content) // num_chunks, safe_chars))

    chunks = []
    current_pos = 0

    while current_pos < len(content):
        # Calculate end position
        end_pos = min(current_pos + chunk_size, len(content))

        # Try to find a good break point (paragraph, sentence, space).
        # A break point is only accepted past the halfway mark so chunks
        # stay roughly equal in size.
        if end_pos < len(content):
            # Look for paragraph break
            para_break = content.rfind("\n\n", current_pos, end_pos)
            if para_break > current_pos + chunk_size // 2:
                end_pos = para_break + 2
            else:
                # Look for sentence break
                sent_break = content.rfind(". ", current_pos, end_pos)
                if sent_break > current_pos + chunk_size // 2:
                    end_pos = sent_break + 2
                else:
                    # Look for any whitespace
                    space_break = content.rfind(" ", current_pos, end_pos)
                    if space_break > current_pos + chunk_size // 2:
                        end_pos = space_break + 1

        chunk = content[current_pos:end_pos].strip()
        if chunk:
            chunks.append(chunk)

        current_pos = end_pos

    return chunks


async def recursive_summarize(
    content: str,
    user_prompt: str,
    model: str,
    level: int = 0,
    max_levels: int = 10,
    max_concurrent: int = DEFAULT_MAX_CONCURRENT
) -> str:
    """
    Recursively summarize content using hierarchical reduction with parallel chunk processing.

    Algorithm:
    1. Estimate tokens in content
    2. If fits in context → summarize directly
    3. If exceeds → split into chunks, summarize in parallel, combine, recurse
    4. Safety: max 10 levels, then truncate

    Args:
        content: Text to summarize
        user_prompt: User's summarization instructions
        model: LLM model name
        level: Current recursion level (0-indexed)
        max_levels: Maximum recursion depth
        max_concurrent: Maximum concurrent LLM requests (default: 5)

    Returns:
        Summary text
    """
    estimated_tokens = estimate_tokens(content, model)
    preset = MODEL_PRESETS.get(model, MODEL_PRESETS["gpt-oss-120b"])
    safe_input = preset["safe_input"]

    # Log progress to stderr
    print(f"[Level {level}] Estimated tokens: {estimated_tokens:,}, Safe limit: {safe_input:,}", file=sys.stderr)

    # Base case: content fits in context
    if estimated_tokens <= safe_input:
        print(f"[Level {level}] Content fits in context, generating summary...", file=sys.stderr)

        system_prompt = (
            "You are an expert summarizer. Extract all key information, "
            "main points, and important details. Preserve critical facts and context."
        )

        user_message = (
            f"<user_prompt>{user_prompt}</user_prompt>\n\n"
            f"Content to summarize:\n\n"
            f"<content>{content}</content>"
        )

        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_message},
        ]

        return await call_llm_chat_async(messages, model, temperature=0)

    # Safety limit: max recursion depth
    if level >= max_levels:
        print(f"[Level {level}] ⚠️ Max recursion depth reached, truncating content...", file=sys.stderr)

        # Truncate to safe char count
        safe_chars = estimate_safe_char_count(model)
        truncated = content[:safe_chars]

        system_prompt = (
            "You are an expert summarizer. Extract all key information, "
            "main points, and important details. Preserve critical facts and context. "
            "Note: Content was truncated due to length."
        )

        user_message = (
            f"<user_prompt>{user_prompt}</user_prompt>\n\n"
            f"Content to summarize (truncated):\n\n"
            f"<content>{truncated}</content>"
        )

        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_message},
        ]

        return await call_llm_chat_async(messages, model, temperature=0)

    # Recursive case: split and reduce with parallel processing
    reduction_needed = estimated_tokens / safe_input

    # Adaptive group size (from background-task tool.py pattern)
    if reduction_needed > 8:
        num_chunks = 5
    elif reduction_needed > 4:
        num_chunks = 4
    else:
        num_chunks = 3

    print(f"[Level {level}] Content exceeds limit ({reduction_needed:.2f}x), splitting into {num_chunks} chunks...", file=sys.stderr)

    # Split content
    chunks = split_content_into_chunks(content, num_chunks, model)
    print(f"[Level {level}] Split into {len(chunks)} chunks", file=sys.stderr)

    # Create semaphore for rate limiting
    semaphore = asyncio.Semaphore(max_concurrent)

    # Summarize each chunk in parallel
    async def summarize_chunk(i: int, chunk: str) -> str:
        """Summarize a single chunk with error handling"""
        try:
            start_time = time.time()
            timestamp = time.strftime("%H:%M:%S", time.localtime(start_time))
            print(f"[Level {level}] [{timestamp}] 🚀 STARTING chunk {i+1}/{len(chunks)} (task launched)", file=sys.stderr)

            summary_prompt = "Create a comprehensive summary that preserves all key information and important details."
            system_prompt = "You are an expert summarizer."

            user_message = (
                f"<summary_prompt>{summary_prompt}</summary_prompt>\n\n"
                f"Content:\n\n"
                f"<content>{chunk}</content>"
            )

            messages = [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_message},
            ]

            llm_start = time.time()
            llm_timestamp = time.strftime("%H:%M:%S", time.localtime(llm_start))
            print(f"[Level {level}] [{llm_timestamp}] 📡 Sending LLM request for chunk {i+1}/{len(chunks)}", file=sys.stderr)

            summary = await call_llm_chat_async(messages, model, temperature=0, semaphore=semaphore)

            end_time = time.time()
            end_timestamp = time.strftime("%H:%M:%S", time.localtime(end_time))
            duration = end_time - start_time
            print(f"[Level {level}] [{end_timestamp}] ✅ COMPLETED chunk {i+1}/{len(chunks)} (took {duration:.2f}s)", file=sys.stderr)
            return summary

        except Exception as e:
            error_time = time.time()
            error_timestamp = time.strftime("%H:%M:%S", time.localtime(error_time))
            print(f"[Level {level}] [{error_timestamp}] ⚠️ Error in chunk {i+1}: {e}", file=sys.stderr)
            # Return truncated chunk as fallback so one failed chunk does not
            # abort the whole level.
            return f"[Partial] {chunk[:500]}..."

    batch_start = time.time()
    batch_timestamp = time.strftime("%H:%M:%S", time.localtime(batch_start))
    print(f"[Level {level}] [{batch_timestamp}] ⚡ LAUNCHING {len(chunks)} chunks in PARALLEL (max {max_concurrent} concurrent)...", file=sys.stderr)

    # Execute all chunk summaries in parallel
    chunk_summaries = await asyncio.gather(
        *[summarize_chunk(i, chunk) for i, chunk in enumerate(chunks)],
        return_exceptions=False  # Errors are handled in summarize_chunk
    )

    batch_end = time.time()
    batch_end_timestamp = time.strftime("%H:%M:%S", time.localtime(batch_end))
    batch_duration = batch_end - batch_start
    print(f"[Level {level}] [{batch_end_timestamp}] 🎉 ALL {len(chunks)} chunks completed in {batch_duration:.2f}s (speedup: {len(chunks)}x vs sequential)", file=sys.stderr)

    # Combine summaries and recurse
    combined = "\n\n".join(chunk_summaries)
    combined_tokens = estimate_tokens(combined, model)

    print(f"[Level {level}] Combined {len(chunk_summaries)} summaries ({combined_tokens:,} tokens), recursing to level {level+1}...", file=sys.stderr)

    return await recursive_summarize(
        combined,
        user_prompt,
        model,
        level=level + 1,
        max_levels=max_levels,
        max_concurrent=max_concurrent
    )


def main():
    """Main execution function: stdin JSON in, stdout JSON out, logs on stderr."""
    try:
        # Read execution input from stdin
        input_json = sys.stdin.read()
        execution_input = json.loads(input_json)

        # Extract inputs
        inputs = execution_input.get("inputs", {})
        content = inputs.get("content")
        llm_model = inputs.get("llmModel")
        prompt = inputs.get("prompt")

        # Validate inputs
        if not content:
            raise ValueError("Required input 'content' not provided")
        if not llm_model:
            raise ValueError("Required input 'llmModel' not provided")
        if not prompt:
            raise ValueError("Required input 'prompt' not provided")

        # Validate model
        if llm_model not in MODEL_PRESETS:
            raise ValueError(f"Unsupported model: {llm_model}")

        print("="*80, file=sys.stderr)
        print("🚀 RECURSIVE SUMMARIZER v11 - Starting (Parallel Processing + Timing Logs)", file=sys.stderr)
        print("="*80, file=sys.stderr)
        print(f"📋 Selected Model ID: '{llm_model}'", file=sys.stderr)
        print(f"📄 Content length: {len(content):,} characters", file=sys.stderr)
        print(f"🔢 Estimated tokens: {estimate_tokens(content, llm_model):,}", file=sys.stderr)
        print(f"⚡ Max concurrent requests: {DEFAULT_MAX_CONCURRENT}", file=sys.stderr)
        print("="*80, file=sys.stderr)

        # Perform recursive summarization with async execution
        summary = asyncio.run(
            recursive_summarize(
                content, prompt, llm_model, max_concurrent=DEFAULT_MAX_CONCURRENT
            )
        )

        print(f"✓ Summarization complete! Summary length: {len(summary):,} characters", file=sys.stderr)

        # Prepare output matching OUTPUT_SCHEMA
        output = {
            "summary": summary,
        }

        # Write output to stdout
        print(json.dumps(output, indent=2))

    except httpx.HTTPStatusError as e:
        # HTTP error from LiteLLM API
        error_output = {
            "error": f"LLM API request failed: {e}",
            "errorType": "HTTPError",
            "status": e.response.status_code,
            "detail": e.response.text[:500] if hasattr(e.response, 'text') else str(e),
        }
        print(json.dumps(error_output), file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        # Other errors
        error_output = {
            "error": str(e),
            "errorType": type(e).__name__,
            "traceback": traceback.format_exc(),
        }
        print(json.dumps(error_output), file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()