import gradio as gr import pandas as pd import os import json import sqlite3 import tempfile import nltk import traceback import datetime import time import numpy as np import matplotlib.pyplot as plt import io import base64 import requests import re from pathlib import Path from nltk.sentiment import SentimentIntensityAnalyzer from nltk.tokenize import word_tokenize from nltk.corpus import stopwords from sklearn.model_selection import train_test_split from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier from sklearn.linear_model import LinearRegression from sklearn.cluster import KMeans from sklearn.preprocessing import StandardScaler from sklearn.metrics import mean_squared_error, r2_score, accuracy_score, classification_report from sklearn.feature_extraction.text import TfidfVectorizer import pymongo import redis import pymysql # Using pymysql instead of mysql.connector import psycopg2 from bs4 import BeautifulSoup def setup_nltk(): try: # Use a temporary directory for NLTK data nltk_data_dir = os.path.join(tempfile.gettempdir(), 'nltk_data') os.makedirs(nltk_data_dir, exist_ok=True) nltk.data.path.append(nltk_data_dir) # Download necessary NLTK data nltk_resources = ['punkt', 'stopwords', 'vader_lexicon'] for resource in nltk_resources: try: nltk.data.find(f'tokenizers/{resource}' if resource == 'punkt' else f'corpora/{resource}' if resource == 'stopwords' else f'sentiment/{resource}') except LookupError: nltk.download(resource, download_dir=nltk_data_dir, quiet=True) return True except Exception as e: print(f"Error setting up NLTK: {e}") return False # Initialize NLTK if not setup_nltk(): print("Failed to set up NLTK. Some NLP features may not work properly.") class DatabaseManager: def __init__(self, db_path=None): try: # Use a temporary directory for the database if db_path is None: db_dir = os.path.join(tempfile.gettempdir(), 'data') os.makedirs(db_dir, exist_ok=True) db_path = os.path.join(db_dir, 'data_deity.db') self.db_path = db_path self.connection = sqlite3.connect(db_path) self.cursor = self.connection.cursor() self._create_tables() print(f"Successfully initialized database at {db_path}") except sqlite3.Error as e: print(f"Failed to initialize database: {e}") # Fallback to in-memory database if file-based DB fails try: print("Trying in-memory database as fallback...") self.db_path = ":memory:" self.connection = sqlite3.connect(":memory:") self.cursor = self.connection.cursor() self._create_tables() print("Successfully initialized in-memory database") except sqlite3.Error as e2: print(f"Failed to initialize in-memory database: {e2}") raise def _create_tables(self): try: self.cursor.execute(''' CREATE TABLE IF NOT EXISTS files ( id INTEGER PRIMARY KEY, filename TEXT, original TEXT, path TEXT, size INTEGER, file_type TEXT, upload_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') self.cursor.execute(''' CREATE TABLE IF NOT EXISTS metadata ( id INTEGER PRIMARY KEY, file_id INTEGER, meta_key TEXT, meta_value TEXT, FOREIGN KEY (file_id) REFERENCES files (id) ) ''') self.cursor.execute(''' CREATE TABLE IF NOT EXISTS chunks ( id INTEGER PRIMARY KEY, file_id INTEGER, chunk_index INTEGER, chunk_text TEXT, chunk_size INTEGER, FOREIGN KEY (file_id) REFERENCES files (id) ) ''') self.cursor.execute(''' CREATE TABLE IF NOT EXISTS insights ( id INTEGER PRIMARY KEY, file_id INTEGER, insight_type TEXT, insight_text TEXT, confidence REAL, timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, is_speculative BOOLEAN, FOREIGN KEY (file_id) REFERENCES files (id) ) ''') self.cursor.execute(''' CREATE TABLE IF NOT EXISTS analytics ( id INTEGER PRIMARY KEY, file_id INTEGER, analysis_type TEXT, analysis_result TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (file_id) REFERENCES files (id) ) ''') self.connection.commit() print("Successfully created database tables") except sqlite3.Error as e: print(f"Error creating tables: {e}") raise def add_file(self, filename, original, path, size, file_type): try: self.cursor.execute(''' INSERT INTO files (filename, original, path, size, file_type) VALUES (?, ?, ?, ?, ?) ''', (filename, original, path, size, file_type)) self.connection.commit() return self.cursor.lastrowid except sqlite3.Error as e: print(f"Database Error in add_file: {e}") self.connection.rollback() return None def add_metadata(self, file_id, meta_key, meta_value): try: self.cursor.execute(''' INSERT INTO metadata (file_id, meta_key, meta_value) VALUES (?, ?, ?) ''', (file_id, meta_key, meta_value)) self.connection.commit() except sqlite3.Error as e: print(f"Database Error in add_metadata: {e}") self.connection.rollback() def add_chunk(self, file_id, chunk_index, chunk_text, chunk_size): try: self.cursor.execute(''' INSERT INTO chunks (file_id, chunk_index, chunk_text, chunk_size) VALUES (?, ?, ?, ?) ''', (file_id, chunk_index, chunk_text, chunk_size)) self.connection.commit() except sqlite3.Error as e: print(f"Database Error in add_chunk: {e}") self.connection.rollback() def add_insight(self, file_id, insight_type, insight_text, confidence, is_speculative): try: self.cursor.execute(''' INSERT INTO insights (file_id, insight_type, insight_text, confidence, is_speculative) VALUES (?, ?, ?, ?, ?) ''', (file_id, insight_type, insight_text, confidence, is_speculative)) self.connection.commit() except sqlite3.Error as e: print(f"Database Error in add_insight: {e}") self.connection.rollback() def add_analysis(self, file_id, analysis_type, analysis_result): try: self.cursor.execute(''' INSERT INTO analytics (file_id, analysis_type, analysis_result) VALUES (?, ?, ?) ''', (file_id, analysis_type, analysis_result)) self.connection.commit() except sqlite3.Error as e: print(f"Database Error in add_analysis: {e}") self.connection.rollback() def get_file_by_id(self, file_id): try: self.cursor.execute(''' SELECT * FROM files WHERE id = ? ''', (file_id,)) return self.cursor.fetchone() except sqlite3.Error as e: print(f"Database Error in get_file_by_id: {e}") return None def get_analysis_by_file_id(self, file_id): try: self.cursor.execute(''' SELECT analysis_type, analysis_result FROM analytics WHERE file_id = ? ''', (file_id,)) return self.cursor.fetchall() except sqlite3.Error as e: print(f"Database Error in get_analysis_by_file_id: {e}") return [] def get_insights_by_file_id(self, file_id): try: self.cursor.execute(''' SELECT insight_type, insight_text, confidence FROM insights WHERE file_id = ? ''', (file_id,)) return self.cursor.fetchall() except sqlite3.Error as e: print(f"Database Error in get_insights_by_file_id: {e}") return [] def close(self): if hasattr(self, 'connection') and self.connection: self.connection.close() class FileProcessor: def __init__(self, db_manager): self.db_manager = db_manager self.sia = SentimentIntensityAnalyzer() def process_file(self, file): try: # Write the file content to a temporary file temp_dir = tempfile.mkdtemp() file_path = os.path.join(temp_dir, os.path.basename(file.name)) import shutil shutil.copy(file.name, file_path) file_size = os.path.getsize(file_path) file_extension = os.path.splitext(file.name)[1].lower() if file_extension == '.txt': file_type = 'text' elif file_extension == '.csv': file_type = 'csv' elif file_extension == '.json': file_type = 'json' else: raise ValueError(f"Unsupported file type: {file_extension}") file_id = self.db_manager.add_file( filename=os.path.basename(file.name), original=os.path.basename(file.name), path=file_path, size=file_size, file_type=file_type ) if not file_id: raise Exception("Failed to add file to database") chunk_count = 0 if file_type == 'text': chunk_count = self._process_text_file(file_path, file_id) elif file_type == 'csv': chunk_count = self._process_csv_file(file_path, file_id) elif file_type == 'json': chunk_count = self._process_json_file(file_path, file_id) return file_id, chunk_count except Exception as e: print(f"Error processing file: {e}") print(traceback.format_exc()) raise def _process_text_file(self, file_path, file_id): try: with open(file_path, 'r', encoding='utf-8') as f: text = f.read() self.db_manager.add_metadata(file_id, 'character_count', str(len(text))) self.db_manager.add_metadata(file_id, 'word_count', str(len(text.split()))) chunks = text.split('\n\n') for i, chunk in enumerate(chunks): if chunk.strip(): self.db_manager.add_chunk(file_id, i, chunk, len(chunk)) sentiment = self.sia.polarity_scores(text) sentiment_result = json.dumps(sentiment) self.db_manager.add_analysis(file_id, 'sentiment_analysis', sentiment_result) tokens = word_tokenize(text) stop_words = set(stopwords.words('english')) filtered_tokens = [word for word in tokens if word.lower() not in stop_words] token_analysis = { 'total_tokens': len(tokens), 'unique_tokens': len(set(tokens)), 'tokens_without_stopwords': len(filtered_tokens), 'sample_tokens': filtered_tokens[:20] if len(filtered_tokens) > 20 else filtered_tokens } self.db_manager.add_analysis(file_id, 'tokenization', json.dumps(token_analysis)) if sentiment['compound'] > 0.5: self.db_manager.add_insight( file_id, 'sentiment', 'Text has a very positive tone', sentiment['compound'], False ) elif sentiment['compound'] < -0.5: self.db_manager.add_insight( file_id, 'sentiment', 'Text has a very negative tone', abs(sentiment['compound']), False ) return len(chunks) except Exception as e: print(f"Error processing text file: {e}") print(traceback.format_exc()) raise def _process_csv_file(self, file_path, file_id): try: df = pd.read_csv(file_path) self.db_manager.add_metadata(file_id, 'row_count', str(len(df))) self.db_manager.add_metadata(file_id, 'column_count', str(len(df.columns))) self.db_manager.add_metadata(file_id, 'columns', ','.join(df.columns)) chunk_size = 100 chunks = [df[i:i + chunk_size] for i in range(0, len(df), chunk_size)] for i, chunk in enumerate(chunks): chunk_text = chunk.to_json(orient='records') self.db_manager.add_chunk(file_id, i, chunk_text, len(chunk_text)) numeric_columns = df.select_dtypes(include=['number']).columns if len(numeric_columns) > 0: stats = df[numeric_columns].describe().to_json() self.db_manager.add_analysis(file_id, 'statistical_analysis', stats) if len(numeric_columns) >= 2 and len(df) >= 20: try: target_col = numeric_columns[0] feature_cols = [col for col in numeric_columns if col != target_col] X = df[feature_cols] y = df[target_col] X_train, X_test, y_train, y_test = train_test_split( X, y, test_size=0.2, random_state=42 ) model = RandomForestRegressor(n_estimators=50, random_state=42) model.fit(X_train, y_train) y_pred = model.predict(X_test) mse = mean_squared_error(y_test, y_pred) r2 = r2_score(y_test, y_pred) model_results = { 'target_column': target_col, 'feature_columns': feature_cols, 'mean_squared_error': mse, 'r2_score': r2, 'feature_importance': {col: imp for col, imp in zip(feature_cols, model.feature_importances_)} } self.db_manager.add_analysis(file_id, 'predictive_model', json.dumps(model_results)) if r2 > 0.7: self.db_manager.add_insight( file_id, 'prediction', f'Strong predictive relationship found between {target_col} and other variables', r2, False ) elif r2 > 0.3: self.db_manager.add_insight( file_id, 'prediction', f'Moderate predictive relationship found between {target_col} and other variables', r2, False ) except Exception as e: print(f"Could not create predictive model: {e}") return len(chunks) except Exception as e: print(f"Error processing CSV file: {e}") print(traceback.format_exc()) raise def _process_json_file(self, file_path, file_id): try: with open(file_path, 'r', encoding='utf-8') as f: data = json.load(f) json_str = json.dumps(data) if isinstance(data, list): self.db_manager.add_metadata(file_id, 'item_count', str(len(data))) self.db_manager.add_metadata(file_id, 'structure', 'array') elif isinstance(data, dict): self.db_manager.add_metadata(file_id, 'key_count', str(len(data.keys()))) self.db_manager.add_metadata(file_id, 'structure', 'object') self.db_manager.add_metadata(file_id, 'keys', ','.join(data.keys())) chunks = [] if isinstance(data, list): chunk_size = 10 chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)] else: chunks = [data] for i, chunk in enumerate(chunks): chunk_text = json.dumps(chunk) self.db_manager.add_chunk(file_id, i, chunk_text, len(chunk_text)) structure_analysis = self._analyze_json_structure(data) self.db_manager.add_analysis(file_id, 'structure_analysis', json.dumps(structure_analysis)) return len(chunks) except Exception as e: print(f"Error processing JSON file: {e}") print(traceback.format_exc()) raise def _analyze_json_structure(self, data, max_depth=3, current_depth=0): if current_depth >= max_depth: return "..." if isinstance(data, dict): result = {} for k, v in list(data.items())[:10]: result[k] = self._analyze_json_structure(v, max_depth, current_depth + 1) if len(data) > 10: result["..."] = f"{len(data) - 10} more keys" return result elif isinstance(data, list): if len(data) == 0: return [] if len(data) > 5: return [ self._analyze_json_structure(data[0], max_depth, current_depth + 1), "...", f"{len(data)} items total" ] return [self._analyze_json_structure(item, max_depth, current_depth + 1) for item in data] else: return type(data).__name__ class DataDeityApp: def __init__(self): self.db_manager = DatabaseManager() self.file_processor = FileProcessor(self.db_manager) self.processed_data = {} def run(self, file): try: file_id, chunk_count = self.file_processor.process_file(file) self.processed_data[file.name] = file_id return chunk_count except Exception as e: print(f"Error in app.run: {e}") print(traceback.format_exc()) return 0 def get_analysis_results(self, file_id): try: file_info = self.db_manager.get_file_by_id(file_id) if not file_info: return {"Error": "File not found"} file_type = file_info[5] analyses = self.db_manager.get_analysis_by_file_id(file_id) insights = self.db_manager.get_insights_by_file_id(file_id) results = {} results["File Information"] = f"""

