import bz2
import codecs
import csv
import gzip
import io
import json
import logging
import math
import mimetypes
import os
import random
import re
import shutil
import tarfile
import tempfile
import time
import xml.etree.ElementTree as ET
import zipfile
import zlib
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Optional, Union, Tuple, Any
from urllib.parse import urlparse, urljoin

import chardet
import gradio as gr
import numpy as np
import pandas as pd
import qrcode
import requests
import validators
from bs4 import BeautifulSoup
from cleantext import clean
from diskcache import Cache
from fake_useragent import UserAgent
from PIL import Image, ImageDraw, ImageFont
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Setup enhanced logging with more detailed formatting
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('app.log', encoding='utf-8')
    ]
)
logger = logging.getLogger(__name__)

# Conditional imports for document processing
try:
    from PyPDF2 import PdfReader
    PDF_SUPPORT = True
except ImportError:
    PDF_SUPPORT = False
    logger.warning("PyPDF2 not installed. PDF file processing will be limited.")

try:
    from docx import Document
    DOCX_SUPPORT = True
except ImportError:
    DOCX_SUPPORT = False
    logger.warning("python-docx not installed. DOCX file processing will be limited.")

try:
    from pyth.plugins.plaintext.writer import PlaintextWriter
    from pyth.plugins.rtf15.reader import Rtf15Reader
    RTF_SUPPORT = True
except ImportError:
    RTF_SUPPORT = False
    logger.warning("pyth not installed. RTF file processing will be limited.")

try:
    from odf.opendocument import OpenDocumentText
    from odf import text as odftext
    ODT_SUPPORT = True
except ImportError:
    ODT_SUPPORT = False
    logger.warning("odfpy not installed. ODT file processing will be limited.")
# Ensure output directories exist with modern structure
OUTPUTS_DIR = Path('output')
QR_CODES_DIR = OUTPUTS_DIR / 'qr_codes'
TEMP_DIR = OUTPUTS_DIR / 'temp'
for directory in [OUTPUTS_DIR, QR_CODES_DIR, TEMP_DIR]:
    directory.mkdir(parents=True, exist_ok=True)


