$ cat node-template.py
Test File Generator
// Generates test files of various types (image, audio, video) for testing the file handling pipeline.
Process
Utility
template.py
"""Test File Generator.

Generates small test files (PNG image, WAV audio, AVI video placeholder) for
exercising a file handling pipeline.

When run as a script it reads a JSON document from stdin, looks at
``inputs.file_type`` (``"all"``, ``"image"``, ``"audio"`` or ``"video"``) and
``inputs.label``, writes the requested files into ``OUTPUT_DIR`` and prints a
flat JSON mapping of output key -> file name on stdout.  Diagnostics and
errors go to stderr.
"""
import json
import math
import os
import struct
import sys
import traceback
import wave
import zlib
from datetime import datetime  # noqa: F401 -- not used below; kept from the original import list

EXECUTION_CONTEXT = {
    "workspace_id": os.getenv("WORKSPACE_ID"),
    "node_id": os.getenv("NODE_ID"),
    "execution_id": os.getenv("EXECUTION_ID"),
}

OUTPUT_DIR = "/data/output"

_PNG_SIGNATURE = b"\x89PNG\r\n\x1a\n"
_FILL_RGB = (70, 130, 180)  # steel blue background used for the image
_AVI_WIDTH = 100
_AVI_HEIGHT = 75
_AVI_FPS = 15


def _resolve_output_dir(output_dir):
    """Return the directory to write into, creating it if it is missing.

    ``OUTPUT_DIR`` is read at call time (not bound as a default argument) so
    that reassigning the module constant still takes effect.
    """
    target = OUTPUT_DIR if output_dir is None else output_dir
    os.makedirs(target, exist_ok=True)
    return target


def _png_chunk(chunk_type, data):
    """Serialize one PNG chunk: length, type, payload, CRC32 of type+payload."""
    body = chunk_type + data
    checksum = zlib.crc32(body) & 0xFFFFFFFF
    return struct.pack(">I", len(data)) + body + struct.pack(">I", checksum)


def _build_raw_png(width, height):
    """Return the bytes of a solid-colour 8-bit RGB PNG of the given size."""
    # IHDR: width, height, bit depth 8, colour type 2 (truecolour),
    # compression 0, filter 0, interlace 0.
    ihdr = _png_chunk(b"IHDR", struct.pack(">IIBBBBB", width, height, 8, 2, 0, 0, 0))
    # Every scanline is a filter byte (0 = none) followed by the RGB pixels.
    # Byte-string repetition avoids the quadratic cost of growing a bytes
    # object one pixel at a time.
    scanline = b"\x00" + bytes(_FILL_RGB) * width
    idat = _png_chunk(b"IDAT", zlib.compress(scanline * height))
    iend = _png_chunk(b"IEND", b"")
    return _PNG_SIGNATURE + ihdr + idat + iend


