# Dbgod / 2app.py
# google-labs-jules[bot]
# Fix SyntaxError in app.py and 2app.py
# f4c4bbd
# Raw
# History Blame
# 16.9 kB
import platform
import os
import sqlite3
import uuid
import datetime
import shutil
import traceback
import logging
from pathlib import Path
from abc import ABC, abstractmethod
from typing import Dict, Any, List
import gradio as gr
import pandas as pd
# --- Base Classes ---
class Interface(ABC):
    """Abstract base for components that are started through ``launch()``.

    ``launch`` is declared abstract, so a subclass can only be instantiated
    once it provides its own implementation.
    """

    @abstractmethod
    def launch(self):
        """Start the component. Concrete subclasses must override this."""
        pass
class Command(ABC):
    """Abstract base for an executable command object.

    ``execute`` is declared abstract, so a subclass can only be instantiated
    once it provides its own implementation.
    """

    @abstractmethod
    def execute(self):
        """Run the command. Concrete subclasses must override this."""
        pass
# --- Database Manager Implementation ---
class DatabaseManager:
    """Handles all database operations including creation, connection, and CRUD operations.

    A single SQLite connection (``self.conn``) and a single cursor
    (``self.cursor``) are opened by the constructor and reused by every
    method. Each write method commits on success; on ``sqlite3.Error`` it
    rolls back, logs the problem and (except for ``log_error``) re-raises.
    """

    def __init__(self, db_path: str = None):
        """Open the database (creating it if needed) and ensure the schema exists.

        Args:
            db_path: Path of the SQLite database. When ``None``, a per-user
                default location is chosen from ``platform.system()`` and the
                containing directory is created.

        Raises:
            sqlite3.Error: If connecting or creating the tables fails.
        """
        if db_path is None:
            system = platform.system()
            home = os.path.expanduser('~')
            if system == 'Windows':
                # APPDATA may be absent from the environment; fall back to the
                # home directory rather than failing with KeyError.
                base_dir = os.path.join(os.environ.get('APPDATA', home), 'FileStorageApp')
            elif system == 'Darwin':
                base_dir = os.path.join(home, 'Library', 'Application Support', 'FileStorageApp')
            else:
                base_dir = os.path.join(home, '.filestorage')
            os.makedirs(base_dir, exist_ok=True)
            self.db_path = os.path.join(base_dir, 'file_storage.db')
        else:
            self.db_path = db_path
        self.conn = None
        self.cursor = None
        self.connect()
        self.create_tables()

    def connect(self) -> None:
        """Establish a connection to the SQLite database.

        Raises:
            sqlite3.Error: If the connection cannot be opened.
        """
        try:
            # By default sqlite3 refuses to use a connection from any thread
            # other than the one that created it. This object is created once
            # and then shared, so that check is disabled here.
            # NOTE(review): the shared cursor is not protected by a lock;
            # confirm callers do not issue overlapping statements.
            self.conn = sqlite3.connect(self.db_path, check_same_thread=False)
            self.conn.execute("PRAGMA foreign_keys = ON")
            self.cursor = self.conn.cursor()
        except sqlite3.Error as e:
            logging.error(f"Database connection error: {e}")
            raise

    def create_tables(self) -> None:
        """Create necessary tables if they don't exist.

        Raises:
            sqlite3.Error: If any ``CREATE TABLE`` statement fails.
        """
        tables = [
            '''CREATE TABLE IF NOT EXISTS files (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                filename TEXT NOT NULL,
                original_filename TEXT NOT NULL,
                file_path TEXT NOT NULL,
                file_size INTEGER NOT NULL,
                file_type TEXT,
                upload_date DATETIME DEFAULT CURRENT_TIMESTAMP
            )''',
            '''CREATE TABLE IF NOT EXISTS metadata (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                file_id INTEGER NOT NULL,
                key TEXT NOT NULL,
                value TEXT,
                FOREIGN KEY (file_id) REFERENCES files (id) ON DELETE CASCADE
            )''',
            '''CREATE TABLE IF NOT EXISTS chunks (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                file_id INTEGER NOT NULL,
                chunk_index INTEGER NOT NULL,
                chunk_text TEXT NOT NULL,
                chunk_size INTEGER NOT NULL,
                FOREIGN KEY (file_id) REFERENCES files (id) ON DELETE CASCADE
            )''',
            # Error records are not tied to a file, so they get their own
            # table without a foreign key (see log_error).
            '''CREATE TABLE IF NOT EXISTS error_log (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                details TEXT NOT NULL,
                logged_at DATETIME DEFAULT CURRENT_TIMESTAMP
            )'''
        ]
        try:
            for table in tables:
                self.cursor.execute(table)
            self.conn.commit()
        except sqlite3.Error as e:
            self.conn.rollback()
            logging.error(f"Error creating tables: {e}")
            raise

    def insert_file(self, file_data: Dict[str, Any]) -> int:
        """Insert file information into the database.

        Args:
            file_data: Mapping with the keys ``filename``, ``original_filename``,
                ``file_path``, ``file_size`` and ``file_type``.

        Returns:
            The row id of the newly inserted ``files`` row.

        Raises:
            sqlite3.Error: If the insert fails (the transaction is rolled back).
        """
        try:
            self.cursor.execute('''
                INSERT INTO files (filename, original_filename, file_path, file_size, file_type)
                VALUES (?, ?, ?, ?, ?)
            ''', (file_data['filename'], file_data['original_filename'],
                  file_data['file_path'], file_data['file_size'], file_data['file_type']))
            self.conn.commit()
            return self.cursor.lastrowid
        except sqlite3.Error as e:
            self.conn.rollback()
            logging.error(f"Error inserting file: {e}")
            raise

    def insert_metadata(self, file_id: int, metadata: Dict[str, str]) -> None:
        """Insert metadata for a specific file.

        All key/value pairs are written in one transaction.

        Raises:
            sqlite3.Error: If any insert fails (the transaction is rolled back).
        """
        try:
            for key, value in metadata.items():
                self.cursor.execute('''
                    INSERT INTO metadata (file_id, key, value)
                    VALUES (?, ?, ?)
                ''', (file_id, key, value))
            self.conn.commit()
        except sqlite3.Error as e:
            self.conn.rollback()
            logging.error(f"Error inserting metadata: {e}")
            raise

    def insert_chunk(self, file_id: int, chunk_index: int, chunk_text: str) -> None:
        """Insert a text chunk into the database.

        ``chunk_size`` is stored as the number of whitespace-separated words
        in ``chunk_text``.

        Raises:
            sqlite3.Error: If the insert fails (the transaction is rolled back).
        """
        try:
            chunk_size = len(chunk_text.split())
            self.cursor.execute('''
                INSERT INTO chunks (file_id, chunk_index, chunk_text, chunk_size)
                VALUES (?, ?, ?, ?)
            ''', (file_id, chunk_index, chunk_text, chunk_size))
            self.conn.commit()
        except sqlite3.Error as e:
            self.conn.rollback()
            logging.error(f"Error inserting chunk: {e}")
            raise

    def log_error(self, error_data: Dict[str, str]) -> None:
        """Log errors to the database.

        The record is stored as ``str(error_data)`` in the ``error_log``
        table. It cannot go into ``metadata``: that table's ``file_id`` has a
        foreign key to ``files``, and with foreign keys enforced a placeholder
        id such as -1 is rejected, so nothing would ever be stored.

        This method is best-effort and never raises ``sqlite3.Error``; a
        failure is rolled back and reported through ``logging`` only.
        """
        try:
            self.cursor.execute(
                'INSERT INTO error_log (details) VALUES (?)',
                (str(error_data),)
            )
            self.conn.commit()
        except sqlite3.Error as e:
            try:
                self.conn.rollback()
            except sqlite3.Error:
                # Nothing more can be done if the connection itself is broken.
                pass
            logging.error(f"Error logging error: {e}")

    def close(self) -> None:
        """Close the database connection. Safe to call more than once."""
        if self.conn:
            self.conn.close()
            self.conn = None
            self.cursor = None
