$ cat node-template.py

Recursive Summarizer

// Recursively summarize large text content using an LLM, with automatic chunking for content that exceeds the model's context limit

Process
LLM
template.py
import os
import sys
import json
import traceback
import asyncio
import httpx
import time
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
    retry_if_result,
)

# Environment variables automatically set by workspace executor
LITELLM_BASE_URL = os.getenv("OPENAI_BASE_URL")
LITELLM_API_KEY = os.getenv("LITELLM_API_KEY")

if not LITELLM_BASE_URL:
    raise ValueError("OPENAI_BASE_URL environment variable not set")
if not LITELLM_API_KEY:
    raise ValueError("LITELLM_API_KEY environment variable not set")

# Retry configuration (matching background-task/llm.py)
LLM_TIMEOUT = 3600  # 1 hour (increased for long-running summarization tasks)
LLM_MAX_RETRIES = 5
LLM_RETRY_MIN_WAIT = 2  # seconds
LLM_RETRY_MAX_WAIT = 10  # seconds
LLM_RETRY_MULTIPLIER = 2

# Single source of truth for request fan-out. Previously the value 5 was
# hard-coded independently in three places (default arg, log line, main()),
# which could silently drift apart.
MAX_CONCURRENT_REQUESTS = 5

# Fallback preset key used whenever a model name is not in MODEL_PRESETS.
DEFAULT_PRESET_KEY = "gpt-oss-120b"

# Model context limits and token estimation parameters.
# "safe_input" is the input token budget (context minus headroom for the
# prompt scaffolding and the generated summary).
MODEL_PRESETS = {
    "llama3.1-8b-instruct": {
        "context": 128000,
        "safe_input": 115000,
        "chars_per_token": 4,
    },
    "llama3.3-70b-instruct": {
        "context": 128000,
        "safe_input": 115000,
        "chars_per_token": 4,
    },
    "nemotron-super-49b-1_5-FP8": {
        "context": 32768,
        "safe_input": 28000,
        "chars_per_token": 4,
    },
    "gpt-oss-120b": {
        "context": 65536,
        "safe_input": 57000,
        "chars_per_token": 4,
    },
    "openai/gpt-oss-120b": {
        "context": 65536,
        "safe_input": 57000,
        "chars_per_token": 4,
    },
    "deepseek-chat": {
        "context": 64000,
        "safe_input": 58000,
        "chars_per_token": 4,
    },
    "deepseek-reasoner": {
        "context": 64000,
        "safe_input": 58000,
        "chars_per_token": 4,
    },
    "gemini-2.5-flash": {
        "context": 1000000,
        "safe_input": 950000,
        "chars_per_token": 4,
    },
    "gemini-2.5-pro": {
        "context": 2000000,
        "safe_input": 1950000,
        "chars_per_token": 4,
    },
}


def _preset(model: str) -> dict:
    """Return the preset for *model*, falling back to the default preset."""
    return MODEL_PRESETS.get(model, MODEL_PRESETS[DEFAULT_PRESET_KEY])


def estimate_tokens(text: str, model: str) -> int:
    """
    Estimate token count from character count.

    Uses model-specific chars_per_token ratio (default 4).
    Conservative approximation for English text.
    """
    return len(text) // _preset(model)["chars_per_token"]


def estimate_safe_char_count(model: str) -> int:
    """Get safe character count for a model (safe_input tokens * chars/token)."""
    preset = _preset(model)
    return preset["safe_input"] * preset["chars_per_token"]


