import re
import json
import unicodedata
import io
import urllib.parse
import feedparser
import pandas as pd
from flask import current_app
from gradio_client import Client
from PIL import Image
import base64


class ContentService:
    """Service for AI content generation using Hugging Face models."""

    def __init__(self, hugging_key=None):
        # Store the hugging_key to be used later when needed
        # This avoids accessing current_app during initialization
        self.hugging_key = hugging_key
        # Initialize the Gradio client lazily - only when first needed
        self.client = None

    def _initialize_client(self):
        """Initialize the Gradio client, either with the provided key or from app config."""
        if self.client is None:
            # If hugging_key wasn't provided at initialization, try to get it now
            if not self.hugging_key:
                try:
                    self.hugging_key = current_app.config.get('HUGGING_KEY')
                except RuntimeError:
                    # We're outside of an application context
                    raise RuntimeError(
                        "Hugging Face API key not provided and not available in app config. "
                        "Please provide the key when initializing ContentService."
                    )
            self.client = Client("Zelyanoth/Linkedin_poster_dev", hf_token=self.hugging_key)

    def validate_unicode_content(self, content):
        """Validate Unicode content while preserving original formatting and spaces."""
        if not content or not isinstance(content, str):
            return content
        try:
            # Test if content can be encoded as UTF-8
            content.encode('utf-8')
            return content  # Return the original content if it's valid UTF-8
        except UnicodeEncodeError:
            try:
                # If encoding fails, try to preserve as much as possible
                return content.encode('utf-8', errors='replace').decode('utf-8')
            except Exception:
                # Ultimate fallback
                return str(content)

    def preserve_formatting(self, content):
        """Preserve spaces, line breaks, and paragraph formatting."""
        if not content:
            return content
        # Preserve all whitespace characters including spaces, tabs, and newlines
        # so that paragraph breaks and indentation are maintained
        try:
            # Test encoding first
            content.encode('utf-8')
            return content
        except UnicodeEncodeError:
            # Fallback with error replacement, but preserve whitespace
            return content.encode('utf-8', errors='replace').decode('utf-8')

    def sanitize_content_for_api(self, content):
        """Sanitize content for API calls while preserving original text, spaces, and formatting."""
        if not content:
            return content
        # First preserve formatting and spaces
        preserved = self.preserve_formatting(content)
        # Only validate Unicode, don't remove spaces or formatting
        validated = self.validate_unicode_content(preserved)
        # Only remove null bytes that might cause issues in API calls
        if '\x00' in validated:
            validated = validated.replace('\x00', '')
        # Normalize line endings so line breaks are preserved consistently
        validated = validated.replace('\r\n', '\n').replace('\r', '\n')
        return validated
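    # Note: the base64 helpers below accept either a bare base64 payload or a
    # data URL such as "data:image/png;base64,iVBORw0KGgoAAA..." (truncated,
    # illustrative).  Plain HTTP(S) image URLs fail the strict base64 check and
    # are treated as hosted image links instead.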
    def _is_base64_image(self, data):
        """Check if the data is a base64 encoded image string."""
        if not isinstance(data, str):
            return False
        # Check if it starts with a data URL prefix
        if data.startswith('data:image/'):
            return True
        # Try to decode as base64
        try:
            # Extract the base64 part if it's a data URL
            if ',' in data:
                base64_part = data.split(',')[1]
            else:
                base64_part = data
            # Try to decode
            base64.b64decode(base64_part, validate=True)
            return True
        except Exception:
            return False

    def _base64_to_bytes(self, base64_string):
        """Convert a base64 encoded string to bytes."""
        try:
            # If it's a data URL, extract the base64 part
            if base64_string.startswith('data:image/'):
                base64_part = base64_string.split(',')[1]
            else:
                base64_part = base64_string
            # Decode base64 to bytes
            return base64.b64decode(base64_part, validate=True)
        except Exception as e:
            current_app.logger.error(f"Failed to decode base64 image: {str(e)}")
            raise Exception(f"Failed to decode base64 image: {str(e)}")

    def generate_post_content(self, user_id: str) -> tuple:
        """
        Generate post content using AI.

        Args:
            user_id (str): User ID for personalization

        Returns:
            tuple: (generated post content, image bytes or image URL or None)
        """
        try:
            # Ensure the client is initialized (lazy initialization)
            if self.client is None:
                self._initialize_client()

            # Call the Hugging Face model to generate content
            result = self.client.predict(
                code=user_id,
                api_name="/poster_linkedin"
            )

            # Handle the case where the result is a tuple from Gradio
            # The Gradio API returns a tuple of (content, image_data)
            if isinstance(result, tuple) and len(result) >= 2:
                generated_content = result[0] if result[0] is not None else "Generated content will appear here..."
                image_data = result[1] if result[1] is not None else None
            else:
                # Parse the result (assuming it returns a list with the content as first element)
                # First try to parse as JSON
                try:
                    parsed_result = json.loads(result)
                except json.JSONDecodeError:
                    # If JSON parsing fails, check if it's already a Python list/object
                    try:
                        # Try to evaluate as a Python literal (safe for lists/dicts)
                        import ast
                        parsed_result = ast.literal_eval(result)
                    except (ValueError, SyntaxError):
                        # If that fails, treat the result as a plain string
                        parsed_result = [result]

                # Extract the first element if it's a list
                if isinstance(parsed_result, list):
                    generated_content = parsed_result[0] if parsed_result and parsed_result[0] is not None else "Generated content will appear here..."
                    # Extract the second element as the image data if it exists
                    image_data = parsed_result[1] if len(parsed_result) > 1 and parsed_result[1] is not None else None
                else:
                    generated_content = str(parsed_result) if parsed_result is not None else "Generated content will appear here..."
                    image_data = None

            # Validate, sanitize, and preserve the formatting of the generated content
            sanitized_content = self.sanitize_content_for_api(generated_content)
            # Ensure paragraph breaks and formatting are preserved
            final_content = self.preserve_formatting(sanitized_content)

            # Handle image data - it could be a URL or a base64 string
            image_bytes = None
            if image_data:
                if self._is_base64_image(image_data):
                    # Convert base64 to bytes for storage
                    image_bytes = self._base64_to_bytes(image_data)
                else:
                    # It's a URL, keep it as a string
                    image_bytes = image_data

            return (final_content, image_bytes)

        except Exception as e:
            error_message = str(e)
            current_app.logger.error(f"Content generation failed: {error_message}")
            raise Exception(f"Content generation failed: {error_message}")
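    # Illustrative sketch of how a caller might consume the tuple returned by
    # generate_post_content(); the key and user id below are hypothetical
    # placeholders, not values used elsewhere in this codebase.
    #
    #     service = ContentService(hugging_key="hf_xxx")
    #     content, image = service.generate_post_content("user-123")
    #     if isinstance(image, (bytes, bytearray)):
    #         with open("post_image.png", "wb") as fh:    # base64 payloads arrive as bytes
    #             fh.write(image)
    #     elif isinstance(image, str):
    #         print("Image hosted at:", image)            # URLs are passed through as strings
    #     print(content)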
    def add_rss_source(self, rss_link: str, user_id: str) -> str:
        """
        Add an RSS source for content generation.

        Args:
            rss_link (str): RSS feed URL or keyword
            user_id (str): User ID

        Returns:
            str: Result message
        """
        try:
            # Ensure the client is initialized (lazy initialization)
            if self.client is None:
                self._initialize_client()

            # Call the Hugging Face model to add the RSS source
            rss_input = f"{rss_link}__thi_irrh'èçs_my_id__! {user_id}"
            sanitized_rss_input = self.sanitize_content_for_api(rss_input)

            result = self.client.predict(
                rss_link=sanitized_rss_input,
                api_name="/ajouter_rss"
            )

            # Sanitize and preserve the formatting of the result
            sanitized_result = self.sanitize_content_for_api(result)
            return self.preserve_formatting(sanitized_result)

        except Exception as e:
            raise Exception(f"Failed to add RSS source: {str(e)}")

    def analyze_keyword_frequency(self, keyword, user_id, date_range='monthly'):
        """
        Analyze how frequently new articles/links appear in the RSS feeds generated
        from the user's sources (keyword sources are turned into Google News feeds).

        Args:
            keyword (str): The keyword to analyze
            user_id (str): User ID for filtering content
            date_range (str): The date range to analyze ('daily', 'weekly', 'monthly')

        Returns:
            list: Chart-ready entries with article frequency over time
        """
        try:
            from flask import current_app
            from datetime import datetime, timedelta
            import re

            # Attempt to access current_app, but handle gracefully if outside of app context
            try:
                # Fetch the sources from the database that belong to the user
                # Check if the Supabase client is initialized
                if not hasattr(current_app, 'supabase') or current_app.supabase is None:
                    raise Exception("Database connection not initialized")

                # Get all RSS sources for the user to analyze
                rss_response = (
                    current_app.supabase
                    .table("Source")
                    .select("source, categorie, created_at")
                    .eq("user_id", user_id)
                    .execute()
                )

                user_rss_sources = rss_response.data if rss_response.data else []

                # Analyze each RSS source for the frequency of new articles/links
                keyword_data = []

                # Collect articles from all RSS feeds before building a DataFrame
                all_articles = []

                for rss_source in user_rss_sources:
                    rss_link = rss_source["source"]

                    # Check whether the source is a keyword rather than an RSS URL
                    # If it's a keyword, generate a Google News RSS URL
                    if self._is_url(rss_link):
                        # It's a URL, use it directly
                        feed_url = rss_link
                    else:
                        # It's a keyword, generate a Google News RSS URL
                        feed_url = self._generate_google_news_rss_from_string(rss_link)

                    # Parse the RSS feed
                    feed = feedparser.parse(feed_url)

                    # Log some debug information
                    current_app.logger.info(f"Processing RSS feed: {feed_url}")
                    current_app.logger.info(f"Number of entries in feed: {len(feed.entries)}")

                    # Extract articles from the feed
                    for entry in feed.entries:
                        # Use the same date handling as in the original ai_agent.py
                        article_data = {
                            'title': entry.title,
                            'link': entry.link,
                            'summary': entry.summary,
                            'date': entry.get('published', entry.get('updated', None)),
                            'content': entry.get('summary', '') + ' ' + entry.get('title', '')
                        }

                        # Log individual article data for debugging
                        current_app.logger.info(f"Article title: {entry.title}")
                        current_app.logger.info(f"Article date: {article_data['date']}")

                        all_articles.append(article_data)

                # Create a DataFrame from the articles
                df_articles = pd.DataFrame(all_articles)
                current_app.logger.info(f"Total articles collected: {len(df_articles)}")
                if not df_articles.empty:
                    current_app.logger.info(f"DataFrame columns: {df_articles.columns.tolist()}")
                    current_app.logger.info(f"Sample of DataFrame:\n{df_articles.head()}")

                # Convert the date column to datetime if it exists
                if not df_articles.empty and 'date' in df_articles.columns:
                    # Convert the published/updated strings to timezone-aware datetimes
                    df_articles['date'] = pd.to_datetime(df_articles['date'], errors='coerce', utc=True)
                    current_app.logger.info(f"DataFrame shape after date conversion: {df_articles.shape}")
                    current_app.logger.info(f"Date column after conversion:\n{df_articles['date'].head()}")
                    df_articles = df_articles.dropna(subset=['date'])  # Remove entries with invalid dates
                    df_articles = df_articles.sort_values(by='date', ascending=True)
                    current_app.logger.info(f"DataFrame shape after dropping invalid dates: {df_articles.shape}")

                # If we have articles, analyze the article frequency over time
                if not df_articles.empty:
                    # Group by date ranges and count all articles (not just those containing the keyword)
                    # This shows how many new articles appear in the RSS feeds over time

                    # Use the appropriate pandas grouping for the requested range and
                    # handle timezone-aware dates explicitly to avoid warnings
                    if date_range == 'daily':
                        # Drop the timezone info and keep the date portion only
                        df_articles['date_group'] = df_articles['date'].dt.tz_localize(None).dt.date
                        interval = 'D'  # Daily frequency
                    elif date_range == 'weekly':
                        # For weekly, get the start of the week (Monday)
                        # First remove the timezone info for proper date arithmetic
                        tz_naive = df_articles['date'].dt.tz_localize(None) if df_articles['date'].dt.tz is not None else df_articles['date']
                        # Calculate the Monday of each week (0=Monday, 6=Sunday)
                        df_articles['date_group'] = (tz_naive - pd.to_timedelta(tz_naive.dt.dayofweek, unit='d')).dt.date
                        interval = 'W-MON'  # Weekly frequency starting on Monday
                    else:  # monthly
                        # For monthly, get the start of the month
                        # Build a new datetime with day=1 for the start of the month
                        df_articles['date_group'] = pd.to_datetime({
                            'year': df_articles['date'].dt.year,
                            'month': df_articles['date'].dt.month,
                            'day': 1
                        }).dt.date
                        interval = 'MS'  # Month Start frequency

                    # Count all articles by date group (all articles are counted, not only keyword matches)
                    article_counts = df_articles.groupby('date_group').size().reset_index(name='count')

                    # Create a complete date range for the chart
                    if not article_counts.empty:
                        start_date = article_counts['date_group'].min()
                        end_date = article_counts['date_group'].max()

                        # Use the correct frequency for the date range generation
                        if date_range == 'daily':
                            freq = 'D'
                        elif date_range == 'weekly':
                            freq = 'W-MON'  # Weekly on Monday
                        else:  # monthly
                            freq = 'MS'  # Month start frequency

                        # Create a complete date range
                        full_date_range = pd.date_range(start=start_date, end=end_date, freq=freq).to_frame(index=False, name='date_group')
                        full_date_range['date_group'] = full_date_range['date_group'].dt.date

                        # Merge with the article counts
                        article_counts = full_date_range.merge(article_counts, on='date_group', how='left').fillna(0)

                        # Convert counts to integers
                        article_counts['count'] = article_counts['count'].astype(int)

                        # Format the data for the frontend chart
                        for _, row in article_counts.iterrows():
                            date_str = row['date_group'].strftime('%Y-%m-%d')

                            # Approximate the values for the other time ranges from the grouped count
                            daily_val = row['count'] if date_range == 'daily' else int(row['count'] / 7) if date_range == 'weekly' else int(row['count'] / 30)
                            weekly_val = daily_val * 7 if date_range == 'daily' else row['count'] if date_range == 'weekly' else int(row['count'] / 4)
                            monthly_val = daily_val * 30 if date_range == 'daily' else weekly_val * 4 if date_range == 'weekly' else row['count']

                            keyword_data.append({
                                'date': date_str,
                                'daily': daily_val,
                                'weekly': weekly_val,
                                'monthly': monthly_val
                            })
                    else:
                        # If no articles were found, create empty data for the last 6 periods
                        start_date = datetime.now()
                        for i in range(6):
                            if date_range == 'daily':
                                date = (start_date - timedelta(days=i)).strftime('%Y-%m-%d')
                            elif date_range == 'weekly':
                                date = (start_date - timedelta(weeks=i)).strftime('%Y-%m-%d')
                            else:  # monthly
                                date = (start_date - timedelta(days=30 * i)).strftime('%Y-%m-%d')

                            keyword_data.append({
                                'date': date,
                                'daily': 0,
                                'weekly': 0,
                                'monthly': 0
                            })
                else:
                    # If there are no RSS sources or articles, create empty data for the last 6 periods
                    start_date = datetime.now()
                    for i in range(6):
                        if date_range == 'daily':
                            date = (start_date - timedelta(days=i)).strftime('%Y-%m-%d')
                        elif date_range == 'weekly':
                            date = (start_date - timedelta(weeks=i)).strftime('%Y-%m-%d')
                        else:  # monthly
                            date = (start_date - timedelta(days=30 * i)).strftime('%Y-%m-%d')

                        keyword_data.append({
                            'date': date,
                            'daily': 0,
                            'weekly': 0,
                            'monthly': 0
                        })

                return keyword_data

            except RuntimeError:
                # We're outside of the application context
                # Create mock data so that testing scenarios without a full app context still work
                start_date = datetime.now()
                keyword_data = []

                for i in range(6):
                    if date_range == 'daily':
                        date = (start_date - timedelta(days=i)).strftime('%Y-%m-%d')
                    elif date_range == 'weekly':
                        date = (start_date - timedelta(weeks=i)).strftime('%Y-%m-%d')
                    else:  # monthly
                        date = (start_date - timedelta(days=30 * i)).strftime('%Y-%m-%d')

                    keyword_data.append({
                        'date': date,
                        'daily': 0,
                        'weekly': 0,
                        'monthly': 0
                    })

                return keyword_data

        except Exception as e:
            import logging
            logging.error(f"Keyword frequency analysis failed: {str(e)}")
            raise Exception(f"Keyword frequency analysis failed: {str(e)}")
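    # Illustrative shape of one entry in the list returned by
    # analyze_keyword_frequency() (values are hypothetical); with
    # date_range='daily' the weekly/monthly fields are simple extrapolations:
    #
    #     {'date': '2024-01-02', 'daily': 3, 'weekly': 21, 'monthly': 90}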
    def analyze_keyword_frequency_pattern(self, keyword, user_id):
        """
        Analyze the frequency pattern of links generated from RSS feeds for a specific keyword over time.
        Determines if the keyword follows a daily, weekly, monthly, or rare pattern based on recency and frequency.

        Args:
            keyword (str): The keyword to analyze
            user_id (str): User ID for filtering content

        Returns:
            dict: Analysis data with frequency pattern classification
        """
        try:
            from flask import current_app
            from datetime import datetime, timedelta
            import re

            # Create a DataFrame to store articles from RSS feeds
            all_articles = []

            # Attempt to access current_app, but handle gracefully if outside of app context
            try:
                # Check if Supabase client is initialized
                if not hasattr(current_app, 'supabase') or current_app.supabase is None:
                    raise Exception("Database connection not initialized")

                # Get all RSS sources for the user to analyze
                rss_response = (
                    current_app.supabase
                    .table("Source")
                    .select("source, categorie, created_at")
                    .eq("user_id", user_id)
                    .execute()
                )

                user_rss_sources = rss_response.data if rss_response.data else []

                # Check if the source is a keyword rather than an RSS URL
                # If it's a keyword, generate a Google News RSS URL
                if self._is_url(keyword):
                    # It's a URL, use it directly
                    feed_url = keyword
                else:
                    # It's a keyword, generate Google News RSS URL
                    feed_url = self._generate_google_news_rss_from_string(keyword)

                # Parse the RSS feed
                feed = feedparser.parse(feed_url)

                # Log some debug information
                current_app.logger.info(f"Processing RSS feed: {feed_url}")
                current_app.logger.info(f"Number of entries in feed: {len(feed.entries)}")

                # Extract ALL articles from the feed (without filtering by keyword again)
                for entry in feed.entries:
                    # Use the same date handling as in the original ai_agent.py
                    article_data = {
                        'title': entry.title,
                        'link': entry.link,
                        'summary': entry.summary,
                        'date': entry.get('published', entry.get('updated', None)),
                        'content': entry.get('summary', '') + ' ' + entry.get('title', '')
                    }

                    # Log individual article data for debugging
                    current_app.logger.info(f"Article title: {entry.title}")
                    current_app.logger.info(f"Article date: {article_data['date']}")

                    all_articles.append(article_data)

                # Create a DataFrame from the articles
                df_articles = pd.DataFrame(all_articles)
                current_app.logger.info(f"Total articles collected for keyword '{keyword}': {len(df_articles)}")
                if not df_articles.empty:
                    current_app.logger.info(f"DataFrame columns: {df_articles.columns.tolist()}")
                    current_app.logger.info(f"Sample of DataFrame:\n{df_articles.head()}")

                # Convert date column to datetime if it exists
                if not df_articles.empty and 'date' in df_articles.columns:
                    # Convert struct_time objects to datetime
                    df_articles['date'] = pd.to_datetime(df_articles['date'], errors='coerce', utc=True)
                    current_app.logger.info(f"DataFrame shape after date conversion: {df_articles.shape}")
                    current_app.logger.info(f"Date column after conversion:\n{df_articles['date'].head()}")
                    df_articles = df_articles.dropna(subset=['date'])  # Remove entries with invalid dates
                    df_articles = df_articles.sort_values(by='date', ascending=False)  # Sort by date descending to get most recent first
                    current_app.logger.info(f"DataFrame shape after dropping invalid dates: {df_articles.shape}")

                # Analyze frequency pattern
                frequency_pattern = self._determine_frequency_pattern(df_articles)

                # Prepare recent articles to return with the response
                recent_articles = []
                if not df_articles.empty:
                    # Get the 5 most recent articles
                    recent_df = df_articles.head(5)
                    for _, row in recent_df.iterrows():
                        # Try to format the date properly
                        formatted_date = None
                        if pd.notna(row['date']):
                            # Convert to string in a readable format
                            formatted_date = row['date'].strftime('%Y-%m-%d %H:%M:%S') if hasattr(row['date'], 'strftime') else str(row['date'])

                        recent_articles.append({
                            'title': row['title'],
                            'link': row['link'],
                            'date': formatted_date
                        })

                # Return comprehensive analysis
                return {
                    'keyword': keyword,
                    'pattern': frequency_pattern['pattern'],
                    'details': frequency_pattern['details'],
                    'total_articles': len(df_articles),
                    'articles': recent_articles,
                    'date_range': {
                        'start': df_articles['date'].max().strftime('%Y-%m-%d') if not df_articles.empty else None,  # Most recent date first
                        'end': df_articles['date'].min().strftime('%Y-%m-%d') if not df_articles.empty else None  # Earliest date last
                    }
                }

            except RuntimeError:
                # We're outside of application context
                # Return default analysis for testing purposes
                return {
                    'keyword': keyword,
                    'pattern': 'rare',
                    'details': {
                        'explanation': 'Application context not available, returning default analysis',
                        'confidence': 0.0
                    },
                    'total_articles': 0,
                    'articles': [],
                    'date_range': {
                        'start': None,
                        'end': None
                    }
                }

        except Exception as e:
            import logging
            logging.error(f"Keyword frequency pattern analysis failed: {str(e)}")
            raise Exception(f"Keyword frequency pattern analysis failed: {str(e)}")
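    # Quick reference for the heuristic implemented in
    # _determine_frequency_pattern() below (thresholds summarised from the
    # code, not independently validated):
    #   - latest article older than 30 days                              -> 'rare'
    #   - more than 7 recent articles and the latest is <= 1 day old     -> 'daily'
    #   - 3 to 7 recent articles and the latest is <= 7 days old         -> 'weekly'
    #   - fewer than 3 recent articles and the latest is <= 30 days old  -> 'monthly'
    #   - anything else                                                  -> 'rare'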
    def _determine_frequency_pattern(self, df_articles):
        """
        Determine the frequency pattern based on the recency and frequency of articles.

        Args:
            df_articles: DataFrame with article data, including dates

        Returns:
            dict: Pattern classification and details
        """
        if df_articles.empty or 'date' not in df_articles.columns:
            return {
                'pattern': 'rare',
                'details': {
                    'explanation': 'No articles found',
                    'confidence': 1.0
                }
            }

        # Calculate the time since the latest article
        latest_date = df_articles['date'].max()
        current_time = pd.Timestamp.now(tz=latest_date.tz) if latest_date.tz else pd.Timestamp.now()
        time_since_latest = (current_time - latest_date).days

        # Calculate article frequency
        total_articles = len(df_articles)

        # Group articles by date to get daily counts
        df_articles['date_only'] = df_articles['date'].dt.date
        daily_counts = df_articles.groupby('date_only').size()

        # Calculate metrics
        avg_daily_frequency = daily_counts.mean() if len(daily_counts) > 0 else 0
        recent_activity = daily_counts.tail(7).sum()  # articles across the 7 most recent active dates (roughly the last week)

        # Determine the pattern based on multiple factors
        if total_articles == 0:
            return {
                'pattern': 'rare',
                'details': {
                    'explanation': 'No articles found',
                    'confidence': 1.0
                }
            }

        # Check whether the pattern is truly persistent by considering recency
        if time_since_latest > 30:
            # If there was no activity in the last month, it's likely not a daily/weekly pattern anymore
            if total_articles > 0:
                return {
                    'pattern': 'rare',
                    'details': {
                        'explanation': f'No recent activity in the last {time_since_latest} days, despite {total_articles} total articles',
                        'confidence': 0.9
                    }
                }

        # If there are many recent articles per day, it's likely daily
        if recent_activity > 7 and time_since_latest <= 1:
            return {
                'pattern': 'daily',
                'details': {
                    'explanation': f'Many articles per day ({recent_activity} in the last 7 days) and recent activity',
                    'confidence': 0.9
                }
            }

        # If there are few articles per day but regular weekly activity
        if 3 <= recent_activity <= 7 and time_since_latest <= 7:
            return {
                'pattern': 'weekly',
                'details': {
                    'explanation': f'About {recent_activity} articles per week with recent activity',
                    'confidence': 0.8
                }
            }

        # If there are very few articles but they are somewhat spread over time
        if recent_activity < 3 and total_articles > 0 and time_since_latest <= 30:
            return {
                'pattern': 'monthly',
                'details': {
                    'explanation': f'Few articles per month with recent activity in the last {time_since_latest} days',
                    'confidence': 0.7
                }
            }

        # Default to rare if there is no clear pattern
        return {
            'pattern': 'rare',
            'details': {
                'explanation': f'Unclear pattern with {total_articles} total articles and last activity {time_since_latest} days ago',
                'confidence': 0.5
            }
        }

    def _is_url(self, s):
        # Check whether the string is a valid URL
        try:
            from urllib.parse import urlparse
            result = urlparse(s)
            return all([result.scheme, result.netloc])
        except Exception:
            return False

    def _generate_google_news_rss_from_string(self, query, language="en", country="US"):
        """
        Generate a Google News RSS URL from a raw search string.

        Args:
            query (str): Raw Google News search query.
            language (str): Language code, e.g. "en".
            country (str): Country code, e.g. "US".

        Returns:
            str: URL of the Google News RSS feed.
        """
        query_encoded = urllib.parse.quote(query)
        url = (
            f"https://news.google.com/rss/search?q={query_encoded}"
            f"&hl={language}&gl={country}&ceid={country}:{language}"
        )
        return url
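
if __name__ == "__main__":
    # Minimal illustrative smoke test, assuming it is run outside a Flask app
    # context.  It only exercises helpers that need no Hugging Face key and no
    # network access: the Google News URL builder and the frequency-pattern
    # heuristic applied to a small synthetic DataFrame.
    service = ContentService()

    # Build a Google News RSS URL from a keyword (no request is made here).
    print(service._generate_google_news_rss_from_string("generative ai"))

    # Two articles per day over the last week should land in the 'daily'
    # bucket of the heuristic above.
    dates = pd.date_range(end=pd.Timestamp.now(tz='UTC'), periods=7, freq='D').repeat(2)
    fake_articles = pd.DataFrame({
        'title': [f"article {i}" for i in range(len(dates))],
        'link': [f"https://example.com/{i}" for i in range(len(dates))],
        'date': dates,
    })
    print(service._determine_frequency_pattern(fake_articles))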