Spaces:
Paused
Paused
| import platform | |
| import os | |
| import sqlite3 | |
| import uuid | |
| import datetime | |
| import shutil | |
| import traceback | |
| import logging | |
| from pathlib import Path | |
| from abc import ABC, abstractmethod | |
| from typing import Dict, Any, List | |
| import gradio as gr | |
| import pandas as pd | |
| # --- Base Classes --- | |
class Interface(ABC):
    """Base class for objects that expose a ``launch`` entry point."""

    def launch(self):
        """Start the interface; this base implementation does nothing."""
        return None
class Command(ABC):
    """Base class for objects that can be run through ``execute``."""

    def execute(self):
        """Run the command; this base implementation does nothing."""
        return None
| # --- Database Manager Implementation --- | |
class DatabaseManager:
    """Owns the SQLite connection and performs schema creation and inserts.

    The instance exposes ``conn`` and ``cursor`` so other components can run
    ad-hoc queries against the same connection.
    """

    def __init__(self, db_path: str = None):
        """Open (creating if needed) the database and ensure the schema exists.

        Args:
            db_path: Path of the SQLite file. When ``None`` a per-user,
                platform-specific directory is created and
                ``file_storage.db`` inside it is used.

        Raises:
            sqlite3.Error: If connecting or creating the tables fails.
        """
        if db_path is None:
            base_dir = self._default_base_dir()
            os.makedirs(base_dir, exist_ok=True)
            db_path = os.path.join(base_dir, 'file_storage.db')
        self.db_path = db_path
        self.conn = None
        self.cursor = None
        self.connect()
        self.create_tables()

    @staticmethod
    def _default_base_dir() -> str:
        """Return the per-user data directory for the current platform."""
        system = platform.system()
        home = os.path.expanduser('~')
        if system == 'Windows':
            # APPDATA can be missing (services, stripped environments);
            # fall back to its conventional location instead of KeyError.
            appdata = os.environ.get('APPDATA') or os.path.join(home, 'AppData', 'Roaming')
            return os.path.join(appdata, 'FileStorageApp')
        if system == 'Darwin':
            return os.path.join(home, 'Library', 'Application Support', 'FileStorageApp')
        return os.path.join(home, '.filestorage')

    def connect(self) -> None:
        """Establish a connection to the SQLite database.

        Raises:
            sqlite3.Error: If the connection cannot be opened.
        """
        try:
            # check_same_thread=False: the connection is created on the main
            # thread but used from UI worker threads; the default setting makes
            # every such call raise sqlite3.ProgrammingError.
            # NOTE(review): concurrent writers sharing this connection/cursor
            # still need to be serialized by the caller.
            self.conn = sqlite3.connect(self.db_path, check_same_thread=False)
            self.conn.execute("PRAGMA foreign_keys = ON")
            self.cursor = self.conn.cursor()
        except sqlite3.Error as e:
            logging.error(f"Database connection error: {e}")
            raise

    def create_tables(self) -> None:
        """Create necessary tables if they don't exist.

        Raises:
            sqlite3.Error: If any CREATE statement fails (after rollback).
        """
        tables = [
            '''CREATE TABLE IF NOT EXISTS files (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                filename TEXT NOT NULL,
                original_filename TEXT NOT NULL,
                file_path TEXT NOT NULL,
                file_size INTEGER NOT NULL,
                file_type TEXT,
                upload_date DATETIME DEFAULT CURRENT_TIMESTAMP
            )''',
            '''CREATE TABLE IF NOT EXISTS metadata (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                file_id INTEGER NOT NULL,
                key TEXT NOT NULL,
                value TEXT,
                FOREIGN KEY (file_id) REFERENCES files (id) ON DELETE CASCADE
            )''',
            '''CREATE TABLE IF NOT EXISTS chunks (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                file_id INTEGER NOT NULL,
                chunk_index INTEGER NOT NULL,
                chunk_text TEXT NOT NULL,
                chunk_size INTEGER NOT NULL,
                FOREIGN KEY (file_id) REFERENCES files (id) ON DELETE CASCADE
            )''',
            # Errors are not tied to a file row, so they get their own table
            # rather than a metadata row with a fake file_id.
            '''CREATE TABLE IF NOT EXISTS error_log (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                logged_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                error_type TEXT,
                message TEXT,
                details TEXT NOT NULL
            )''',
        ]
        try:
            for table in tables:
                self.cursor.execute(table)
            self.conn.commit()
        except sqlite3.Error as e:
            self.conn.rollback()
            logging.error(f"Error creating tables: {e}")
            raise

    def insert_file(self, file_data: Dict[str, Any]) -> int:
        """Insert file information into the database.

        Args:
            file_data: Mapping with the keys ``filename``,
                ``original_filename``, ``file_path``, ``file_size`` and
                ``file_type``.

        Returns:
            The row id of the inserted ``files`` record.

        Raises:
            sqlite3.Error: If the insert fails (after rollback).
        """
        try:
            self.cursor.execute('''
                INSERT INTO files (filename, original_filename, file_path, file_size, file_type)
                VALUES (?, ?, ?, ?, ?)
            ''', (file_data['filename'], file_data['original_filename'],
                  file_data['file_path'], file_data['file_size'], file_data['file_type']))
            self.conn.commit()
            return self.cursor.lastrowid
        except sqlite3.Error as e:
            self.conn.rollback()
            logging.error(f"Error inserting file: {e}")
            raise

    def insert_metadata(self, file_id: int, metadata: Dict[str, str]) -> None:
        """Insert one ``metadata`` row per key/value pair for a file.

        All pairs are committed together; on failure none are kept.

        Raises:
            sqlite3.Error: If an insert fails (after rollback).
        """
        try:
            self.cursor.executemany('''
                INSERT INTO metadata (file_id, key, value)
                VALUES (?, ?, ?)
            ''', [(file_id, key, value) for key, value in metadata.items()])
            self.conn.commit()
        except sqlite3.Error as e:
            self.conn.rollback()
            logging.error(f"Error inserting metadata: {e}")
            raise

    def insert_chunk(self, file_id: int, chunk_index: int, chunk_text: str) -> None:
        """Insert a text chunk into the database.

        ``chunk_size`` is stored as the number of whitespace-separated words
        in ``chunk_text``.

        Raises:
            sqlite3.Error: If the insert fails (after rollback).
        """
        try:
            chunk_size = len(chunk_text.split())
            self.cursor.execute('''
                INSERT INTO chunks (file_id, chunk_index, chunk_text, chunk_size)
                VALUES (?, ?, ?, ?)
            ''', (file_id, chunk_index, chunk_text, chunk_size))
            self.conn.commit()
        except sqlite3.Error as e:
            self.conn.rollback()
            logging.error(f"Error inserting chunk: {e}")
            raise

    def log_error(self, error_data: Dict[str, str]) -> None:
        """Persist an error record; best-effort, never raises sqlite3 errors.

        The record goes to the ``error_log`` table. Writing it to
        ``metadata`` with ``file_id = -1`` cannot work because foreign-key
        enforcement is switched on in ``connect`` and no such file row exists.

        Args:
            error_data: Error description. ``error_type`` and ``message`` are
                stored in their own columns when present; the whole mapping is
                stored as text in ``details``.
        """
        try:
            self.cursor.execute('''
                INSERT INTO error_log (error_type, message, details)
                VALUES (?, ?, ?)
            ''', (error_data.get('error_type'), error_data.get('message'), str(error_data)))
            self.conn.commit()
        except sqlite3.Error as e:
            logging.error(f"Error logging error: {e}")
            try:
                self.conn.rollback()
            except sqlite3.Error:
                # Logging is best-effort: a broken connection must not turn
                # error reporting into a second failure.
                pass

    def close(self) -> None:
        """Close the database connection; safe to call more than once."""
        if self.conn:
            self.conn.close()
            self.conn = None
            self.cursor = None