async def call_llm_chat_async(
    messages: list,
    model: str,
    temperature: float = 0,
    semaphore: asyncio.Semaphore = None,
) -> str:
    """
    Call LiteLLM proxy using OpenAI-compatible API (async version).

    Transient transport failures (timeouts, connection errors) are retried
    with exponential backoff using the LLM_RETRY_* constants. Previously
    those constants (and the tenacity imports) were defined but never
    applied, so every call was single-shot.

    Args:
        messages: List of message dicts with role and content
        model: Model name from litellm config
        temperature: Sampling temperature (0 = deterministic)
        semaphore: Optional semaphore for rate limiting (None = unlimited)

    Returns:
        LLM response text ("" if the API returns a null content field)

    Raises:
        httpx.HTTPStatusError: on non-2xx responses (not retried; 4xx are
            not transient and caller handles them)
        httpx.TransportError: after LLM_MAX_RETRIES failed attempts
    """
    headers = {
        "Authorization": f"Bearer {LITELLM_API_KEY}",
        "Content-Type": "application/json",
    }

    payload = {
        "model": model,
        "messages": messages,
        "temperature": temperature,
    }

    url = f"{LITELLM_BASE_URL}/v1/chat/completions"

    @retry(
        stop=stop_after_attempt(LLM_MAX_RETRIES),
        wait=wait_exponential(
            multiplier=LLM_RETRY_MULTIPLIER,
            min=LLM_RETRY_MIN_WAIT,
            max=LLM_RETRY_MAX_WAIT,
        ),
        # Only retry transport-level failures; HTTP status errors (e.g. 400)
        # are surfaced immediately to the caller.
        retry=retry_if_exception_type(httpx.TransportError),
        reraise=True,
    )
    async def _make_request():
        async with httpx.AsyncClient(timeout=LLM_TIMEOUT) as client:
            response = await client.post(url, headers=headers, json=payload)
            response.raise_for_status()
            result = response.json()
            # content may be null for some providers; treat as empty string
            # instead of raising AttributeError on .strip()
            content = result["choices"][0]["message"]["content"] or ""
            return content.strip()

    # Use semaphore if provided for rate limiting
    if semaphore:
        async with semaphore:
            return await _make_request()
    else:
        return await _make_request()


def split_content_into_chunks(content: str, num_chunks: int, model: str) -> list:
    """
    Split content into roughly equal chunks by character count.

    Tries to split on paragraph boundaries when possible, then sentence
    boundaries, then any whitespace; a break point is only accepted if it
    lies in the second half of the chunk (to keep chunks roughly equal).

    Args:
        content: Text to split
        num_chunks: Number of chunks to create (may produce more if the
            per-chunk size is capped by the model's safe char limit)
        model: Model name for char limit calculation

    Returns:
        List of non-empty, stripped text chunks
    """
    safe_chars = estimate_safe_char_count(model)
    # max(1, ...) guards against chunk_size == 0 when the content is shorter
    # than num_chunks, which previously made the loop below spin forever
    # (end_pos == current_pos, no progress).
    chunk_size = max(1, min(len(content) // num_chunks, safe_chars))

    chunks = []
    current_pos = 0

    while current_pos < len(content):
        end_pos = min(current_pos + chunk_size, len(content))

        # Try to find a good break point (paragraph, sentence, space),
        # but only if we are not already at the end of the content.
        if end_pos < len(content):
            para_break = content.rfind("\n\n", current_pos, end_pos)
            if para_break > current_pos + chunk_size // 2:
                end_pos = para_break + 2  # keep the "\n\n" inside this chunk
            else:
                sent_break = content.rfind(". ", current_pos, end_pos)
                if sent_break > current_pos + chunk_size // 2:
                    end_pos = sent_break + 2  # keep ". " inside this chunk
                else:
                    space_break = content.rfind(" ", current_pos, end_pos)
                    if space_break > current_pos + chunk_size // 2:
                        end_pos = space_break + 1

        chunk = content[current_pos:end_pos].strip()
        if chunk:
            chunks.append(chunk)

        current_pos = end_pos

    return chunks


async def _direct_summary(
    content: str,
    user_prompt: str,
    model: str,
    truncated: bool = False,
) -> str:
    """
    Summarize content that already fits in the model's context window.

    Shared by the base case and the max-depth truncation case of
    recursive_summarize (previously duplicated inline).

    Args:
        content: Text that fits within the model's safe input budget
        user_prompt: User's summarization instructions
        model: LLM model name
        truncated: True when content was cut to fit (adjusts the prompts)

    Returns:
        Summary text
    """
    system_prompt = (
        "You are an expert summarizer. Extract all key information, "
        "main points, and important details. Preserve critical facts and context."
    )
    label = "Content to summarize:"
    if truncated:
        system_prompt += " Note: Content was truncated due to length."
        label = "Content to summarize (truncated):"

    user_message = (
        f"<user_prompt>{user_prompt}</user_prompt>\n\n"
        f"{label}\n\n"
        f"<content>{content}</content>"
    )

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_message},
    ]

    return await call_llm_chat_async(messages, model, temperature=0)


