#!/usr/bin/env python3
"""
Parallel Processing Performance Diagnostic Tool

Test various theories about why HuggingFace deployment is slow.

Each ``test_*`` function below prints one diagnostic section. They are probes
meant to be run via ``main()``, not unit tests (see ``_diagnostic``).
"""
import time
import threading
import multiprocessing
import concurrent.futures
import os
import sys
import torch
from pathlib import Path

try:
    import psutil
except ImportError:  # only the environment report uses psutil; degrade, don't crash
    psutil = None


# ---------------------------------------------------------------------------
# Worker functions.
#
# multiprocessing.Pool and ProcessPoolExecutor pickle the callable by reference
# (module + qualified name), so anything they run must be defined at module
# level -- a function nested inside another function cannot be pickled and the
# pool fails with "Can't pickle local object".
# ---------------------------------------------------------------------------

def _square(n):
    """Return ``n * n``; a trivially cheap task for measuring pool overhead."""
    return n * n


def _cpu_task(n):
    """Return the sum of ``i * i`` for ``i`` in ``range(n * 1000)``.

    Pure-Python, CPU-bound work: threads contend for the GIL on it, while
    separate processes can run it in parallel.
    """
    total = 0
    for i in range(n * 1000):
        total += i * i
    return total


def _describe_worker(worker_id):
    """Return a line identifying the process (PID) and thread (TID) running this call."""
    pid = os.getpid()
    tid = threading.get_ident()
    return f"Worker {worker_id}: PID={pid}, TID={tid}"


def _diagnostic(func):
    """Mark *func* so pytest does not collect it as a test.

    The probes are named ``test_*`` for readability, but they spawn process
    pools, sleep and print; they are not unit tests.
    """
    func.__test__ = False
    return func


@_diagnostic
def test_basic_multiprocessing():
    """Test 1: Basic multiprocessing capability"""
    print("=== TEST 1: Basic Multiprocessing ===")

    # Sequential
    start = time.perf_counter()
    results_seq = [_square(i) for i in range(100)]
    seq_time = time.perf_counter() - start
    print(f"Sequential: {seq_time:.3f}s")

    # Parallel. The task is trivial, so this timing is dominated by pool
    # start-up and inter-process communication; a large value points at
    # expensive process creation in this environment.
    start = time.perf_counter()
    with multiprocessing.Pool(processes=4) as pool:
        results_par = pool.map(_square, range(100))
    par_time = time.perf_counter() - start
    print(f"Parallel (4 workers): {par_time:.3f}s")
    print(f"Speedup: {seq_time/par_time:.2f}x")
    print()


@_diagnostic
def test_thread_vs_process():
    """Test 2: Threading vs Processing"""
    print("=== TEST 2: Threading vs Processing ===")

    tasks = [1000] * 8

    # Sequential
    start = time.perf_counter()
    seq_results = [_cpu_task(t) for t in tasks]
    seq_time = time.perf_counter() - start
    print(f"Sequential: {seq_time:.3f}s")

    # Threading
    start = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        thread_results = list(executor.map(_cpu_task, tasks))
    thread_time = time.perf_counter() - start
    print(f"ThreadPool: {thread_time:.3f}s, speedup: {seq_time/thread_time:.2f}x")

    # Processing
    start = time.perf_counter()
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        process_results = list(executor.map(_cpu_task, tasks))
    process_time = time.perf_counter() - start
    print(f"ProcessPool: {process_time:.3f}s, speedup: {seq_time/process_time:.2f}x")
    print()


@_diagnostic
def test_gpu_access():
    """Test 3: GPU sharing capability"""
    print("=== TEST 3: GPU Access ===")

    if not torch.cuda.is_available():
        print("No CUDA available - skipping GPU test")
        print()
        return

    def gpu_task(worker_id):
        # Only ever run in this process (sequentially or on threads), so it can
        # stay nested -- it is never pickled.
        try:
            device = torch.device("cuda")
            # Create a small tensor operation
            x = torch.randn(1000, 1000, device=device)
            y = torch.randn(1000, 1000, device=device)
            # CUDA kernels launch asynchronously. Synchronize before starting the
            # clock (so setup is excluded) and before stopping it (so the matmuls
            # are included); otherwise only launch overhead is measured.
            # synchronize() waits for all work on the device, so under the
            # threaded run each worker's time also absorbs the other workers'
            # kernels -- i.e. it reflects GPU contention.
            torch.cuda.synchronize(device)
            start = time.perf_counter()
            for _ in range(10):
                torch.mm(x, y)
            torch.cuda.synchronize(device)
            duration = time.perf_counter() - start
            return f"Worker {worker_id}: {duration:.3f}s"
        except Exception as e:
            # Diagnostic boundary: report the failure instead of aborting the run.
            return f"Worker {worker_id}: ERROR - {e}"

    # Sequential GPU access
    start = time.perf_counter()
    seq_results = [gpu_task(i) for i in range(4)]
    seq_time = time.perf_counter() - start
    print("Sequential GPU:")
    for result in seq_results:
        print(f"  {result}")
    print(f"Total sequential time: {seq_time:.3f}s")

    # Parallel GPU access
    start = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        par_results = list(executor.map(gpu_task, range(4)))
    par_time = time.perf_counter() - start
    print("Parallel GPU:")
    for result in par_results:
        print(f"  {result}")
    print(f"Total parallel time: {par_time:.3f}s")
    print()