def generate_png(width=200, height=150, label="test", output_dir=None):
    """Write ``image.png`` and return its path.

    Pillow is used when it can be imported: the image gets a white border and
    the ``label`` text.  Without Pillow a plain solid-colour PNG is assembled
    from raw bytes; ``label`` is not rendered in that case.

    Args:
        width: Image width in pixels; must be positive.
        height: Image height in pixels; must be positive.
        label: Text drawn on the image (Pillow path only).  A falsy label is
            replaced by ``"Test Image"``.
        output_dir: Directory to write into; defaults to ``OUTPUT_DIR``.

    Returns:
        The path of the written file.

    Raises:
        ValueError: If ``width`` or ``height`` is not positive.
    """
    if width <= 0 or height <= 0:
        raise ValueError(f"width and height must be positive, got {width}x{height}")

    path = os.path.join(_resolve_output_dir(output_dir), "image.png")

    # Only the import is guarded, so a genuine drawing/saving failure is not
    # mistaken for "Pillow is not installed".
    try:
        from PIL import Image, ImageDraw
    except ImportError:
        with open(path, "wb") as f:
            f.write(_build_raw_png(width, height))
        return path

    img = Image.new("RGB", (width, height), color=_FILL_RGB)
    draw = ImageDraw.Draw(img)
    # The border is inset by 10px on each side; skip it when the image is too
    # small for the inset rectangle to have non-negative extent.
    if width >= 20 and height >= 20:
        draw.rectangle(
            [10, 10, width - 10, height - 10], outline=(255, 255, 255), width=2
        )
    text = str(label) if label else "Test Image"
    draw.text((20, height // 2 - 10), text, fill=(255, 255, 255))
    img.save(path)
    return path


def generate_wav(duration_sec=1.0, frequency=440, sample_rate=22050, output_dir=None):
    """Write ``audio.wav`` containing a mono 16-bit sine tone and return its path.

    Args:
        duration_sec: Tone length in seconds; negative values are treated as 0.
        frequency: Tone frequency in Hz.
        sample_rate: Samples per second.
        output_dir: Directory to write into; defaults to ``OUTPUT_DIR``.

    Returns:
        The path of the written file.
    """
    path = os.path.join(_resolve_output_dir(output_dir), "audio.wav")
    num_samples = max(0, int(sample_rate * duration_sec))

    samples = [
        int(16000 * math.sin(2 * math.pi * frequency * (i / sample_rate)))
        for i in range(num_samples)
    ]
    # Pack everything once and hand it to the writer in a single call instead
    # of calling writeframes() per sample.
    frames = struct.pack(f"<{num_samples}h", *samples)

    with wave.open(path, "w") as wav_file:
        wav_file.setnchannels(1)
        wav_file.setsampwidth(2)
        wav_file.setframerate(sample_rate)
        wav_file.writeframes(frames)

    return path


def _riff_chunk(tag, data):
    """Serialize one RIFF chunk: 4-byte tag, little-endian size, payload.

    RIFF chunks are word-aligned, so an odd-sized payload is followed by one
    pad byte that is not counted in the size field.
    """
    padding = b"\x00" if len(data) % 2 else b""
    return tag + struct.pack("<I", len(data)) + data + padding


def generate_video_placeholder(output_dir=None):
    """Write ``video.avi``, a minimal single-frame AVI, and return its path.

    The file is a RIFF/AVI container with one uncompressed 100x75 24-bit
    video stream holding a single black frame.  It carries no index chunk.

    Args:
        output_dir: Directory to write into; defaults to ``OUTPUT_DIR``.

    Returns:
        The path of the written file.
    """
    path = os.path.join(_resolve_output_dir(output_dir), "video.avi")

    # One black RGB frame.  A 100px row is 300 bytes, already a multiple of
    # 4, so no DIB row padding is needed.
    frame_data = b"\x00" * (_AVI_WIDTH * _AVI_HEIGHT * 3)

    # AVISTREAMHEADER payload (56 bytes), in documented field order.
    strh = struct.pack(
        "<4s4sIHHIIIIIIII4h",
        b"vids",            # fccType
        b"DIB ",            # fccHandler
        0,                  # dwFlags
        0,                  # wPriority
        0,                  # wLanguage
        0,                  # dwInitialFrames
        1,                  # dwScale
        _AVI_FPS,           # dwRate (dwRate / dwScale = frames per second)
        0,                  # dwStart
        1,                  # dwLength (frames)
        len(frame_data),    # dwSuggestedBufferSize
        0,                  # dwQuality
        0,                  # dwSampleSize
        0, 0, _AVI_WIDTH, _AVI_HEIGHT,  # rcFrame: left, top, right, bottom
    )

    # BITMAPINFOHEADER (40 bytes) is the stream format for a video stream.
    strf = struct.pack(
        "<IiiHHIIiiII",
        40, _AVI_WIDTH, _AVI_HEIGHT, 1, 24, 0, len(frame_data), 0, 0, 0, 0,
    )

    strl = _riff_chunk(
        b"LIST", b"strl" + _riff_chunk(b"strh", strh) + _riff_chunk(b"strf", strf)
    )

    # AVIMAINHEADER payload (56 bytes).
    avih = struct.pack(
        "<14I",
        1_000_000 // _AVI_FPS,  # dwMicroSecPerFrame (66666 at 15 fps)
        0,                      # dwMaxBytesPerSec
        0,                      # dwPaddingGranularity
        0,                      # dwFlags
        1,                      # dwTotalFrames
        0,                      # dwInitialFrames
        1,                      # dwStreams: exactly one 'strl' list follows
        0,                      # dwSuggestedBufferSize
        _AVI_WIDTH,             # dwWidth
        _AVI_HEIGHT,            # dwHeight
        0, 0, 0, 0,             # dwReserved[4]
    )

    hdrl = _riff_chunk(b"LIST", b"hdrl" + _riff_chunk(b"avih", avih) + strl)
    movi = _riff_chunk(b"LIST", b"movi" + _riff_chunk(b"00dc", frame_data))
    avi_file = _riff_chunk(b"RIFF", b"AVI " + hdrl + movi)

    with open(path, "wb") as f:
        f.write(avi_file)

    return path


def main():
    """Read the execution request from stdin and generate the requested files.

    On success the output mapping is printed to stdout as JSON.  On any
    failure a JSON error report is printed to stderr and the process exits
    with status 1.
    """
    try:
        input_json = sys.stdin.read()
        execution_input = json.loads(input_json)

        # "inputs" may be absent or explicitly null; both mean "use defaults".
        inputs = execution_input.get("inputs") or {}
        file_type = inputs.get("file_type", "all")
        label = inputs.get("label", "Test")

        os.makedirs(OUTPUT_DIR, exist_ok=True)

        # (output key, reported file name, generator) in emission order.
        generators = (
            ("image", "image.png", lambda: generate_png(label=label)),
            ("audio", "audio.wav", generate_wav),
            ("video", "video.avi", generate_video_placeholder),
        )

        outputs = {}
        for key, file_name, generate in generators:
            if file_type in ("all", key):
                generate()
                outputs[key] = file_name

        if not outputs:
            # Nothing matched: surface it instead of silently emitting {}.
            print(
                f"Warning: unrecognized file_type={file_type!r}; no files generated",
                file=sys.stderr,
            )

        # Log metadata to stderr
        print(f"Generated file_type={file_type}, label={label}", file=sys.stderr)

        # Flat output -- keys match OUTPUT_SCHEMA
        print(json.dumps(outputs, indent=2))

    except Exception as e:  # top-level boundary: report and exit non-zero
        error_output = {
            "error": str(e),
            "errorType": type(e).__name__,
            "traceback": traceback.format_exc(),
            "executionContext": EXECUTION_CONTEXT,
        }
        print(json.dumps(error_output), file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()