class EnhancedURLProcessor:
    """Fetch URLs, extract their content by media type and crawl same-domain links.

    A single ``requests`` session with a retry policy is shared by all calls.
    Every public method reports failures through the returned dictionaries
    (``processing_notes`` / ``message``) instead of raising.
    """

    def __init__(self):
        self.session = requests.Session()
        # Retry idempotent requests on throttling and transient server errors.
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["HEAD", "GET"],
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)
        self.user_agent = UserAgent()
        self.timeout = 15  # seconds

    def validate_url(self, url: str) -> Dict[str, Any]:
        """Check that *url* is well formed and answers a HEAD request.

        Returns a dict with ``is_valid`` (bool), ``message`` (str) and
        ``details`` (an explanatory string on failure, or a dict holding the
        final URL, content type, server and size headers on success).
        """
        if not validators.url(url):
            return {'is_valid': False, 'message': 'Invalid URL format',
                    'details': 'URL must begin with http:// or https://'}
        parsed = urlparse(url)
        if not all([parsed.scheme, parsed.netloc]):
            return {'is_valid': False, 'message': 'Incomplete URL',
                    'details': 'Missing scheme or domain'}
        try:
            headers = {'User-Agent': self.user_agent.random}
            response = self.session.head(url, timeout=self.timeout, headers=headers,
                                         allow_redirects=True)
            response.raise_for_status()
            content_type = response.headers.get('Content-Type', '').split(';')[0].strip()
            return {
                'is_valid': True,
                'message': 'URL is valid and accessible',
                'details': {
                    'final_url': response.url,
                    'content_type': content_type,
                    'server': response.headers.get('Server', 'N/A'),
                    'size': response.headers.get('Content-Length', 'N/A'),
                },
            }
        except requests.exceptions.RequestException as e:
            return {'is_valid': False, 'message': 'URL not accessible', 'details': str(e)}
        except Exception as e:
            logger.error(f"Unexpected error during URL validation for {url}: {e}")
            return {'is_valid': False, 'message': 'Unexpected validation error',
                    'details': str(e)}

    @staticmethod
    def _is_known_codec(name: Optional[str]) -> bool:
        """Return True when *name* is a codec Python can decode with."""
        if not name:
            return False
        try:
            codecs.lookup(name)
        except LookupError:
            return False
        return True

    @staticmethod
    def _resolve_encoding(declared: Optional[str], body: bytes) -> str:
        """Pick the text encoding for a response body.

        The server-declared charset is honoured whenever it names a real
        codec.  ``ISO-8859-1`` is what ``requests`` reports when a text
        response carries no charset at all, so that value (like a missing
        one) is treated as "unknown" and the body is sniffed with chardet.
        UTF-8 is the final fallback.
        """
        if declared and declared.upper() != 'ISO-8859-1' \
                and EnhancedURLProcessor._is_known_codec(declared):
            return declared
        detected = chardet.detect(body)['encoding']
        if EnhancedURLProcessor._is_known_codec(detected):
            return detected
        return 'utf-8'

    def fetch_content(self, url: str, retry_count: int = 0) -> Optional[Dict[str, Any]]:
        """Download *url* and return raw text, response metadata and extracted data.

        ``retry_count`` is only used for the log message; actual retries are
        performed by the session's retry adapter.  On failure the returned
        dict has ``extracted_data`` set to ``None`` and the reason in
        ``processing_notes``.
        """
        # Pre-declared so the generic error handler can report partial results.
        raw_content: Optional[str] = None
        metadata: Optional[Dict[str, Any]] = None
        try:
            logger.info(f"Fetching content from URL: {url} (Attempt {retry_count + 1})")
            headers = {'User-Agent': self.user_agent.random}
            response = self.session.get(url, timeout=self.timeout, headers=headers,
                                        allow_redirects=True)
            response.raise_for_status()
            final_url = response.url
            content_type = response.headers.get('Content-Type', '').split(';')[0].strip()

            encoding = self._resolve_encoding(response.encoding, response.content)
            logger.debug(f"Using encoding: {encoding} for {url}")
            raw_content = response.content.decode(encoding, errors='replace')

            metadata = {
                'original_url': url,
                'final_url': final_url,
                'timestamp': datetime.now().isoformat(),
                'detected_encoding': encoding,
                'content_type': content_type,
                'content_length': len(response.content),
                'headers': dict(response.headers),
                'status_code': response.status_code,
            }
            processed_extraction = self._process_web_content(raw_content, content_type, final_url)
            return {
                'source': 'url',
                'url': url,
                'raw_content': raw_content,
                'metadata': metadata,
                'extracted_data': processed_extraction['data'],
                'processing_notes': processed_extraction['notes'],
            }
        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to fetch content from {url}: {e}")
            return {
                'source': 'url',
                'url': url,
                'raw_content': None,
                'metadata': {'original_url': url,
                             'timestamp': datetime.now().isoformat(),
                             'status_code': getattr(e.response, 'status_code', None)},
                'extracted_data': None,
                'processing_notes': [f"Failed to fetch content: {str(e)}"],
            }
        except Exception as e:
            logger.error(f"Unexpected error while fetching or processing URL {url}: {e}")
            if metadata is None:
                metadata = {'original_url': url,
                            'timestamp': datetime.now().isoformat(),
                            'status_code': None}
            return {
                'source': 'url',
                'url': url,
                'raw_content': raw_content,
                'metadata': metadata,
                'extracted_data': None,
                'processing_notes': [f"Unexpected processing error: {str(e)}"],
            }

    def _process_web_content(self, content: str, content_type: str, base_url: str) -> Dict[str, Any]:
        """Dispatch *content* to the extractor matching *content_type*.

        Returns ``{'data': ..., 'notes': [...]}``.  HTML yields a dict, JSON
        the parsed object, XML its re-serialised text; anything else (or any
        parse failure) yields the unmodified text.
        """
        lower_content_type = content_type.lower()
        notes: List[str] = []
        extracted_data: Any = None
        try:
            if 'text/html' in lower_content_type:
                logger.debug(f"Processing HTML content from {base_url}")
                extracted_data = self._process_html_content_enhanced(content, base_url)
                notes.append("Processed as HTML")
            elif 'application/json' in lower_content_type or 'text/json' in lower_content_type:
                logger.debug(f"Processing JSON content from {base_url}")
                try:
                    extracted_data = json.loads(content)
                    notes.append("Parsed as JSON")
                except json.JSONDecodeError as e:
                    extracted_data = content
                    notes.append(f"Failed to parse as JSON: {e}")
                    logger.warning(f"Failed to parse JSON from {base_url}: {e}")
            elif ('application/xml' in lower_content_type
                  or 'text/xml' in lower_content_type
                  or lower_content_type.endswith('+xml')):
                logger.debug(f"Processing XML content from {base_url}")
                try:
                    root = ET.fromstring(content)
                    extracted_data = ET.tostring(root, encoding='unicode', method='xml')
                    notes.append("Parsed as XML (text representation)")
                except ET.ParseError as e:
                    extracted_data = content
                    notes.append(f"Failed to parse as XML: {e}")
                    logger.warning(f"Failed to parse XML from {base_url}: {e}")
            elif 'text/plain' in lower_content_type or 'text/' in lower_content_type:
                logger.debug(f"Processing Plain Text content from {base_url}")
                extracted_data = content
                notes.append("Processed as Plain Text")
            else:
                logger.debug(f"Unknown content type '{content_type}' from {base_url}. Storing raw content.")
                extracted_data = content
                notes.append(f"Unknown content type '{content_type}'. Stored raw text.")
        except Exception as e:
            logger.error(f"Unexpected error in _process_web_content for {base_url} ({content_type}): {e}")
            extracted_data = content
            notes.append(f"Unexpected processing error: {e}. Stored raw text.")
        return {'data': extracted_data, 'notes': notes}

    @staticmethod
    def _visible_text(soup) -> str:
        """Drop <script>/<style> from *soup* (in place) and return its non-blank lines."""
        for script_or_style in soup(["script", "style"]):
            script_or_style.extract()
        lines = (line.strip() for line in soup.get_text(separator='\n').splitlines())
        return '\n'.join(line for line in lines if line)

    def _process_html_content_enhanced(self, content: str, base_url: str) -> Dict[str, Any]:
        """Extract title, meta description, links, images and visible text from HTML.

        Relative link and image URLs are resolved against *base_url*; both
        lists are de-duplicated on the resolved URL.  If extraction fails the
        result carries a ``processing_error`` key and whatever text could be
        recovered.
        """
        extracted: Dict[str, Any] = {
            'title': None,
            'meta_description': None,
            'full_text': "",
            'links': [],
            'images': [],
            'media': [],
        }
        try:
            soup = BeautifulSoup(content, 'html.parser')
            if soup.title and soup.title.string:
                extracted['title'] = soup.title.string.strip()
            meta_desc = soup.find('meta', attrs={'name': 'description'})
            if meta_desc and meta_desc.get('content'):
                extracted['meta_description'] = meta_desc['content'].strip()

            unique_links = set()
            for a_tag in soup.find_all('a', href=True):
                href = a_tag['href'].strip()
                if not href or href.startswith(('#', 'mailto:', 'tel:', 'javascript:')):
                    continue
                text = a_tag.get_text().strip()
                try:
                    target = urljoin(base_url, href)
                except Exception:
                    # urljoin can reject malformed hrefs; keep the link only
                    # if it is a valid absolute URL by itself.
                    if not validators.url(href):
                        continue
                    target = href
                if target not in unique_links:
                    unique_links.add(target)
                    extracted['links'].append({'text': text, 'url': target})

            unique_images = set()
            for img_tag in soup.find_all('img', src=True):
                src = img_tag['src'].strip()
                if not src:
                    continue
                absolute_url = urljoin(base_url, src)
                # De-duplicate on the resolved URL so "a.png" and "./a.png"
                # are recognised as the same image.
                if absolute_url in unique_images:
                    continue
                unique_images.add(absolute_url)
                extracted['images'].append({'src': absolute_url,
                                            'alt': (img_tag.get('alt') or '').strip()})

            # Links and images are collected first; stripping script/style
            # afterwards lets one parse tree serve both purposes.
            extracted['full_text'] = self._visible_text(soup)
        except Exception as e:
            logger.error(f"Enhanced HTML processing error for {base_url}: {e}")
            extracted['processing_error'] = f"Enhanced HTML processing failed: {e}"
            try:
                extracted['full_text'] = self._visible_text(BeautifulSoup(content, 'html.parser'))
            except Exception as fallback_error:
                logger.error(f"Fallback text extraction failed for {base_url}: {fallback_error}")
        return extracted

    def fetch_content_with_depth(self, url: str, max_steps: int = 0) -> Dict[str, Any]:
        """Fetch *url* and follow same-domain links up to *max_steps* levels deep.

        *max_steps* must be an int in ``0..10``.  The result is a tree of
        dicts with keys ``url``, ``level``, ``fetch_result``,
        ``linked_extractions`` and ``processing_notes``.
        """
        if not isinstance(max_steps, int) or not (0 <= max_steps <= 10):
            logger.error(f"Invalid max_steps value: {max_steps}. Must be an integer between 0 and 10.")
            return {
                'url': url,
                'level': 0,
                'fetch_result': None,
                'linked_extractions': [],
                'processing_notes': [f"Invalid max_steps value: {max_steps}."],
            }
        validation_result = self.validate_url(url)
        if not validation_result['is_valid']:
            logger.error(f"Initial URL validation failed for {url}: {validation_result['message']}")
            return {
                'url': url,
                'level': 0,
                'fetch_result': None,
                'linked_extractions': [],
                'processing_notes': [f"Initial URL validation failed: {validation_result['message']}"],
            }
        visited_urls = set()
        return self._fetch_content_recursive(url, max_steps, current_step=0, visited_urls=visited_urls)

    def _fetch_content_recursive(self, url: str, max_steps: int, current_step: int,
                                 visited_urls: set) -> Dict[str, Any]:
        """Fetch one page and recurse into its same-domain links.

        *visited_urls* is shared across the whole crawl and holds URLs with
        any trailing slash removed, so each page is fetched at most once.
        """
        if current_step > max_steps:
            logger.debug(f"Depth limit ({max_steps}) reached for {url} at level {current_step}.")
            return {
                'url': url,
                'level': current_step,
                'fetch_result': None,
                'linked_extractions': [],
                'processing_notes': [f"Depth limit ({max_steps}) reached."],
            }
        normalized_url = url.rstrip('/')
        if normalized_url in visited_urls:
            logger.debug(f"Skipping already visited URL: {url} at level {current_step}.")
            return {
                'url': url,
                'level': current_step,
                'fetch_result': None,
                'linked_extractions': [],
                'processing_notes': ["URL already visited in this crawl."],
            }
        visited_urls.add(normalized_url)

        logger.info(f"Processing URL: {url} at level {current_step}/{max_steps}")
        fetch_result = self.fetch_content(url)
        linked_extractions: List[Dict[str, Any]] = []

        is_html = bool(
            fetch_result
            and fetch_result.get('extracted_data')
            and 'text/html' in fetch_result.get('metadata', {}).get('content_type', '').lower()
        )
        if is_html:
            links = fetch_result['extracted_data'].get('links', [])
            logger.info(f"Found {len(links)} potential links on {url} at level {current_step}.")
            if current_step < max_steps:
                base_domain = urlparse(url).netloc
                for link_info in links:
                    linked_url = link_info.get('url')
                    if not linked_url:
                        continue
                    linked_domain = urlparse(linked_url).netloc
                    if linked_domain and linked_domain != base_domain:
                        logger.debug(f"Skipping external link: {linked_url}")
                        continue
                    linked_result = self._fetch_content_recursive(
                        linked_url, max_steps, current_step + 1, visited_urls)
                    if linked_result:
                        linked_extractions.append(linked_result)

        current_notes = fetch_result.get('processing_notes', []) if fetch_result else ['Fetch failed.']
        level_note = f"Processed at level {current_step}"
        if level_note not in current_notes:
            current_notes.append(level_note)
        return {
            'url': url,
            'level': current_step,
            'fetch_result': fetch_result,
            'linked_extractions': linked_extractions,
            'processing_notes': current_notes,
        }