async def recursive_summarize(
    content: str,
    user_prompt: str,
    model: str,
    level: int = 0,
    max_levels: int = 10,
    max_concurrent: int = MAX_CONCURRENT_REQUESTS
) -> str:
    """
    Recursively summarize content using hierarchical reduction with parallel chunk processing.

    Algorithm:
    1. Estimate tokens in content
    2. If fits in context → summarize directly
    3. If exceeds → split into chunks, summarize in parallel, combine, recurse
    4. Safety: max `max_levels` levels, then truncate

    Args:
        content: Text to summarize
        user_prompt: User's summarization instructions
        model: LLM model name
        level: Current recursion level (0-indexed)
        max_levels: Maximum recursion depth
        max_concurrent: Maximum concurrent LLM requests (default: 5)

    Returns:
        Summary text
    """
    estimated_tokens = estimate_tokens(content, model)
    safe_input = _preset(model)["safe_input"]

    # Log progress to stderr (stdout is reserved for the JSON output)
    print(f"[Level {level}] Estimated tokens: {estimated_tokens:,}, Safe limit: {safe_input:,}", file=sys.stderr)

    # Base case: content fits in context
    if estimated_tokens <= safe_input:
        print(f"[Level {level}] Content fits in context, generating summary...", file=sys.stderr)
        return await _direct_summary(content, user_prompt, model)

    # Safety limit: max recursion depth reached — truncate and summarize once
    if level >= max_levels:
        print(f"[Level {level}] ⚠️  Max recursion depth reached, truncating content...", file=sys.stderr)
        truncated = content[:estimate_safe_char_count(model)]
        return await _direct_summary(truncated, user_prompt, model, truncated=True)

    # Recursive case: split and reduce with parallel processing
    reduction_needed = estimated_tokens / safe_input

    # Adaptive group size (from background-task tool.py pattern):
    # heavier over-length → more chunks per level
    if reduction_needed > 8:
        num_chunks = 5
    elif reduction_needed > 4:
        num_chunks = 4
    else:
        num_chunks = 3

    print(f"[Level {level}] Content exceeds limit ({reduction_needed:.2f}x), splitting into {num_chunks} chunks...", file=sys.stderr)

    chunks = split_content_into_chunks(content, num_chunks, model)
    print(f"[Level {level}] Split into {len(chunks)} chunks", file=sys.stderr)

    # Semaphore caps in-flight LLM requests across all parallel chunk tasks
    semaphore = asyncio.Semaphore(max_concurrent)

    async def summarize_chunk(i: int, chunk: str) -> str:
        """Summarize a single chunk with error handling and timing logs."""
        try:
            start_time = time.time()
            timestamp = time.strftime("%H:%M:%S", time.localtime(start_time))
            print(f"[Level {level}] [{timestamp}] 🚀 STARTING chunk {i+1}/{len(chunks)} (task launched)", file=sys.stderr)

            summary_prompt = "Create a comprehensive summary that preserves all key information and important details."
            system_prompt = "You are an expert summarizer."

            user_message = (
                f"<summary_prompt>{summary_prompt}</summary_prompt>\n\n"
                f"Content:\n\n"
                f"<content>{chunk}</content>"
            )

            messages = [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_message},
            ]

            llm_start = time.time()
            llm_timestamp = time.strftime("%H:%M:%S", time.localtime(llm_start))
            print(f"[Level {level}] [{llm_timestamp}] 📡 Sending LLM request for chunk {i+1}/{len(chunks)}", file=sys.stderr)

            summary = await call_llm_chat_async(messages, model, temperature=0, semaphore=semaphore)

            end_time = time.time()
            end_timestamp = time.strftime("%H:%M:%S", time.localtime(end_time))
            duration = end_time - start_time
            print(f"[Level {level}] [{end_timestamp}] ✅ COMPLETED chunk {i+1}/{len(chunks)} (took {duration:.2f}s)", file=sys.stderr)
            return summary

        except Exception as e:
            # Best-effort degradation: a failed chunk contributes a truncated
            # excerpt rather than aborting the whole level.
            error_time = time.time()
            error_timestamp = time.strftime("%H:%M:%S", time.localtime(error_time))
            print(f"[Level {level}] [{error_timestamp}] ⚠️  Error in chunk {i+1}: {e}", file=sys.stderr)
            return f"[Partial] {chunk[:500]}..."

    batch_start = time.time()
    batch_timestamp = time.strftime("%H:%M:%S", time.localtime(batch_start))
    print(f"[Level {level}] [{batch_timestamp}] ⚡ LAUNCHING {len(chunks)} chunks in PARALLEL (max {max_concurrent} concurrent)...", file=sys.stderr)

    # Execute all chunk summaries in parallel
    chunk_summaries = await asyncio.gather(
        *[summarize_chunk(i, chunk) for i, chunk in enumerate(chunks)],
        return_exceptions=False  # Errors are handled in summarize_chunk
    )

    batch_end = time.time()
    batch_end_timestamp = time.strftime("%H:%M:%S", time.localtime(batch_end))
    batch_duration = batch_end - batch_start
    print(f"[Level {level}] [{batch_end_timestamp}] 🎉 ALL {len(chunks)} chunks completed in {batch_duration:.2f}s (speedup: {len(chunks)}x vs sequential)", file=sys.stderr)

    # Combine summaries and recurse on the (smaller) combined text
    combined = "\n\n".join(chunk_summaries)
    combined_tokens = estimate_tokens(combined, model)

    print(f"[Level {level}] Combined {len(chunk_summaries)} summaries ({combined_tokens:,} tokens), recursing to level {level+1}...", file=sys.stderr)

    return await recursive_summarize(
        combined,
        user_prompt,
        model,
        level=level + 1,
        max_levels=max_levels,
        max_concurrent=max_concurrent
    )