# --- File Processor Implementation ---
class FileProcessor:
    """Handles file uploads, storage, and metadata extraction."""

    def __init__(self, upload_folder: str = None):
        """Remember the upload directory and create it if it does not exist.

        Args:
            upload_folder: Directory that receives saved uploads. Defaults to
                ``FileUploads`` inside the user's home directory.
        """
        self.upload_folder = upload_folder or os.path.join(Path.home(), 'FileUploads')
        os.makedirs(self.upload_folder, exist_ok=True)

    def save_file(self, file: Any) -> Dict[str, Any]:
        """Save the uploaded file and extract metadata.

        Args:
            file: Upload object with a ``name`` attribute. If it also has a
                callable ``read`` its content is taken from there; otherwise
                ``name`` is treated as the path of a file to copy.

        Returns:
            A dict with the keys ``filename`` (stored name, prefixed with a
            random UUID), ``original_filename``, ``file_path``, ``file_size``
            (bytes on disk) and ``file_type`` (text after the last dot of the
            base name, or ``'unknown'``).

        Raises:
            Exception: Any error raised while reading or writing is logged
                and re-raised unchanged.
        """
        # Only the final path component is used: ``name`` may be a full path,
        # and embedding directory separators in the stored filename would
        # point outside (or into non-existent subdirectories of) the upload
        # folder.
        original_name = os.path.basename(str(file.name))
        filename = f"{uuid.uuid4()}_{original_name}"
        file_path = os.path.join(self.upload_folder, filename)
        try:
            reader = getattr(file, 'read', None)
            if callable(reader):
                data = reader()
                if isinstance(data, str):
                    # Text-mode handles yield str; the target is opened in
                    # binary mode, so encode first.
                    data = data.encode('utf-8')
                with open(file_path, "wb") as f:
                    f.write(data)
            else:
                # Path-only upload object: copy the file it points to.
                shutil.copyfile(file.name, file_path)
            return {
                'filename': filename,
                'original_filename': original_name,
                'file_path': file_path,
                'file_size': os.path.getsize(file_path),
                'file_type': original_name.rsplit('.', 1)[-1] if '.' in original_name else 'unknown'
            }
        except Exception as e:
            logging.error(f"Error saving file: {e}")
            raise

    def extract_content(self, file_path: str) -> str:
        """Extract text content from a file.

        The file is read as UTF-8 text in one go.

        Raises:
            Exception: Any error (missing file, undecodable bytes, ...) is
                logged and re-raised unchanged.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                return f.read()
        except Exception as e:
            logging.error(f"Error extracting content: {e}")
            raise
# --- Text Chunker Implementation ---
class TextChunker:
    """Splits text content into manageable chunks."""

    def __init__(self, chunk_size: int = 500, overlap: int = 50):
        """Configure the chunk length and overlap, both measured in words.

        Args:
            chunk_size: Maximum number of words per chunk; must be positive.
            overlap: Number of words shared between consecutive chunks; must
                satisfy ``0 <= overlap < chunk_size``.

        Raises:
            ValueError: If the values would prevent the chunk window from
                advancing (which would otherwise make ``chunk_text`` loop
                forever).
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive number of words")
        if not 0 <= overlap < chunk_size:
            raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")
        self.chunk_size = chunk_size
        self.overlap = overlap

    def chunk_text(self, text: str) -> List[str]:
        """Split text into chunks with overlap.

        The text is split on whitespace; each chunk is up to ``chunk_size``
        words joined by single spaces, and each chunk after the first starts
        ``overlap`` words before the previous chunk ended.

        Returns:
            The list of chunks, empty when ``text`` contains no words.
        """
        words = text.split()
        step = self.chunk_size - self.overlap
        chunks = []
        for start in range(0, len(words), step):
            end = start + self.chunk_size
            chunks.append(' '.join(words[start:end]))
            if end >= len(words):
                # This chunk already reaches the last word; another window
                # would only repeat overlap words that are covered here.
                break
        return chunks