class EnhancedFileProcessor:
    """Extract structured data from uploaded files and archives, and chunk data for QR codes.

    Supported inputs are plain-text formats (JSON, XML, CSV and free text),
    documents (PDF, DOCX, RTF, ODT - each only when its optional library is
    installed) and zip / tar / gzip / bzip2 archives.  ``.7z`` and ``.rar``
    are recognised as archives but cannot be unpacked and yield no entries.
    """

    # Formats whose text has to be pulled out with a format-specific library.
    _DOCUMENT_EXTENSIONS = frozenset({'.pdf', '.docx', '.rtf', '.odt'})
    _MAX_CSV_PREVIEW_ROWS = 100
    _MAX_DOCUMENT_TEXT_CHARS = 10000
    # Upper bound for one QR payload and the share reserved for the
    # start/end tags plus the idx/tc/tl/hash fields around the data.
    _QR_PRACTICAL_CAPACITY_BYTES = 2900
    _QR_CHUNK_OVERHEAD_BYTES = 250

    def __init__(self, max_file_size: int = 5 * 1024 * 1024 * 1024):
        self.max_file_size = max_file_size
        self.supported_extensions = {
            '.txt', '.md', '.csv', '.json', '.xml', '.html', '.htm',
            '.log', '.yml', '.yaml', '.ini', '.conf', '.cfg',
            '.pdf', '.doc', '.docx', '.rtf', '.odt',
            '.zip', '.tar', '.gz', '.tgz', '.bz2', '.7z', '.rar',
        }
        self.archive_extensions = {'.zip', '.tar', '.gz', '.tgz', '.bz2', '.7z', '.rar'}

    # ------------------------------------------------------------------
    # Entry point
    # ------------------------------------------------------------------
    def process_file(self, file) -> List[Dict]:
        """Process an uploaded file object (anything with a ``name`` path attribute).

        Returns a list of entry dicts - one per processed file, several for
        an archive.  Problems are reported inside the entries'
        ``processing_notes``; the method does not raise.
        """
        if not file or not hasattr(file, 'name'):
            logger.warning("Received invalid file object.")
            return []

        dataset: List[Dict] = []
        file_path = Path(file.name)
        if not file_path.exists():
            logger.error(f"File path does not exist: {file_path}")
            return [{
                'source': 'file',
                'filename': file.name,
                'file_size': None,
                'extracted_data': None,
                'processing_notes': ['File path does not exist.'],
            }]

        file_size: Optional[int] = None
        try:
            file_size = file_path.stat().st_size
            if file_size > self.max_file_size:
                logger.warning(f"File '{file_path.name}' size ({file_size} bytes) exceeds maximum allowed size ({self.max_file_size} bytes).")
                return [{
                    'source': 'file',
                    'filename': file_path.name,
                    'file_size': file_size,
                    'extracted_data': None,
                    'processing_notes': ['File size exceeds limit.'],
                }]

            suffix = file_path.suffix.lower()
            with tempfile.TemporaryDirectory() as temp_dir:
                if suffix in self.archive_extensions:
                    dataset.extend(self._process_archive(file_path, Path(temp_dir)))
                elif suffix in self.supported_extensions:
                    dataset.extend(self._process_single_file(file_path))
                else:
                    logger.warning(f"Unsupported file type for processing: '{file_path.name}'. Attempting to read as plain text.")
                    mime_type = mimetypes.guess_type(file_path.name)[0] or 'unknown/unknown'
                    try:
                        raw_content = self._decode_bytes(file_path.read_bytes())
                        dataset.append({
                            'source': 'file',
                            'filename': file_path.name,
                            'file_size': file_size,
                            'mime_type': mime_type,
                            'extracted_data': {'plain_text': raw_content},
                            'processing_notes': ['Processed as plain text (unsupported extension).'],
                        })
                    except Exception as e:
                        logger.error(f"Error reading or processing unsupported file '{file_path.name}' as text: {e}")
                        dataset.append({
                            'source': 'file',
                            'filename': file_path.name,
                            'file_size': file_size,
                            'mime_type': mime_type,
                            'extracted_data': None,
                            'processing_notes': [f'Unsupported file type and failed to read as text: {e}'],
                        })
        except Exception as e:
            logger.error(f"Error processing file '{file_path.name}': {str(e)}")
            dataset.append({
                'source': 'file',
                'filename': file_path.name,
                'file_size': file_size,
                'extracted_data': None,
                'processing_notes': [f'Overall file processing error: {str(e)}'],
            })
        return dataset

    # ------------------------------------------------------------------
    # Single files
    # ------------------------------------------------------------------
    @staticmethod
    def _decode_bytes(content_bytes: bytes) -> str:
        """Decode file bytes to text.

        Strict UTF-8 (with or without BOM) is tried first because a clean
        UTF-8 decode is a far stronger signal than a statistical guess; only
        when that fails is chardet consulted, decoding with replacement.
        """
        try:
            return content_bytes.decode('utf-8-sig')
        except UnicodeDecodeError:
            pass
        guessed = chardet.detect(content_bytes)['encoding'] or 'utf-8'
        try:
            return content_bytes.decode(guessed, errors='replace')
        except LookupError:
            # chardet named a codec Python does not ship.
            return content_bytes.decode('utf-8', errors='replace')

    def _process_single_file(self, file_path: Path) -> List[Dict]:
        """Read one file and return a single-entry list describing its content.

        Document formats are handled by extension first; everything else is
        tried as JSON, then XML, then CSV, and finally stored as plain text.
        """
        filename = file_path.name
        file_size = file_path.stat().st_size
        mime_type = mimetypes.guess_type(str(file_path))[0] or 'unknown/unknown'
        file_extension = file_path.suffix.lower()
        logger.info(f"Processing single file: '{filename}' ({mime_type}, {file_size} bytes)")

        raw_content: Optional[str] = None
        extracted_data: Any = None
        processing_notes: List[str] = []
        try:
            content_bytes = file_path.read_bytes()
            raw_content = self._decode_bytes(content_bytes)

            # Binary documents go first: once decoded as text they nearly
            # always contain commas and newlines and would otherwise be
            # mistaken for CSV by the content heuristics below.
            if file_extension in self._DOCUMENT_EXTENSIONS:
                extracted_data = self._extract_document(
                    file_path, file_extension, content_bytes, raw_content, processing_notes)

            if extracted_data is None:
                extracted_data, mime_type = self._parse_structured_text(
                    raw_content, mime_type, file_extension, filename, processing_notes)

            if extracted_data is None:
                extracted_data = {'plain_text': raw_content}
                processing_notes.append("Stored as plain text.")
                if mime_type in ['unknown/unknown', 'application/octet-stream']:
                    guessed_text_mime, _ = mimetypes.guess_type('dummy.txt')
                    if guessed_text_mime:
                        mime_type = guessed_text_mime
        except Exception as e:
            logger.error(f"Fatal error processing single file '{filename}': {e}")
            processing_notes.append(f"Fatal processing error: {e}")
            raw_content = None
            extracted_data = None

        still_there = file_path.exists()
        return [{
            'source': 'file',
            'filename': filename,
            'file_size': file_size,
            'mime_type': mime_type,
            'created': datetime.fromtimestamp(file_path.stat().st_ctime).isoformat() if still_there else None,
            'modified': datetime.fromtimestamp(file_path.stat().st_mtime).isoformat() if still_there else None,
            'raw_content': raw_content,
            'extracted_data': extracted_data,
            'processing_notes': processing_notes,
        }]

    def _extract_document(self, file_path: Path, file_extension: str, content_bytes: bytes,
                          raw_content: str, notes: List[str]) -> Optional[Dict[str, str]]:
        """Extract text from a PDF/DOCX/RTF/ODT file.

        Returns ``{'text': ...}`` (truncated to the preview limit) or ``None``
        when the required library is missing or extraction fails; failures
        are recorded in *notes*.
        """
        text: Optional[str] = None
        try:
            if file_extension == '.pdf' and PDF_SUPPORT:
                reader = PdfReader(io.BytesIO(content_bytes))
                text = "".join(page.extract_text() or "" for page in reader.pages)
                notes.append("Extracted text from PDF.")
            elif file_extension == '.docx' and DOCX_SUPPORT:
                document = Document(io.BytesIO(content_bytes))
                text = "\n".join(paragraph.text for paragraph in document.paragraphs)
                notes.append("Extracted text from DOCX.")
            elif file_extension == '.rtf' and RTF_SUPPORT:
                try:
                    doc = Rtf15Reader.read(io.StringIO(raw_content))
                    text = PlaintextWriter.write(doc).getvalue()
                    notes.append("Extracted text from RTF.")
                except Exception as e:
                    notes.append(f"RTF extraction error: {e}")
                    logger.warning(f"Failed to extract RTF text from '{file_path.name}': {e}")
            elif file_extension == '.odt' and ODT_SUPPORT:
                # OpenDocumentText() only creates a new empty document;
                # an existing file has to be opened with load().
                from odf import teletype
                from odf.opendocument import load as load_odf
                odt_document = load_odf(str(file_path))
                paragraphs = odt_document.getElementsByType(odftext.P)
                text = "\n".join(teletype.extractText(p) for p in paragraphs)
                notes.append("Extracted text from ODT.")
        except Exception as e:
            notes.append(f"Error during document text extraction: {e}")
            logger.warning(f"Error during document text extraction for '{file_path.name}': {e}")
            return None

        if text is None:
            return None
        limit = self._MAX_DOCUMENT_TEXT_CHARS
        data = {'text': text[:limit]}
        if len(text) > limit:
            data['text'] += "..."
            notes.append("Extracted text truncated.")
        return data

    def _parse_structured_text(self, raw_content: str, mime_type: str, file_extension: str,
                               filename: str, notes: List[str]) -> Tuple[Any, str]:
        """Try JSON, then XML, then CSV on *raw_content*.

        Returns ``(extracted_data, mime_type)``; ``extracted_data`` is
        ``None`` when no parser accepted the text, and *mime_type* is
        corrected when the content turned out to differ from the extension.
        """
        stripped = raw_content.strip()

        # ---- JSON ----
        is_explicit_json = mime_type == 'application/json' or file_extension == '.json'
        if is_explicit_json or stripped.startswith(('{', '[')):
            try:
                data = json.loads(raw_content)
            except json.JSONDecodeError as e:
                notes.append(f"Failed to parse as JSON: {e}.")
                if is_explicit_json:
                    logger.error(f"Explicit JSON file '{filename}' has invalid format: {e}")
            else:
                notes.append("Parsed as JSON.")
                if not is_explicit_json:
                    notes.append("Note: Content looked like JSON despite extension/mime.")
                    logger.warning(f"File '{filename}' identified as JSON content despite extension/mime.")
                mime_type = 'application/json'
                if data is not None:
                    return data, mime_type

        # ---- XML ----
        is_explicit_xml = (mime_type in ('application/xml', 'text/xml')
                           or mime_type.endswith('+xml')
                           or file_extension in ('.xml', '.xsd'))
        looks_like_xml = stripped.startswith('<') and stripped.endswith('>')
        if is_explicit_xml or looks_like_xml:
            try:
                root = ET.fromstring(raw_content)
            except ET.ParseError as e:
                notes.append(f"Failed to parse as XML: {e}.")
                if is_explicit_xml:
                    logger.error(f"Explicit XML file '{filename}' has invalid format: {e}")
            else:
                notes.append("Parsed as XML (text representation).")
                if not is_explicit_xml:
                    notes.append("Note: Content looked like XML despite extension/mime.")
                if 'xml' not in mime_type:
                    mime_type = 'application/xml'
                return ET.tostring(root, encoding='unicode', method='xml'), mime_type

        # ---- CSV ----
        is_explicit_csv = mime_type == 'text/csv' or file_extension == '.csv'
        looks_like_csv = ((',' in raw_content or ';' in raw_content)
                          and ('\n' in raw_content or len(raw_content.splitlines()) > 1))
        if is_explicit_csv or looks_like_csv:
            try:
                data = self._parse_csv(raw_content, is_explicit_csv, filename, notes)
            except Exception as e:
                notes.append(f"Failed to parse as CSV: {e}.")
                logger.warning(f"Failed to parse CSV from '{filename}': {e}")
            else:
                if data is not None:
                    return data, 'text/csv'

        return None, mime_type

    def _parse_csv(self, raw_content: str, is_explicit: bool, filename: str,
                   notes: List[str]) -> Any:
        """Parse CSV text into ``{'headers': [...], 'rows': [...]}``.

        For files that are CSV by extension/MIME the ``excel`` dialect is the
        fallback when sniffing fails.  Text that merely *looks* like CSV is
        only accepted when the sniffer recognises a dialect and the header
        has at least two columns; otherwise ``None`` is returned so the
        caller stores it as plain text.
        """
        dialect: Any = None
        sample = '\n'.join(raw_content.splitlines()[:10])
        if sample:
            try:
                # sniff() returns a Dialect subclass that csv.reader accepts
                # directly; it carries no usable name attribute.
                dialect = csv.Sniffer().sniff(sample)
                logger.debug(f"Sniffer detected CSV delimiter {dialect.delimiter!r} for '{filename}'")
            except csv.Error:
                logger.debug(f"Sniffer failed to detect dialect for '{filename}', using 'excel'.")
        if dialect is None:
            if not is_explicit:
                return None
            dialect = 'excel'

        rows = list(csv.reader(io.StringIO(raw_content, newline=''), dialect=dialect))
        if not rows:
            notes.append("Parsed as empty CSV.")
            return "Empty CSV"
        if not is_explicit and len(rows[0]) < 2:
            return None

        preview = self._MAX_CSV_PREVIEW_ROWS
        data = {
            'headers': rows[0] if rows[0] else None,
            'rows': rows[1:preview + 1],
        }
        if len(rows) > preview + 1:
            notes.append(f"CSV data rows truncated to {preview}.")
        notes.append("Parsed as CSV.")
        if not is_explicit:
            notes.append("Note: Content looked like CSV despite extension/mime.")
        return data

    # ------------------------------------------------------------------
    # Archives
    # ------------------------------------------------------------------
    def _process_extracted(self, extracted_path: Path, extract_to: Path) -> List[Dict]:
        """Process one file unpacked from an archive (recursing into nested archives)."""
        suffix = extracted_path.suffix.lower()
        if suffix not in self.supported_extensions:
            return []
        if suffix in self.archive_extensions:
            return self._process_archive(extracted_path, extract_to)
        return self._process_single_file(extracted_path)

    @staticmethod
    def _is_tar_archive(archive_path: Path) -> bool:
        """Return True for tarballs, including compressed ones.

        ``Path.suffix`` only reports the last extension, so ``x.tar.gz`` is
        recognised by full name; a bare ``.gz``/``.bz2`` is probed by content
        to tell a compressed tarball from a single compressed file.
        """
        name = archive_path.name.lower()
        if name.endswith(('.tar', '.tgz', '.tar.gz', '.tar.bz2')):
            return True
        if name.endswith(('.gz', '.bz2')):
            try:
                return tarfile.is_tarfile(str(archive_path))
            except OSError:
                return False
        return False

    def _process_archive(self, archive_path: Path, extract_to: Path) -> List[Dict]:
        """Unpack *archive_path* member by member into *extract_to* and process each file.

        Members are written under their base name only (directory parts are
        discarded, which also blocks path traversal) and removed again once
        processed.  Errors are logged; the entries gathered so far are
        returned.
        """
        dataset: List[Dict] = []
        suffix = archive_path.suffix.lower()
        logger.info(f"Processing archive: '{archive_path.name}'")
        try:
            if suffix == '.zip':
                if not zipfile.is_zipfile(archive_path):
                    logger.error(f"'{archive_path.name}' is not a valid zip file.")
                    return dataset
                with zipfile.ZipFile(archive_path, 'r') as zip_ref:
                    for file_info in zip_ref.infolist():
                        member_name = Path(file_info.filename).name
                        if file_info.is_dir() or file_info.file_size == 0 or not member_name:
                            continue
                        target = extract_to / member_name
                        try:
                            with zip_ref.open(file_info) as src, open(target, 'wb') as dst:
                                shutil.copyfileobj(src, dst)
                            dataset.extend(self._process_extracted(target, extract_to))
                        except Exception as e:
                            logger.warning(f"Error extracting/processing file '{file_info.filename}' from zip: {e}")
                        finally:
                            target.unlink(missing_ok=True)
            elif self._is_tar_archive(archive_path):
                # 'r:*' lets tarfile detect the compression transparently.
                with tarfile.open(archive_path, 'r:*') as tar_ref:
                    for member in tar_ref.getmembers():
                        member_name = Path(member.name).name
                        if not member.isfile() or not member_name:
                            continue
                        target = extract_to / member_name
                        try:
                            src = tar_ref.extractfile(member)
                            if src is None:
                                continue
                            with src, open(target, 'wb') as dst:
                                shutil.copyfileobj(src, dst)
                            dataset.extend(self._process_extracted(target, extract_to))
                        except Exception as e:
                            logger.warning(f"Error processing {member.name} in tar: {e}")
                        finally:
                            target.unlink(missing_ok=True)
            elif suffix in ('.gz', '.bz2'):
                # A single compressed file: "notes.txt.gz" unpacks to "notes.txt".
                opener = gzip.open if suffix == '.gz' else bz2.open
                target = extract_to / archive_path.stem
                try:
                    with opener(archive_path, 'rb') as src, open(target, 'wb') as dst:
                        shutil.copyfileobj(src, dst)
                    dataset.extend(self._process_extracted(target, extract_to))
                except Exception as e:
                    logger.error(f"Error processing standalone compressed file '{archive_path.name}': {e}")
                finally:
                    target.unlink(missing_ok=True)
            else:
                logger.warning(f"Archive format '{suffix}' cannot be unpacked: '{archive_path.name}'")
        except Exception as e:
            logger.error(f"Archive processing failed: {e}")
        return dataset

    # ------------------------------------------------------------------
    # QR chunking
    # ------------------------------------------------------------------
    @staticmethod
    def _align_to_char_boundary(buf: bytes, end: int, start: int) -> int:
        """Move *end* back until it no longer points at a UTF-8 continuation byte."""
        while start < end < len(buf) and (buf[end] & 0xC0) == 0x80:
            end -= 1
        return end

    def chunk_data(self, data: Union[Dict, List], max_size: int = 2953) -> List[str]:
        """Split *data* (serialised as compact JSON) into QR-sized, self-describing chunks.

        Each returned string has the form ``{startN}<json>{endN}`` where
        ``<json>`` is an object with ``idx`` (1-based position), ``tc``
        (total chunks), ``tl`` (total payload length in bytes), ``hash``
        (CRC-32 of the chunk's UTF-8 bytes) and ``data`` (the slice itself).
        Concatenating all ``data`` values in ``idx`` order reproduces the
        JSON text exactly.

        Slices never cut a multi-byte character, and their size is measured
        *after* JSON escaping so the finished string stays within
        ``min(max_size, 2900)`` bytes.  Returns ``[]`` on any error.
        """
        try:
            json_str = json.dumps(data, ensure_ascii=False, separators=(',', ':'))
            json_bytes = json_str.encode('utf-8')
            total_bytes_length = len(json_bytes)
            if total_bytes_length == 0:
                return []

            capacity = min(max_size, self._QR_PRACTICAL_CAPACITY_BYTES)
            payload_budget = capacity - self._QR_CHUNK_OVERHEAD_BYTES
            if payload_budget <= 0:
                logger.error(f"Effective payload size is zero or negative. QR size ({capacity}) is too small for metadata overhead ({self._QR_CHUNK_OVERHEAD_BYTES}).")
                return []

            # Pass 1: cut the byte stream into slices whose *escaped* JSON
            # form fits the payload budget.
            pieces: List[bytes] = []
            pos = 0
            while pos < total_bytes_length:
                end = self._align_to_char_boundary(
                    json_bytes, min(pos + payload_budget, total_bytes_length), pos)
                while end > pos:
                    escaped = json.dumps(json_bytes[pos:end].decode('utf-8'), ensure_ascii=False)
                    excess = len(escaped.encode('utf-8')) - payload_budget
                    if excess <= 0:
                        break
                    # Dropping k raw bytes shrinks the escaped form by >= k.
                    end = self._align_to_char_boundary(json_bytes, end - excess, pos)
                if end <= pos:
                    logger.error(f"Payload budget of {payload_budget} bytes is too small to hold a single character.")
                    return []
                pieces.append(json_bytes[pos:end])
                pos = end

            # Pass 2: wrap every slice with its sequence metadata.
            num_chunks = len(pieces)
            chunks_for_qr: List[str] = []
            for i, piece in enumerate(pieces, start=1):
                chunk_dict = {
                    "idx": i,
                    "tc": num_chunks,
                    "tl": total_bytes_length,
                    # CRC-32 is stable across processes, unlike built-in hash().
                    "hash": zlib.crc32(piece) & 0xFFFFFFFF,
                    "data": piece.decode('utf-8'),
                }
                inner_json_string = json.dumps(chunk_dict, ensure_ascii=False, separators=(',', ':'))
                final_qr_string = f"{{start{i}}}{inner_json_string}{{end{i}}}"
                encoded_len = len(final_qr_string.encode('utf-8'))
                if encoded_len > capacity:
                    logger.warning(f"Chunk {i} exceeds estimated QR capacity. Actual: {encoded_len} bytes, Target Max: {capacity} bytes.")
                chunks_for_qr.append(final_qr_string)

            logger.info(f"Chunked data into {num_chunks} chunks for QR codes.")
            return chunks_for_qr
        except Exception as e:
            logger.error(f"Error chunking data: {e}")
            return []
