# DailyFinancialBrain_v12_SMART.py """ SMART ORCHESTRATION - Calculates capacity before processing - Smart queue management - AI image generation for missing thumbnails - Rate limit aware scheduling """ import json import os import urllib.request import urllib.parse import boto3 import time import logging import re import hashlib import base64 from datetime import datetime, timezone, timedelta from boto3.dynamodb.conditions import Key, Attr from decimal import Decimal logger = logging.getLogger() logger.setLevel(logging.INFO) # ===================================================================== # CONFIGURATION # ===================================================================== dynamodb = boto3.resource('dynamodb') secrets_client = boto3.client('secretsmanager') table = dynamodb.Table(os.environ.get('TABLE_NAME', 'FinancialNews')) # Model capabilities and rate limits AI_MODELS = { 'gemini-2.5-flash': { 'rpm': 10, 'rpd': 250, 'capabilities': ['text_analysis'], 'priority': 1 }, 'gemini-2.5-flash-lite': { 'rpm': 15, 'rpd': 1000, 'capabilities': ['text_analysis'], 'priority': 2 }, 'gemini-2.5-pro': { 'rpm': 2, 'rpd': 50, 'capabilities': ['text_analysis', 'complex_reasoning'], 'priority': 3 }, 'gemini-2.0-flash-lite': { 'rpm': 30, 'rpd': 200, 'capabilities': ['text_analysis'], 'priority': 4 } } # Image generation models IMAGE_MODELS = { 'imagen-3.0-generate-002': { 'rpm': 10, 'rpd': 100, 'capabilities': ['image_generation'] } } # S3 bucket for generated images S3_BUCKET = os.environ.get('IMAGE_BUCKET', 'news.vpnparamesh.click') s3_client = boto3.client('s3') # ===================================================================== # CAPACITY CALCULATOR (FIX #5) # ===================================================================== class CapacityPlanner: """Calculate available capacity before processing""" def __init__(self, execution_time_limit=540): # 9 minutes safety margin self.execution_time_limit = execution_time_limit self.start_time = time.time() self.requests_made = {model: 0 for model in AI_MODELS} self.image_requests_made = 0 def calculate_capacity(self, items_to_process): """Calculate how many items we can process given rate limits""" # Calculate total available requests per minute total_rpm = sum(m['rpm'] for m in AI_MODELS.values()) # Time available time_available = self.execution_time_limit # Max requests we can make max_text_requests = int((time_available / 60) * total_rpm * 0.7) # 70% safety factor # Image generation capacity if IMAGE_MODELS: img_model = list(IMAGE_MODELS.values())[0] max_image_requests = int((time_available / 60) * img_model['rpm'] * 0.5) else: max_image_requests = 0 # Items needing processing items_needing_analysis = len([i for i in items_to_process if not i.get('ai_status') == 'COMPLETED']) items_needing_images = len([i for i in items_to_process if not self._has_valid_image(i)]) plan = { 'total_items': len(items_to_process), 'items_needing_analysis': items_needing_analysis, 'items_needing_images': items_needing_images, 'max_text_requests': max_text_requests, 'max_image_requests': max_image_requests, 'text_to_process': min(items_needing_analysis, max_text_requests), 'images_to_generate': min(items_needing_images, max_image_requests), 'estimated_time': self._estimate_time( min(items_needing_analysis, max_text_requests), min(items_needing_images, max_image_requests) ) } logger.info(f"šŸ“Š CAPACITY PLAN:") logger.info(f" šŸ“ Text Analysis: {plan['text_to_process']}/{items_needing_analysis} items") logger.info(f" šŸŽØ Image Generation: {plan['images_to_generate']}/{items_needing_images} items") logger.info(f" ā±ļø Estimated Time: {plan['estimated_time']:.1f}s") return plan def _has_valid_image(self, item): img = item.get('image_url') or item.get('youtube_thumbnail_url') return img and len(img) > 20 and 'placeholder' not in img and img.startswith('http') def _estimate_time(self, text_count, image_count): # Assume 3 seconds per text request, 5 seconds per image return (text_count * 3) + (image_count * 5) def get_best_model(self, capability='text_analysis'): """Get the best available model for a capability""" available = [ (name, config) for name, config in AI_MODELS.items() if capability in config['capabilities'] and self.requests_made[name] < config['rpm'] ] if not available: return None # Sort by priority available.sort(key=lambda x: x[1]['priority']) return available[0][0] def record_request(self, model_name): if model_name in self.requests_made: self.requests_made[model_name] += 1 def time_remaining(self): return self.execution_time_limit - (time.time() - self.start_time) def should_continue(self): return self.time_remaining() > 30 # Keep 30s buffer # ===================================================================== # IMAGE GENERATION (FIX #3) # ===================================================================== class ImageGenerator: """Generate images for items without thumbnails""" def __init__(self, api_key): self.api_key = api_key def generate_image(self, item): """Generate an image based on news title""" if not self.api_key: return None title = item.get('title', '') source = item.get('source', '') item_type = item.get('type', 'news') # Create a prompt for image generation prompt = self._create_prompt(title, source, item_type) try: # Use Imagen API url = f"https://generativelanguage.googleapis.com/v1beta/models/imagen-3.0-generate-002:generateImage?key={self.api_key}" payload = { "prompt": prompt, "number_of_images": 1, "aspect_ratio": "16:9", "safety_filter_level": "BLOCK_MEDIUM_AND_ABOVE" } data = json.dumps(payload).encode('utf-8') headers = {'Content-Type': 'application/json'} req = urllib.request.Request(url, method='POST', data=data, headers=headers) with urllib.request.urlopen(req, timeout=30) as response: result = json.loads(response.read()) if result.get('images') and len(result['images']) > 0: # Get base64 image image_data = result['images'][0].get('image', {}).get('image_bytes') if image_data: # Upload to S3 return self._upload_to_s3(image_data, item) except urllib.error.HTTPError as e: if e.code == 429: logger.warning("šŸŽØ Image generation rate limited") else: logger.error(f"šŸŽØ Image generation error: {e.code}") except Exception as e: logger.error(f"šŸŽØ Image generation failed: {e}") return None def _create_prompt(self, title, source, item_type): """Create appropriate image prompt based on content""" # Extract key themes themes = { 'bitcoin': 'golden bitcoin cryptocurrency coin, digital finance', 'crypto': 'cryptocurrency digital coins, blockchain technology', 'stock': 'stock market trading floor, financial charts', 'market': 'global financial markets, stock exchange screens', 'gold': 'gold bars and coins, precious metals', 'oil': 'oil barrels and refinery, energy sector', 'fed': 'federal reserve building, central banking', 'inflation': 'rising prices chart, economic indicators', 'china': 'chinese financial district, asian markets', 'europe': 'european central bank, euro currency', 'tech': 'technology company headquarters, silicon valley', 'bank': 'modern bank building, financial institution', 'trade': 'global trade shipping containers, international commerce', 'war': 'geopolitical map, global tensions', 'rate': 'interest rate chart, monetary policy' } title_lower = title.lower() matched_theme = None for keyword, theme in themes.items(): if keyword in title_lower: matched_theme = theme break if not matched_theme: matched_theme = 'professional financial news, stock market, business' prompt = f"""Professional financial news thumbnail image showing {matched_theme}. Modern, clean design with dramatic lighting. Professional photography style, high quality. No text or watermarks. Corporate and trustworthy aesthetic. 16:9 aspect ratio, suitable for news website.""" return prompt def _upload_to_s3(self, image_base64, item): """Upload generated image to S3""" try: # Generate unique filename article_id = item.get('article_id', hashlib.md5(item.get('title', '').encode()).hexdigest()[:16]) filename = f"generated/{article_id}.png" # Decode base64 image_bytes = base64.b64decode(image_base64) # Upload to S3 s3_client.put_object( Bucket=S3_BUCKET, Key=filename, Body=image_bytes, ContentType='image/png', ACL='public-read' ) # Return public URL url = f"https://{S3_BUCKET}.s3.amazonaws.com/{filename}" logger.info(f"šŸŽØ Image uploaded: {url}") return url except Exception as e: logger.error(f"šŸŽØ S3 upload failed: {e}") return None # ===================================================================== # AI ANALYSIS # ===================================================================== class AIAnalyzer: """Analyze items with smart model selection""" def __init__(self, api_key, capacity_planner): self.api_key = api_key self.planner = capacity_planner def analyze(self, item): """Analyze item with best available model""" if not self.api_key: return None, 'PENDING' model = self.planner.get_best_model('text_analysis') if not model: logger.warning("āš ļø No available models") return None, 'PENDING' prompt = f"""Analyze this financial content for professional investors. Title: "{item.get('title', '')}" Source: "{item.get('source', '')}" Description: "{item.get('description', '')[:500]}" Return ONLY valid JSON: {{"summary":"2-3 sentence executive summary for investors","key_points":["actionable point 1","actionable point 2","actionable point 3"],"analyst_take":"Strategic portfolio implication","sentiment":"Bullish/Bearish/Neutral"}}""" payload = { "contents": [{"parts": [{"text": prompt}]}], "generationConfig": {"temperature": 0.7, "maxOutputTokens": 600} } url = f"https://generativelanguage.googleapis.com/v1beta/models/{model}:generateContent?key={self.api_key}" try: data = json.dumps(payload).encode('utf-8') headers = {'Content-Type': 'application/json'} req = urllib.request.Request(url, method='POST', data=data, headers=headers) with urllib.request.urlopen(req, timeout=30) as response: result = json.loads(response.read()) text = result['candidates'][0]['content']['parts'][0]['text'] clean = text.replace('``````', '').strip() parsed = json.loads(clean) if 'summary' in parsed: self.planner.record_request(model) logger.info(f"āœ… {model} analysis complete") return parsed, 'COMPLETED' except urllib.error.HTTPError as e: if e.code == 429: logger.warning(f"āš ļø {model} rate limited") except Exception as e: logger.error(f"āŒ Analysis error: {e}") return None, 'PENDING' # ===================================================================== # DATA FETCHING (Same as before) # ===================================================================== # [Keep all the fetch_youtube_videos, fetch_rss_feeds, deduplication code from v11] # ... (code omitted for brevity - use the same fetching logic from v11) # ===================================================================== # LAMBDA HANDLER # ===================================================================== def lambda_handler(event, context): """Main handler with smart orchestration""" logger.info("šŸš€ DailyFinancialBrain v12 SMART ORCHESTRATION") # Get secrets gemini_key, yt_key = get_secrets() # Initialize planner planner = CapacityPlanner(execution_time_limit=540) # Initialize engines analyzer = AIAnalyzer(gemini_key, planner) image_gen = ImageGenerator(gemini_key) # PHASE 1: AGGREGATE ALL ITEMS logger.info("\n" + "="*60) logger.info("PHASE 1: AGGREGATING ITEMS") logger.info("="*60) # Load existing items from DB existing_items = load_existing_items(days=3) # Fetch new items new_videos = fetch_youtube_videos(yt_key) new_rss = fetch_rss_feeds() new_items = new_videos + new_rss # Combine and deduplicate all_items = deduplicate_items(existing_items + new_items) logger.info(f"šŸ“Š Total unique items: {len(all_items)}") # PHASE 2: CAPACITY PLANNING (FIX #5) logger.info("\n" + "="*60) logger.info("PHASE 2: CAPACITY PLANNING") logger.info("="*60) plan = planner.calculate_capacity(all_items) # PHASE 3: PROCESS ITEMS logger.info("\n" + "="*60) logger.info("PHASE 3: PROCESSING") logger.info("="*60) # Sort items: prioritize new items and those without analysis items_to_analyze = [i for i in all_items if i.get('ai_status') != 'COMPLETED'][:plan['text_to_process']] items_needing_images = [i for i in all_items if not has_valid_image(i)][:plan['images_to_generate']] processed = {'analysis': 0, 'images': 0} # Process text analysis for idx, item in enumerate(items_to_analyze): if not planner.should_continue(): logger.warning("ā° Time limit approaching, stopping") break logger.info(f"šŸ“ [{idx+1}/{len(items_to_analyze)}] Analyzing: {item.get('title', '')[:40]}...") ai_data, status = analyzer.analyze(item) if status == 'COMPLETED' and ai_data: item['ai_status'] = 'COMPLETED' item['ai_summary'] = ai_data.get('summary', '') item['ai_points'] = ai_data.get('key_points', []) item['ai_take'] = ai_data.get('analyst_take', '') item['ai_sentiment'] = ai_data.get('sentiment', 'Neutral') processed['analysis'] += 1 # Save to DB save_item(item) # Brief pause between requests time.sleep(2) # Process image generation for idx, item in enumerate(items_needing_images): if not planner.should_continue(): break logger.info(f"šŸŽØ [{idx+1}/{len(items_needing_images)}] Generating image: {item.get('title', '')[:40]}...") image_url = image_gen.generate_image(item) if image_url: item['image_url'] = image_url item['image_generated'] = True processed['images'] += 1 # Update in DB save_item(item) time.sleep(3) # Summary summary = { 'status': 'success', 'total_items': len(all_items), 'plan': plan, 'processed': processed, 'time_used': time.time() - planner.start_time } logger.info(f"\nšŸ“Š SUMMARY: {json.dumps(summary, default=str)}") return { 'statusCode': 200, 'body': json.dumps(summary, default=str) } # Helper functions def get_secrets(): try: response = secrets_client.get_secret_value(SecretId='FinancialKeys') secrets = json.loads(response['SecretString']) return secrets.get('GEMINI_API_KEY'), secrets.get('YT_API_KEY') except: return None, None def load_existing_items(days=3): items = [] today = datetime.now(timezone.utc) for i in range(days): date_str = (today - timedelta(days=i)).strftime('%Y-%m-%d') try: response = table.query( KeyConditionExpression='#date = :date', ExpressionAttributeNames={'#date': 'date'}, ExpressionAttributeValues={':date': date_str} ) items.extend(response.get('Items', [])) except: pass return items def has_valid_image(item): img = item.get('image_url') or item.get('youtube_thumbnail_url') return img and len(img) > 20 and 'placeholder' not in img def deduplicate_items(items): seen = set() result = [] for item in items: key = item.get('url') or item.get('title', '') if key and key.lower() not in seen: seen.add(key.lower()) result.append(item) return result def save_item(item): try: item['ingested_at'] = datetime.now(timezone.utc).isoformat() table.put_item(Item=item) except Exception as e: logger.error(f"āŒ Save error: {e}")