| # --- File Processor Implementation --- | |
class FileProcessor:
    """Copies uploaded files into an upload folder and reads them back."""

    def __init__(self, upload_folder: str = None):
        """Create the upload folder if needed.

        Args:
            upload_folder: Destination directory. Defaults to ``FileUploads``
                inside the current user's home directory.
        """
        self.upload_folder = upload_folder or os.path.join(Path.home(), 'FileUploads')
        os.makedirs(self.upload_folder, exist_ok=True)

    def save_file(self, file: Any) -> Dict[str, Any]:
        """Store an upload under a unique name and describe the stored copy.

        Args:
            file: Either a filesystem path (``str`` / ``os.PathLike``) or an
                object with a ``name`` attribute. Objects that also provide
                ``read()`` are read from; otherwise ``name`` is treated as the
                path to copy.

        Returns:
            A dict with ``filename`` (stored name), ``original_filename``,
            ``file_path``, ``file_size`` (bytes on disk) and ``file_type``
            (text after the last dot of the original name, or ``'unknown'``).

        Raises:
            Exception: Any I/O error is logged and re-raised.
        """
        is_path = isinstance(file, (str, os.PathLike))
        source = os.fspath(file) if is_path else file.name
        # Only the base name goes into the stored name: ``name`` may be a full
        # temp path or contain ``..`` components, which would otherwise point
        # the destination outside (or into a missing subfolder of) the
        # upload folder.
        original_filename = os.path.basename(str(source))
        filename = f"{uuid.uuid4()}_{original_filename}"
        file_path = os.path.join(self.upload_folder, filename)
        try:
            if not is_path and callable(getattr(file, 'read', None)):
                payload = file.read()
                if isinstance(payload, str):
                    # Text-mode handles return str; the target is binary.
                    payload = payload.encode('utf-8')
                with open(file_path, "wb") as f:
                    f.write(payload)
            else:
                shutil.copyfile(source, file_path)
            if '.' in original_filename:
                file_type = original_filename.rsplit('.', 1)[-1]
            else:
                file_type = 'unknown'
            return {
                'filename': filename,
                'original_filename': original_filename,
                'file_path': file_path,
                'file_size': os.path.getsize(file_path),
                'file_type': file_type,
            }
        except Exception as e:
            logging.error(f"Error saving file: {e}")
            raise

    def extract_content(self, file_path: str) -> str:
        """Return the file's full contents decoded as UTF-8 text.

        Raises:
            Exception: Open/decode errors are logged and re-raised.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                return f.read()
        except Exception as e:
            logging.error(f"Error extracting content: {e}")
            raise
| # --- Text Chunker Implementation --- | |
class TextChunker:
    """Splits text into word-based chunks that overlap by a fixed amount."""

    def __init__(self, chunk_size: int = 500, overlap: int = 50):
        """Configure the chunker.

        Args:
            chunk_size: Maximum number of words per chunk; must be positive.
            overlap: Number of words shared by consecutive chunks; must
                satisfy ``0 <= overlap < chunk_size``.

        Raises:
            ValueError: If the sizes would prevent the window from advancing.
        """
        self._validate(chunk_size, overlap)
        self.chunk_size = chunk_size
        self.overlap = overlap

    @staticmethod
    def _validate(chunk_size: int, overlap: int) -> None:
        """Reject settings for which the chunk window cannot move forward."""
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive integer")
        if not 0 <= overlap < chunk_size:
            raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")

    def chunk_text(self, text: str) -> List[str]:
        """Split text into chunks with overlap.

        Words are whitespace-separated tokens; each chunk is re-joined with
        single spaces. Empty or whitespace-only text yields an empty list.

        Raises:
            ValueError: If ``chunk_size`` / ``overlap`` were changed to an
                invalid combination after construction.
        """
        # Attributes are public and may have been reassigned since __init__.
        self._validate(self.chunk_size, self.overlap)
        words = text.split()
        step = self.chunk_size - self.overlap
        chunks = []
        for start in range(0, len(words), step):
            end = start + self.chunk_size
            chunks.append(' '.join(words[start:end]))
            if end >= len(words):
                # This window already reached the last word; stepping again
                # would only emit a tail that is fully contained in it.
                break
        return chunks
| # --- Command Handler Implementation --- | |
class CommandHandler:
    """Registry that maps command names to ``Command`` objects and runs them."""

    def __init__(self):
        """Start with an empty registry."""
        self.commands = {}

    def register_command(self, name: str, command: Command):
        """Store ``command`` under ``name``, replacing any previous entry."""
        self.commands[name] = command

    def execute_command(self, name: str) -> bool:
        """Run the command registered under ``name``.

        Returns:
            ``True`` after the command's ``execute()`` has been called,
            ``False`` (with a warning logged) when the name is unknown.
        """
        if name not in self.commands:
            logging.warning(f"Command '{name}' not found.")
            return False
        self.commands[name].execute()
        return True
| # --- Main Application Implementation --- | |
class Application(Interface):
    """Wires the database, file storage, and chunking into one pipeline."""

    def __init__(self):
        """Build the collaborating components with their default settings."""
        self.db_manager = DatabaseManager()
        self.file_processor = FileProcessor()
        self.text_chunker = TextChunker(chunk_size=512, overlap=50)
        self.command_handler = CommandHandler()
        # Summary of the most recent run(); None until the first run.
        self.processed_data = None

    def run(self, uploaded_file: Any) -> None:
        """Store an upload, chunk its text, and record everything in the DB.

        On success ``processed_data`` holds the file name, chunk count and
        the status ``'processed'``.

        Raises:
            ValueError: If ``uploaded_file`` is falsy.
            Exception: Any failure from the pipeline, re-raised after it has
                been passed to ``_handle_error``.
        """
        try:
            if not uploaded_file:
                raise ValueError("No file provided for processing")

            # Persist the upload and register it.
            stored = self.file_processor.save_file(uploaded_file)
            record_id = self.db_manager.insert_file(stored)

            # Read the stored copy back and split it.
            text = self.file_processor.extract_content(stored['file_path'])
            pieces = self.text_chunker.chunk_text(text)

            # Metadata first, then the chunks with 1-based indices.
            processed_at = datetime.datetime.now().isoformat()
            self.db_manager.insert_metadata(
                record_id,
                {'source': 'upload', 'processed_at': processed_at},
            )
            for position, piece in enumerate(pieces, start=1):
                self.db_manager.insert_chunk(record_id, position, piece)

            summary = {
                'filename': uploaded_file.name,
                'chunk_count': len(pieces),
                'status': 'processed',
            }
            self.processed_data = summary
        except Exception as e:
            self._handle_error(e)
            raise

    def _handle_error(self, error: Exception) -> None:
        """Record the error through the DB manager and mark the run failed."""
        now = datetime.datetime.now().isoformat()
        self.db_manager.log_error({
            'timestamp': now,
            'error_type': type(error).__name__,
            'message': str(error),
            'stack_trace': traceback.format_exc(),
        })
        self.processed_data = {'status': 'failed'}
| # --- Gradio Interface Implementation --- | |
class DataDeityInterface:
    """Gradio front end: upload tab, database explorer tab, command console.

    NOTE(review): the leading glyphs in several labels below look like
    mis-encoded emoji; they are kept exactly as found -- confirm the file's
    encoding before changing them.
    """

    def __init__(self, app):
        """Keep a reference to the application and build the theme.

        Args:
            app: Object providing ``run(file)``, ``processed_data`` and
                ``db_manager.cursor``.
        """
        self.app = app
        self._setup_theme()

    def _setup_theme(self):
        """Create the Gradio theme used by ``create_interface``."""
        self.theme = gr.themes.Default(
            primary_hue="emerald",
            secondary_hue="teal",
            font=[gr.themes.GoogleFont("Fira Code"), "Arial", "sans-serif"]
        )

    def _file_upload_tab(self):
        """Build the upload tab and wire its buttons; returns the file input."""
        with gr.Tab("π€ Upload & Process"):
            with gr.Row():
                file_input = gr.File(label="Drag files here", file_count="multiple")
                stats_output = gr.JSON(label="Processing Stats")
            with gr.Row():
                process_btn = gr.Button("β‘ Process Files", variant="primary")
                clear_btn = gr.Button("π§Ή Clear Cache")
            file_output = gr.Dataframe(label="File Contents Preview")
            process_btn.click(
                self.process_file,
                inputs=file_input,
                outputs=[stats_output, file_output]
            )
            # One value per output component: a single None does not satisfy
            # a three-component output list.
            clear_btn.click(
                lambda: (None, None, None),
                outputs=[file_input, stats_output, file_output]
            )
        return file_input

    def _data_explorer_tab(self):
        """Build the explorer tab; only the refresh button is wired up."""
        with gr.Tab("π Data Explorer"):
            with gr.Row():
                refresh_btn = gr.Button("π Refresh Data", variant="secondary")
                search_bar = gr.Textbox(placeholder="Search across all data...")
            with gr.Tabs():
                with gr.Tab("Database View"):
                    files_table = gr.Dataframe(label="Stored Files")
                    metadata_table = gr.Dataframe(label="File Metadata")
                    chunks_table = gr.Dataframe(label="Text Chunks")
                with gr.Tab("Analytics View"):
                    stats_plot = gr.Plot(label="Data Distribution")
                    correlations = gr.Matrix(label="Data Correlations")
            refresh_btn.click(
                self.refresh_data,
                outputs=[files_table, metadata_table, chunks_table]
            )

    def _command_interface_tab(self):
        """Build the command console tab and wire the execute button."""
        with gr.Tab("π» Command Console"):
            cmd_input = gr.Textbox(
                placeholder="Enter data command...",
                lines=3,
                max_lines=10
            )
            with gr.Row():
                execute_btn = gr.Button("π Execute", variant="primary")
                cmd_history_btn = gr.Button("π History")
            cmd_output = gr.JSON(label="Command Results")
            cmd_explain = gr.Markdown("### Command Explanation")
            execute_btn.click(
                self.execute_command,
                inputs=cmd_input,
                outputs=[cmd_output, cmd_explain]
            )

    def create_interface(self):
        """Assemble all tabs into a ``gr.Blocks`` app and return it."""
        with gr.Blocks(theme=self.theme, title="Data Deity") as interface:
            gr.Markdown("# π§ Data Deity - Ultimate Data Omnipotence Interface")
            with gr.Tabs():
                self._file_upload_tab()
                self._data_explorer_tab()
                self._command_interface_tab()
        return interface

    @staticmethod
    def _upload_path(file) -> str:
        """Return the filesystem path of an upload (path or object with ``name``)."""
        if isinstance(file, (str, os.PathLike)):
            return os.fspath(file)
        return str(file.name)

    @classmethod
    def _upload_size(cls, file) -> int:
        """Return the upload's size in bytes.

        Uses a ``size`` attribute when the object has one, otherwise the size
        of the file on disk.
        """
        size = getattr(file, 'size', None)
        if size is not None:
            return size
        return os.path.getsize(cls._upload_path(file))

    def process_file(self, files):
        """Run the application pipeline on each upload.

        Args:
            files: A list of uploads, a single upload, or ``None``.

        Returns:
            ``(stats, preview)``: a stats dict (``total_files``,
            ``total_chunks``, ``average_size`` in MB) and a DataFrame with one
            row per file. On any failure -- including no files -- the stats
            dict is ``{"error": <message>}`` and the DataFrame is empty.
        """
        if not files:
            return {"error": "No files provided"}, pd.DataFrame()
        if not isinstance(files, (list, tuple)):
            files = [files]
        try:
            processed_files = []
            total_bytes = 0
            for file in files:
                self.app.run(file)
                name = os.path.basename(self._upload_path(file))
                total_bytes += self._upload_size(file)
                processed_files.append({
                    "filename": name,
                    "chunks": self.app.processed_data['chunk_count'],
                    "status": "processed",
                    "timestamp": datetime.datetime.now().isoformat()
                })
            average_mb = total_bytes / len(processed_files) / 1024 / 1024
            stats = {
                "total_files": len(processed_files),
                "total_chunks": sum(f['chunks'] for f in processed_files),
                "average_size": f"{average_mb:.2f}MB"
            }
            names = [f["filename"] for f in processed_files]
            preview = pd.DataFrame({
                "File": names,
                "Type": [n.rsplit('.', 1)[-1] if '.' in n else 'unknown' for n in names],
                "Status": ["β Processed"] * len(names)
            })
            return stats, preview
        except Exception as e:
            logging.exception("Error processing uploaded files")
            return {"error": str(e)}, pd.DataFrame()

    def refresh_data(self):
        """Load the ``files``, ``metadata`` and ``chunks`` tables as DataFrames.

        Returns:
            Three DataFrames in that order; three empty ones if a query fails
            (the failure is logged).
        """
        try:
            cursor = self.app.db_manager.cursor
            files = cursor.execute("SELECT * FROM files").fetchall()
            metadata = cursor.execute("SELECT * FROM metadata").fetchall()
            chunks = cursor.execute("SELECT * FROM chunks").fetchall()
            files_df = pd.DataFrame(files, columns=["ID", "Filename", "Original", "Path", "Size", "Type", "Uploaded"])
            metadata_df = pd.DataFrame(metadata, columns=["ID", "File ID", "Key", "Value"])
            chunks_df = pd.DataFrame(chunks, columns=["ID", "File ID", "Index", "Text", "Size"])
            return files_df, metadata_df, chunks_df
        except Exception:
            logging.exception("Error refreshing data tables")
            return pd.DataFrame(), pd.DataFrame(), pd.DataFrame()

    def execute_command(self, command):
        """Interpret a console command (``list files`` or ``search <term>``).

        Keywords are matched case-insensitively. For ``search``, everything
        after the first occurrence of the keyword is the term, and it is
        matched literally (``%`` and ``_`` are not wildcards).

        Returns:
            ``(result, explanation)``: a JSON-serialisable dict and a
            Markdown string.
        """
        import re  # local import: not part of the file's top-level imports

        try:
            text = (command or "").strip()
            cursor = self.app.db_manager.cursor
            if "list files" in text.lower():
                files = cursor.execute("SELECT filename, file_type, upload_date FROM files").fetchall()
                return {"result": files}, "### File Listing Command\nRetrieved all stored files from database."
            # Split with the same case-insensitivity used to detect the
            # keyword, and only once so the term may itself contain "search".
            parts = re.split("search", text, maxsplit=1, flags=re.IGNORECASE)
            if len(parts) == 2:
                term = parts[1].strip()
                if not term:
                    return {"error": "Search term is required"}, "### Missing Search Term\nUse 'search <term>'"
                escaped = term.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
                results = cursor.execute(
                    "SELECT chunk_text FROM chunks WHERE chunk_text LIKE ? ESCAPE '\\'",
                    (f"%{escaped}%",)
                ).fetchall()
                return {"matches": [r[0] for r in results]}, f"### Search Results\nFound {len(results)} matches for '{term}'"
            return {"error": "Command not recognized"}, "### Unrecognized Command\nTry 'list files' or 'search <term>'"
        except Exception as e:
            logging.exception("Error executing command")
            return {"error": str(e)}, "### Command Execution Failed"
| # --- Main Execution --- | |
if __name__ == "__main__":
    # Script entry point: configure logging, build the app, serve the UI,
    # and always release the database connection on the way out.
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s'
    )
    # Bound before the try so the finally clause can tell whether
    # Application() got far enough to have a connection to close; otherwise a
    # constructor failure would be masked by a NameError.
    app = None
    try:
        app = Application()
        interface = DataDeityInterface(app)
        interface.create_interface().launch(
            server_name="0.0.0.0",
            server_port=7860,
            share=True
        )
    except KeyboardInterrupt:
        logging.info("\nApplication shutdown requested")
    finally:
        if app is not None:
            app.db_manager.close()