Filename: {file_info[1]}

Size: {file_info[4]} bytes

Type: {file_info[5]}

""" if file_type == 'text': for analysis_type, analysis_result in analyses: if analysis_type == 'sentiment_analysis': sentiment = json.loads(analysis_result) results["Sentiment Analysis"] = f"""

Compound Score: {sentiment['compound']:.4f}

Positive: {sentiment['pos']:.4f}

Neutral: {sentiment['neu']:.4f}

Negative: {sentiment['neg']:.4f}

""" elif analysis_type == 'tokenization': token_data = json.loads(analysis_result) results["Text Tokenization"] = f"""

Total Tokens: {token_data['total_tokens']}

Unique Tokens: {token_data['unique_tokens']}

Tokens without Stopwords: {token_data['tokens_without_stopwords']}

Sample Tokens: {', '.join(token_data['sample_tokens'])}

""" elif file_type == 'csv': for analysis_type, analysis_result in analyses: if analysis_type == 'statistical_analysis': stats = json.loads(analysis_result) # stats is now a dictionary stats_html = "
" stats_html += "" # Corrected line: stats is already a dict, no need for json.loads() columns = list(stats.keys()) for col in columns: stats_html += f"" stats_html += "" metrics = ['count', 'mean', 'std', 'min', '25%', '50%', '75%', 'max'] for metric in metrics: stats_html += f"" for col in columns: # Corrected line: stats is already a dict, col_stats = stats[col] col_stats = stats[col] if metric in col_stats: value = col_stats[metric] stats_html += f"" else: stats_html += "" stats_html += "" stats_html += "
Statistic{col}
{metric}{value:.4f if isinstance(value, float) else value}N/A
" results["Statistical Analysis"] = stats_html elif analysis_type == 'predictive_model': model_data = json.loads(analysis_result) results["Predictive Model"] = f"""