@_diagnostic
def test_model_loading():
    """Test 4: Model loading overhead"""
    print("=== TEST 4: Model Loading Simulation ===")

    # Simulate loading a heavy model
    def load_model():
        # Simulate model loading time
        time.sleep(0.5)  # 500ms loading time
        return {"model": "loaded", "size": "large"}

    def task_with_model_loading(worker_id):
        start = time.perf_counter()
        load_model()  # each worker loads its own copy of the model
        processing_time = 0.1  # Simulate 100ms processing
        time.sleep(processing_time)
        total_time = time.perf_counter() - start
        return f"Worker {worker_id}: {total_time:.3f}s"

    # Test with model loading per worker
    print("Each worker loads model:")
    start = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(task_with_model_loading, range(4)))
    total_time = time.perf_counter() - start

    for result in results:
        print(f"  {result}")
    print(f"Total time with per-worker loading: {total_time:.3f}s")

    # Compare with shared model (simulation)
    shared_load_time = 0.5  # Load once
    processing_time = 0.1 * 4  # Process 4 items sequentially
    simulated_shared_time = shared_load_time + processing_time
    print(f"Simulated shared model time: {simulated_shared_time:.3f}s")
    print(f"Overhead from per-worker loading: {total_time - simulated_shared_time:.3f}s")
    print()


@_diagnostic
def test_environment_info():
    """Test 5: Environment information"""
    print("=== TEST 5: Environment Info ===")

    print(f"Python version: {sys.version}")
    print(f"Platform: {sys.platform}")
    print(f"CPU cores: {multiprocessing.cpu_count()}")
    # cpu_count() counts every CPU in the system; the affinity mask can be
    # narrower (e.g. under taskset or a cpuset-restricted container), and that
    # narrower set is what worker pools actually get to run on.
    if hasattr(os, "sched_getaffinity"):
        print(f"CPU cores usable by this process: {len(os.sched_getaffinity(0))}")

    if psutil is not None:
        # With no interval, cpu_percent() compares against the previous call and
        # its first call returns a meaningless 0.0; sample a short window instead.
        print(f"CPU usage: {psutil.cpu_percent(interval=0.1)}%")
        print(f"Memory: {psutil.virtual_memory().percent}% used")
    else:
        print("CPU usage: unavailable (psutil not installed)")
        print("Memory: unavailable (psutil not installed)")

    if torch.cuda.is_available():
        print("CUDA available: Yes")
        print(f"CUDA devices: {torch.cuda.device_count()}")
        print(f"Current device: {torch.cuda.current_device()}")
        print(f"Device name: {torch.cuda.get_device_name()}")
        if hasattr(torch.cuda, 'memory_summary'):
            print("GPU Memory:")
            print(torch.cuda.memory_summary(abbreviated=True))
    else:
        print("CUDA available: No")

    # Check for environment variables that might affect multiprocessing
    mp_vars = [
        'OMP_NUM_THREADS', 'MKL_NUM_THREADS', 'OPENBLAS_NUM_THREADS',
        'VECLIB_MAXIMUM_THREADS', 'NUMEXPR_NUM_THREADS',
    ]
    print("Threading environment variables:")
    for var in mp_vars:
        value = os.environ.get(var, 'Not set')
        print(f"  {var}: {value}")
    print()


@_diagnostic
def test_worker_creation():
    """Test 6: Worker creation monitoring"""
    print("=== TEST 6: Worker Creation ===")

    print("Main process:")
    print(f"  PID: {os.getpid()}")
    print(f"  TID: {threading.get_ident()}")

    print("ThreadPoolExecutor workers:")
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(_describe_worker, range(4)))
    for result in results:
        print(f"  {result}")

    print("ProcessPoolExecutor workers:")
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(_describe_worker, range(4)))
    for result in results:
        print(f"  {result}")
    print()


def main():
    """Run every diagnostic in order, then print a guide for reading the results."""
    print("🔍 Parallel Processing Diagnostic Tool")
    print("=" * 50)
    print()

    test_environment_info()
    test_basic_multiprocessing()
    test_thread_vs_process()
    test_gpu_access()
    test_model_loading()
    test_worker_creation()

    print("🏁 Diagnostic complete!")
    print()
    print("ANALYSIS:")
    print("- If basic multiprocessing is slow: Environment blocks parallelism")
    print("- If threading faster than processing: Use ThreadPoolExecutor")
    print("- If GPU parallel time >> sequential: GPU contention issue")
    print("- If model loading overhead high: Need model sharing strategy")
    print("- If same PID for all workers: Using threads, not processes")


# The guard is required: with the "spawn"/"forkserver" start methods, pool
# workers re-import this module and must not re-run the diagnostics.
if __name__ == "__main__":
    main()