# --- Command Handler Implementation ---
class CommandHandler:
    """Manages command execution.

    Commands are kept in ``self.commands``, a dict mapping a name to an
    object exposing ``execute()``.
    """

    def __init__(self):
        """Start with an empty command registry."""
        self.commands = {}

    def register_command(self, name: str, command: "Command") -> None:
        """Register ``command`` under ``name``, replacing any previous entry."""
        self.commands[name] = command

    def execute_command(self, name: str) -> bool:
        """Run the command registered under ``name``.

        Returns:
            ``True`` after the command's ``execute()`` has been called;
            ``False`` (with a warning logged) when no such command exists.
        """
        if name not in self.commands:
            logging.warning(f"Command '{name}' not found.")
            return False
        self.commands[name].execute()
        return True
# --- Main Application Implementation ---
class Application(Interface):
    """Core application class.

    Wires together the database manager, file processor and text chunker and
    exposes ``run`` as the per-file processing pipeline. The outcome of the
    most recent ``run`` call is kept in ``self.processed_data``.
    """

    def __init__(self):
        """Create the collaborating components with their default settings."""
        self.db_manager = DatabaseManager()
        self.file_processor = FileProcessor()
        self.text_chunker = TextChunker(chunk_size=512, overlap=50)
        self.command_handler = CommandHandler()
        self.processed_data = None

    def launch(self, **launch_kwargs):
        """Build the Gradio UI around this application and start serving it.

        This implements the abstract ``Interface.launch``; without an
        implementation the class cannot be instantiated at all.

        Args:
            **launch_kwargs: Forwarded unchanged to the ``launch`` call of
                the Blocks object returned by
                ``DataDeityInterface.create_interface``.

        Returns:
            Whatever that ``launch`` call returns.
        """
        ui = DataDeityInterface(self)
        return ui.create_interface().launch(**launch_kwargs)

    def run(self, uploaded_file: Any) -> None:
        """Main processing pipeline.

        Saves the upload, records it in the database, extracts and chunks its
        text, then stores metadata and the chunks (chunk indexes start at 1).
        On success ``self.processed_data`` holds ``filename``,
        ``chunk_count`` and ``status``.

        Raises:
            ValueError: If ``uploaded_file`` is falsy.
            Exception: Any failure in a pipeline step; it is first passed to
                ``_handle_error`` and then re-raised.
        """
        try:
            if not uploaded_file:
                raise ValueError("No file provided for processing")
            # Process file
            file_info = self.file_processor.save_file(uploaded_file)
            file_id = self.db_manager.insert_file(file_info)
            # Extract and chunk content
            raw_content = self.file_processor.extract_content(file_info['file_path'])
            chunks = self.text_chunker.chunk_text(raw_content)
            # Store chunks and metadata
            self.db_manager.insert_metadata(file_id, {
                'source': 'upload',
                'processed_at': datetime.datetime.now().isoformat()
            })
            for idx, chunk in enumerate(chunks, start=1):
                self.db_manager.insert_chunk(file_id, idx, chunk)
            self.processed_data = {
                'filename': uploaded_file.name,
                'chunk_count': len(chunks),
                'status': 'processed'
            }
        except Exception as e:
            self._handle_error(e)
            raise

    def _handle_error(self, error: Exception) -> None:
        """Centralized error handling.

        Records the error (timestamp, type, message, current traceback) via
        ``db_manager.log_error``, writes it to the process log, and marks
        ``self.processed_data`` as failed.
        """
        error_data = {
            'timestamp': datetime.datetime.now().isoformat(),
            'error_type': type(error).__name__,
            'message': str(error),
            'stack_trace': traceback.format_exc()
        }
        # Also emit to the normal log so the failure is visible even if the
        # database write does not succeed.
        logging.error(f"Processing failed: {error_data['error_type']}: {error_data['message']}")
        self.db_manager.log_error(error_data)
        self.processed_data = {'status': 'failed'}