def generate_stylish_qr(data: Union[str, Dict],
                        filename: str,
                        size: int = 10,
                        border: int = 4,
                        fill_color: str = "#000000",
                        back_color: str = "#FFFFFF") -> str:
    """Render *data* as a QR code image saved under ``QR_CODES_DIR / filename``.

    Args:
        data: Text to encode; a dict is serialised as compact JSON first.
        filename: File name (with extension) of the image to write.
        size: Pixel size of one QR module (``box_size``).
        border: Quiet-zone width in modules.
        fill_color: Colour of the dark modules.
        back_color: Background colour.

    Returns:
        The path of the written image as a string, or ``""`` when the
        payload does not fit any QR version or another error occurs.
    """
    try:
        if isinstance(data, dict):
            payload = json.dumps(data, ensure_ascii=False, separators=(',', ':'))
        else:
            payload = str(data)

        # Medium error correction is preferred, but it caps a version-40
        # symbol at 2331 bytes.  Larger payloads (chunk_data emits up to
        # ~2900 bytes) are retried with low error correction (2953 bytes).
        qr = None
        for level in (qrcode.constants.ERROR_CORRECT_M, qrcode.constants.ERROR_CORRECT_L):
            candidate = qrcode.QRCode(
                version=None,
                error_correction=level,
                box_size=size,
                border=border,
            )
            candidate.add_data(payload)
            try:
                candidate.make(fit=True)
            except (qrcode.exceptions.DataOverflowError, ValueError):
                # Depending on the library version an oversized payload
                # surfaces as DataOverflowError or as an invalid-version
                # ValueError.
                continue
            qr = candidate
            break
        if qr is None:
            logger.error(f"QR generation error: payload of {len(payload.encode('utf-8'))} bytes exceeds QR code capacity")
            return ""

        qr_image = qr.make_image(fill_color=fill_color, back_color=back_color)
        qr_image = qr_image.convert('RGBA')
        try:
            # Very faint left-to-right darkening (at most 5 % opacity).
            gradient = Image.new('RGBA', qr_image.size, (0, 0, 0, 0))
            draw = ImageDraw.Draw(gradient)
            for i in range(qr_image.width):
                alpha = int(255 * (i / qr_image.width) * 0.05)
                draw.line([(i, 0), (i, qr_image.height)], fill=(0, 0, 0, alpha))
            final_image = Image.alpha_composite(qr_image, gradient)
        except Exception as e:
            logger.warning(f"Failed to add gradient overlay to QR code: {e}. Using plain QR.")
            final_image = qr_image

        output_path = QR_CODES_DIR / filename
        final_image.save(output_path, quality=90)
        return str(output_path)
    except Exception as e:
        logger.error(f"QR generation error: {e}")
        return ""