Target Column: {model_data['target_column']}

Feature Columns: {', '.join(model_data['feature_columns'])}

Model Performance:

Feature Importance:

{''.join([f'
{feat}:
{imp:.4f}
' for feat, imp in sorted(model_data['feature_importance'].items(), key=lambda x: x[1], reverse=True)])}
""" elif file_type == 'json': for analysis_type, analysis_result in analyses: if analysis_type == 'structure_analysis': structure = json.loads(analysis_result) results["JSON Structure"] = f"""

Structure Overview:

{json.dumps(structure, indent=2)}
""" if insights: insights_html = "

Key Insights

" results["Insights"] = insights_html return results except Exception as e: print(f"Error getting analysis results: {e}") print(traceback.format_exc()) return {"Error": str(e)} def generate_report(self, file_id): try: file_info = self.db_manager.get_file_by_id(file_id) if not file_info: return None filename = file_info[1] file_type = file_info[5] os.makedirs('reports', exist_ok=True) report_filename = f"report_{os.path.splitext(filename)[0]}_{int(time.time())}.html" report_path = os.path.join('reports', report_filename) analyses = self.db_manager.get_analysis_by_file_id(file_id) insights = self.db_manager.get_insights_by_file_id(file_id) with open(report_path, 'w', encoding='utf-8') as f: f.write(f""" Analysis Report: {filename}

Analysis Report: {filename}

File Information

Filename: {filename}

Size: {file_info[4]} bytes

Type: {file_type}

Upload Date: {file_info[6]}

""") if file_type == 'text': for analysis_type, analysis_result in analyses: if analysis_type == 'sentiment_analysis': sentiment = json.loads(analysis_result) f.write(f"""

Sentiment Analysis

Compound Score: {sentiment['compound']:.4f}

Positive: {sentiment['pos']:.4f}

Neutral: {sentiment['neu']:.4f}

Negative: {sentiment['neg']:.4f}

""") elif analysis_type == 'tokenization': token_data = json.loads(analysis_result) f.write(f"""

Text Tokenization

Total Tokens: {token_data['total_tokens']}

Unique Tokens: {token_data['unique_tokens']}

Tokens without Stopwords: {token_data['tokens_without_stopwords']}

Sample Tokens: {', '.join(token_data['sample_tokens'])}

""") elif file_type == 'csv': for analysis_type, analysis_result in analyses: if analysis_type == 'statistical_analysis': stats = json.loads(analysis_result) # stats is now a dictionary f.write("""

Statistical Analysis

""") # Corrected line: stats is already a dict, no need for json.loads() columns = list(stats.keys()) for col in columns: f.write(f"") f.write("") metrics = ['count', 'mean', 'std', 'min', '25%', '50%', '75%', 'max'] for metric in metrics: f.write(f"") for col in columns: # Corrected line: stats is already a dict, col_stats = stats[col] col_stats = stats[col] if metric in col_stats: value = col_stats[metric] f.write(f"") else: f.write("") f.write("") f.write("""
Statistic{col}
{metric}{value:.4f if isinstance(value, float) else value}N/A
""") elif analysis_type == 'predictive_model': model_data = json.loads(analysis_result) f.write(f"""

Predictive Model

Target Column: {model_data['target_column']}

Feature Columns: {', '.join(model_data['feature_columns'])}

Model Performance:

Feature Importance:

{''.join([f'
{feat}:
{imp:.4f}
' for feat, imp in sorted(model_data['feature_importance'].items(), key=lambda x: x[1], reverse=True)])}
""") elif file_type == 'json': for analysis_type, analysis_result in analyses: if analysis_type == 'structure_analysis': structure = json.loads(analysis_result) f.write(f"""

JSON Structure

{json.dumps(structure, indent=2)}
""") if insights: f.write("""

Key Insights

    """) for insight_type, insight_text, confidence in insights: f.write(f"
  • {insight_type.title()}: {insight_text} (Confidence: {confidence:.2f})
  • ") f.write("""
""") f.write("""
""".format(datetime_now=datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) return report_path except Exception as e: print(f"Error generating report: {e}") print(traceback.format_exc()) return None def cleanup(self): try: self.db_manager.close() except Exception as e: print(f"Error during cleanup: {e}") def main(): import time import datetime app = DataDeityApp() custom_css = """ body { font-family: 'Arial', sans-serif; } .analysis-results { max-height: 800px; overflow-y: auto; padding: 15px; border-radius: 5px; border: 1px solid #eee; } .sentiment-analysis, .tokenization, .json-data { margin: 15px 0; padding: 15px; border: 1px solid #eee; border-radius: 5px; } pre { background-color: #f8f9fa; padding: 15px; border-radius: 5px; overflow-x: auto; } .stats-table table { width: 100%; border-collapse: collapse; } .stats-table th, .stats-table td { border: 1px solid #ddd; padding: 8px; text-align: left; } .stats-table th { background-color: #f2f2f2; } .error-message { color: #d9534f; padding: 15px; border: 1px solid #d9534f; border-radius: 5px; } .feature-importance { margin-top: 10px; } .insights { background-color: #f0f7ff; padding: 15px; border-radius: 5px; } """ def process_and_display(file): try: if file is None: return """

No File Selected

Please upload a file to analyze.

""" chunk_count = app.run(file) file_id = app.processed_data.get(file.name) if file_id is not None: analysis_results = app.get_analysis_results(file_id) output_html = f"""

Analysis Results for {file.name}

Processed {chunk_count} chunks

""" for key, value in analysis_results.items(): output_html += f"""

{key}

{value}
""" output_html += "
" return output_html else: return f"""

Processing Error

Failed to process file: {file.name}

Chunks processed: {chunk_count}

""" except Exception as e: print(f"Error in process_and_display: {e}") print(traceback.format_exc()) return f"""

Error

An error occurred while processing the file: {str(e)}

""" def generate_and_download_report(file): try: if file is None: return None file_id = app.processed_data.get(file.name) if file_id is not None: report_path = app.generate_report(file_id) if report_path: return report_path return None except Exception as e: print(f"Error generating report: {e}") print(traceback.format_exc()) return None with gr.Blocks(css=custom_css) as demo: gr.Markdown(""" # Advanced File Processing & Analysis Application This application provides comprehensive analysis of text, CSV, and JSON files. ### Supported File Types: - Text Files (.txt): Sentiment analysis and text tokenization - CSV Files (.csv): Statistical analysis and predictive modeling - JSON Files (.json): Structure analysis and data exploration ### Features: - Automated data processing and chunking - Advanced analytics and insights - Downloadable analysis reports """) with gr.Row(): file_input = gr.File(label="Upload a file (.txt, .csv, or .json)") with gr.Row(): analyze_btn = gr.Button("Analyze File", variant="primary") download_btn = gr.Button("Download Report", variant="secondary") output = gr.HTML(label="Analysis Results") report_output = gr.File(label="Download Report") analyze_btn.click( fn=process_and_display, inputs=[file_input], outputs=[output] ) download_btn.click( fn=generate_and_download_report, inputs=[file_input], outputs=[report_output] ) def generate_llm_dataset(file): if file is None: return None file_id = app.processed_data.get(file.name) if file_id is None: return None try: chunks = app.db_manager.cursor.execute("SELECT chunk_text FROM chunks WHERE file_id = ?", (file_id,)).fetchall() if not chunks: return None dataset_filename = f"dataset_{os.path.splitext(os.path.basename(file.name))[0]}_{int(time.time())}.jsonl" dataset_path = os.path.join(tempfile.gettempdir(), dataset_filename) with open(dataset_path, 'w', encoding='utf-8') as f: for chunk in chunks: # Simple format for pre-training or un-instruct fine-tuning entry = {"text": chunk[0]} f.write(json.dumps(entry) + '\n') return dataset_path except Exception as e: print(f"Error generating LLM dataset: {e}") print(traceback.format_exc()) return None dataset_btn = gr.Button("Download LLM Dataset (.jsonl)", variant="secondary") dataset_output = gr.File(label="Download Dataset") dataset_btn.click( fn=generate_llm_dataset, inputs=[file_input], outputs=[dataset_output] ) demo.launch(share=True) if __name__ == "__main__": main()