# --- Gradio Interface Implementation ---
class DataDeityInterface:
    """Gradio front end for the application object passed to the constructor.

    The UI has three tabs (upload/process, data explorer, command console).
    The callbacks use ``app.run``, ``app.processed_data`` and
    ``app.db_manager.cursor``.

    NOTE(review): the nesting of the layout containers below was
    reconstructed from the statement order; confirm it matches the intended
    layout. ``search_bar``, ``stats_plot``, ``correlations`` and
    ``cmd_history_btn`` are created but not wired to any callback.
    """

    def __init__(self, app):
        """Store the application and prepare the theme."""
        self.app = app
        self._setup_theme()

    def _setup_theme(self):
        """Create the Gradio theme used by ``create_interface``."""
        self.theme = gr.themes.Default(
            primary_hue="emerald",
            secondary_hue="teal",
            font=[gr.themes.GoogleFont("Fira Code"), "Arial", "sans-serif"]
        )

    def _file_upload_tab(self):
        """Build the upload tab and return its file input component."""
        with gr.Tab("📤 Upload & Process"):
            with gr.Row():
                file_input = gr.File(label="Drag files here", file_count="multiple")
                stats_output = gr.JSON(label="Processing Stats")
            with gr.Row():
                process_btn = gr.Button("⚡ Process Files", variant="primary")
                clear_btn = gr.Button("🧹 Clear Cache")
            file_output = gr.Dataframe(label="File Contents Preview")
            process_btn.click(
                self.process_file,
                inputs=file_input,
                outputs=[stats_output, file_output]
            )
            # One return value per output component: a bare ``None`` would be
            # a single value for three outputs.
            clear_btn.click(
                lambda: (None, None, None),
                outputs=[file_input, stats_output, file_output]
            )
        return file_input

    def _data_explorer_tab(self):
        """Build the tab that shows the database tables."""
        with gr.Tab("🔍 Data Explorer"):
            with gr.Row():
                refresh_btn = gr.Button("🔄 Refresh Data", variant="secondary")
                search_bar = gr.Textbox(placeholder="Search across all data...")
            with gr.Tabs():
                with gr.Tab("Database View"):
                    files_table = gr.Dataframe(label="Stored Files")
                    metadata_table = gr.Dataframe(label="File Metadata")
                    chunks_table = gr.Dataframe(label="Text Chunks")
                with gr.Tab("Analytics View"):
                    stats_plot = gr.Plot(label="Data Distribution")
                    correlations = gr.Matrix(label="Data Correlations")
            refresh_btn.click(
                self.refresh_data,
                outputs=[files_table, metadata_table, chunks_table]
            )

    def _command_interface_tab(self):
        """Build the free-text command console tab."""
        with gr.Tab("💻 Command Console"):
            cmd_input = gr.Textbox(
                placeholder="Enter data command...",
                lines=3,
                max_lines=10
            )
            with gr.Row():
                execute_btn = gr.Button("🚀 Execute", variant="primary")
                cmd_history_btn = gr.Button("🕒 History")
            cmd_output = gr.JSON(label="Command Results")
            cmd_explain = gr.Markdown("### Command Explanation")
            execute_btn.click(
                self.execute_command,
                inputs=cmd_input,
                outputs=[cmd_output, cmd_explain]
            )

    def create_interface(self):
        """Assemble all tabs into a ``gr.Blocks`` object and return it."""
        with gr.Blocks(theme=self.theme, title="Data Deity") as interface:
            gr.Markdown("# 🧠 Data Deity - Ultimate Data Omnipotence Interface")
            with gr.Tabs():
                self._file_upload_tab()
                self._data_explorer_tab()
                self._command_interface_tab()
        return interface

    @staticmethod
    def _upload_size(file) -> int:
        """Return the size in bytes of an upload object, or 0 if unknown."""
        size = getattr(file, "size", None)
        if isinstance(size, int):
            return size
        try:
            return os.path.getsize(file.name)
        except (OSError, TypeError, AttributeError):
            return 0

    def process_file(self, files):
        """Run every uploaded file through ``app.run`` and summarise the result.

        Args:
            files: A list of upload objects (each with a ``name``), a single
                upload object, or ``None``/empty when nothing was uploaded.

        Returns:
            A ``(stats, preview)`` pair. ``stats`` is a dict with
            ``total_files``, ``total_chunks`` and ``average_size``; ``preview``
            is a DataFrame with one row per file. On any failure the pair is
            ``({"error": message}, empty DataFrame)``.
        """
        if not files:
            return {"error": "No files provided"}, pd.DataFrame()
        if not isinstance(files, (list, tuple)):
            files = [files]
        try:
            processed_files = []
            for file in files:
                self.app.run(file)
                processed_files.append({
                    "filename": file.name,
                    "chunks": self.app.processed_data['chunk_count'],
                    "status": "processed",
                    "timestamp": datetime.datetime.now().isoformat()
                })
            total_bytes = sum(self._upload_size(f) for f in files)
            stats = {
                "total_files": len(processed_files),
                "total_chunks": sum(f['chunks'] for f in processed_files),
                # Mean size per file, in MiB.
                "average_size": f"{total_bytes / len(files) / 1024 / 1024:.2f}MB"
            }
            # Show base names only; ``name`` may be a full (temporary) path.
            names = [os.path.basename(str(f.name)) for f in files]
            preview = pd.DataFrame({
                "File": names,
                "Type": [n.rsplit('.', 1)[-1] if '.' in n else 'unknown' for n in names],
                "Status": ["✅ Processed"] * len(names)
            })
            return stats, preview
        except Exception as e:
            logging.error(f"Error processing files: {e}")
            return {"error": str(e)}, pd.DataFrame()

    def refresh_data(self):
        """Load the ``files``, ``metadata`` and ``chunks`` tables as DataFrames.

        Returns:
            ``(files_df, metadata_df, chunks_df)``; three empty DataFrames if
            anything goes wrong (the error is logged).
        """
        try:
            cursor = self.app.db_manager.cursor
            files = cursor.execute("SELECT * FROM files").fetchall()
            metadata = cursor.execute("SELECT * FROM metadata").fetchall()
            chunks = cursor.execute("SELECT * FROM chunks").fetchall()
            files_df = pd.DataFrame(files, columns=["ID", "Filename", "Original", "Path", "Size", "Type", "Uploaded"])
            metadata_df = pd.DataFrame(metadata, columns=["ID", "File ID", "Key", "Value"])
            chunks_df = pd.DataFrame(chunks, columns=["ID", "File ID", "Index", "Text", "Size"])
            return files_df, metadata_df, chunks_df
        except Exception as e:
            logging.error(f"Error refreshing data: {e}")
            return pd.DataFrame(), pd.DataFrame(), pd.DataFrame()

    def execute_command(self, command):
        """Interpret a console command and return ``(result, explanation)``.

        Supported commands (matched case-insensitively):
            * ``list files`` - list filename, type and upload date of all files.
            * ``search <term>`` - return chunk texts containing ``<term>``.

        Returns:
            A JSON-serialisable result dict and a Markdown explanation. Any
            exception is reported as ``{"error": message}``.
        """
        try:
            text = (command or "").strip()
            lowered = text.lower()
            if "list files" in lowered:
                files = self.app.db_manager.cursor.execute("SELECT filename, file_type, upload_date FROM files").fetchall()
                return {"result": files}, "### File Listing Command\nRetrieved all stored files from database."
            elif "search" in lowered:
                # The keyword is located in the lowercased text so that
                # "Search foo" works, and everything after its FIRST
                # occurrence is the term, so terms such as "research" are
                # kept whole. The original-case text is sliced unless
                # lowercasing changed its length (rare Unicode cases).
                start = lowered.index("search") + len("search")
                source = text if len(lowered) == len(text) else lowered
                term = source[start:].strip()
                results = self.app.db_manager.cursor.execute(
                    "SELECT chunk_text FROM chunks WHERE chunk_text LIKE ?",
                    (f"%{term}%",)
                ).fetchall()
                return {"matches": [r[0] for r in results]}, f"### Search Results\nFound {len(results)} matches for '{term}'"
            else:
                return {"error": "Command not recognized"}, "### Unrecognized Command\nTry 'list files' or 'search <term>'"
        except Exception as e:
            return {"error": str(e)}, "### Command Execution Failed"
# --- Main Execution ---
if __name__ == "__main__":
    # Script entry point: configure logging, build the application and its
    # Gradio UI, serve it, and always release the database connection.
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s'
    )
    # Bound before the try block so the cleanup below cannot hit an unbound
    # name (and hide the real error) when Application() itself fails.
    app = None
    try:
        app = Application()
        interface = DataDeityInterface(app)
        # NOTE(review): binds to all network interfaces and requests a public
        # share link; confirm this exposure is intended.
        interface.create_interface().launch(
            server_name="0.0.0.0",
            server_port=7860,
            share=True
        )
    except KeyboardInterrupt:
        logging.info("\nApplication shutdown requested")
    finally:
        if app is not None:
            app.db_manager.close()