def generate_qr_codes(data: Union[str, Dict, List], combined: bool = True) -> List[str]:
    """Chunk *data* and render every chunk as a QR code image.

    With ``combined=True`` the whole of *data* becomes one chunk sequence.
    Otherwise each item of a list gets its own sequence, and a dict or
    string is handled as a single item.  Returns the paths of the images
    that were written; ``[]`` when nothing could be generated.
    """
    if not isinstance(data, (list, dict, str)):
        logger.error("generate_qr_codes received data that is not a list, dict, or string.")
        return []

    def render_sequence(chunk_list, make_name):
        # Render one chunk sequence; chunks whose image failed are left out.
        written = []
        count = len(chunk_list)
        for position, chunk_text in enumerate(chunk_list, start=1):
            image_path = generate_stylish_qr(
                data=chunk_text,
                filename=make_name(position, count),
                fill_color="#1a365d",
                back_color="#ffffff",
            )
            if image_path:
                written.append(image_path)
        return written

    try:
        file_processor = EnhancedFileProcessor()
        paths = []
        if combined:
            chunks_of_combined_data = file_processor.chunk_data(data)
            if not chunks_of_combined_data:
                logger.warning("No chunks generated for combined data.")
                return []
            paths.extend(render_sequence(
                chunks_of_combined_data,
                lambda pos, count: f'combined_qr_{pos}_of_{count}_{int(time.time())}.png',
            ))
        elif isinstance(data, list):
            for item_number, item in enumerate(data, start=1):
                item_chunks = file_processor.chunk_data(item)
                if not item_chunks:
                    logger.warning(f"No chunks generated for item {item_number}.")
                    continue
                paths.extend(render_sequence(
                    item_chunks,
                    lambda pos, count, n=item_number:
                        f'item_{n}_chunk_{pos}_of_{count}_{int(time.time())}.png',
                ))
        else:
            # The type guard above leaves only dict and str here.
            single_item_chunks = file_processor.chunk_data(data)
            if not single_item_chunks:
                logger.warning("No chunks generated for single item.")
                return []
            paths.extend(render_sequence(
                single_item_chunks,
                lambda pos, count: f'single_item_chunk_{pos}_of_{count}_{int(time.time())}.png',
            ))
        logger.info(f"Generated {len(paths)} QR codes.")
        return paths
    except Exception as e:
        logger.error(f"An unexpected error occurred in generate_qr_codes: {e}")
        return []