def main():
    """Read execution input from stdin, summarize, write JSON result to stdout.

    All diagnostics go to stderr; stdout carries only the OUTPUT_SCHEMA JSON.
    Exits with status 1 on any error, emitting a JSON error object on stderr.
    """
    try:
        input_json = sys.stdin.read()
        execution_input = json.loads(input_json)

        # Extract inputs
        inputs = execution_input.get("inputs", {})
        content = inputs.get("content")
        llm_model = inputs.get("llmModel")
        prompt = inputs.get("prompt")

        # Validate inputs
        if not content:
            raise ValueError("Required input 'content' not provided")
        if not llm_model:
            raise ValueError("Required input 'llmModel' not provided")
        if not prompt:
            raise ValueError("Required input 'prompt' not provided")

        # Validate model
        if llm_model not in MODEL_PRESETS:
            raise ValueError(f"Unsupported model: {llm_model}")

        print("="*80, file=sys.stderr)
        print("🚀 RECURSIVE SUMMARIZER v11 - Starting (Parallel Processing + Timing Logs)", file=sys.stderr)
        print("="*80, file=sys.stderr)
        print(f"📋 Selected Model ID: '{llm_model}'", file=sys.stderr)
        print(f"📄 Content length: {len(content):,} characters", file=sys.stderr)
        print(f"🔢 Estimated tokens: {estimate_tokens(content, llm_model):,}", file=sys.stderr)
        print(f"⚡ Max concurrent requests: {MAX_CONCURRENT_REQUESTS}", file=sys.stderr)
        print("="*80, file=sys.stderr)

        # Perform recursive summarization with async execution
        summary = asyncio.run(recursive_summarize(content, prompt, llm_model, max_concurrent=MAX_CONCURRENT_REQUESTS))

        print(f"✓ Summarization complete! Summary length: {len(summary):,} characters", file=sys.stderr)

        # Prepare output matching OUTPUT_SCHEMA
        output = {
            "summary": summary,
        }

        # Write output to stdout
        print(json.dumps(output, indent=2))

    except httpx.HTTPStatusError as e:
        # HTTP error from LiteLLM API
        error_output = {
            "error": f"LLM API request failed: {e}",
            "errorType": "HTTPError",
            "status": e.response.status_code,
            "detail": e.response.text[:500] if hasattr(e.response, 'text') else str(e),
        }
        print(json.dumps(error_output), file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        # Other errors
        error_output = {
            "error": str(e),
            "errorType": type(e).__name__,
            "traceback": traceback.format_exc(),
        }
        print(json.dumps(error_output), file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()