def create_qr_zip(qr_paths: List[str]) -> Optional[str]:
    """Bundle the given QR code images into a timestamped zip under ``TEMP_DIR``.

    Paths that do not exist are skipped with a warning.  Returns the path
    of the archive as a string, or ``None`` when *qr_paths* is empty or the
    archive could not be written.
    """
    if not qr_paths:
        logger.warning("Attempted to create a zip archive, but no QR code paths were provided.")
        return None

    try:
        archive_path = TEMP_DIR / f"qr_code_collection_{int(time.time())}.zip"
        with zipfile.ZipFile(archive_path, 'w') as archive:
            for raw_path in qr_paths:
                image = Path(raw_path)
                if not image.exists():
                    logger.warning(f"QR code file not found, skipping: {raw_path}")
                    continue
                # Store under the bare file name so the zip has no folders.
                archive.write(image, arcname=image.name)
        logger.info(f"Successfully created QR code zip archive: {archive_path}")
        return str(archive_path)
    except Exception as e:
        logger.error(f"Failed to create QR code zip archive: {e}")
        return None
background: var(--accent-color) !important; transform: translateY(-2px); box-shadow: 0 8px 25px rgba(59,130,246,0.4); } /* Gallery */ .gallery { display: grid; grid-template-columns: repeat(auto-fit, minmax(180px, 1fr)); gap: 1.5rem; padding: 1rem; background: white; border-radius: 12px; border: 1px solid var(--border-color); } .gallery img { border-radius: 12px; border: 4px solid var(--border-color); transition: all 0.4s ease; box-shadow: 0 6px 20px rgba(0,0,0,0.1); } .gallery img:hover { transform: scale(1.08); border-color: var(--accent-color); box-shadow: 0 20px 40px rgba(59,130,246,0.5); } /* FULLSCREEN QR VIEWPORT - FIXED & WORKING */ #fullscreen-viewport-wrapper { transition: all 0.5s ease; } #fullscreen-viewport-wrapper.fullscreen-active { position: fixed !important; top: 0; left: 0; width: 100vw !important; height: 100vh !important; background: #0f172a !important; z-index: 99999 !important; padding: 4rem 2rem !important; overflow-y: auto; display: flex; flex-direction: column; align-items: center; } #fullscreen-viewport-wrapper.fullscreen-active .viewport-container { display: grid; grid-template-columns: repeat(auto-fit, minmax(320px, 1fr)); gap: 3rem; max-width: 1800px; margin: 2rem auto; } #fullscreen-viewport-wrapper.fullscreen-active img { max-width: 380px !important; max-height: 380px !important; border-radius: 20px; border: 6px solid var(--accent-color); box-shadow: 0 30px 60px rgba(59,130,246,0.7); } #fullscreen-viewport-wrapper.fullscreen-active .exit-button { position: fixed; top: 20px; right: 40px; background: #ef4444; color: white; padding: 1rem 2rem; border-radius: 50px; font-size: 1.2rem; z-index: 100000; cursor: pointer; } """ with gr.Blocks(title="Advanced Data Processor & QR Generator") as interface: # FIX: Ensure interface.head is initialized as a string before concatenation if interface.head is None: interface.head = "" interface.head += """ """ qr_code_paths = gr.State([]) chatbot_data = gr.State(None) gr.Markdown("""# Advanced Data 
Processing & QR Code Generator Transform your data into beautifully designed, sequenced QR codes.""") with gr.Row(): crawl_depth_slider = gr.Slider(label="Crawl Depth", minimum=0, maximum=10, value=0, step=1) with gr.Tab("URL Processing"): url_input = gr.Textbox(label="Enter URLs (comma or newline separated)", lines=5) with gr.Tab("File Input"): file_input = gr.File(label="Upload Files", file_count="multiple") with gr.Tab("JSON Input"): text_input = gr.TextArea(label="Direct JSON Input", lines=15) with gr.Row(): combine_data = gr.Checkbox(label="Combine all data into sequence", value=True) generate_qr_toggle = gr.Checkbox(label="Generate QR Codes", value=False) process_btn = gr.Button("Process & Generate QR", variant="primary") with gr.Row(): with gr.Column(scale=1): output_json = gr.JSON(label="Processed Data") with gr.Column(scale=1): output_gallery = gr.Gallery(label="Generated QR Codes", columns=None, height="auto", elem_classes=["gallery"]) download_qrs_btn = gr.Button("Download All QR Codes as ZIP") qr_zip_output = gr.File(label="Download QR Code ZIP") output_text = gr.Textbox(label="Processing Status", interactive=False, lines=8) with gr.Tab("QR Code Viewport") as viewport_tab: viewport_output = gr.HTML(label="QR Code Sequence Viewport") enabled_qr_codes = gr.State([]) def update_viewport(paths, enabled_states): if not paths: return "
No QR codes generated yet.
" html_content = '