Spaces:
Sleeping
Sleeping
| """ | |
| Setup: | |
| 1. Sign up at api.together.ai → API Keys → Create key | |
| 2. $env:TOGETHER_API_KEY="your-key" | |
| pip install openai | |
| python analyze.py --input evaluation_results.json --out analysis_results.json | |
| """ | |
| import json, os, sys, time, argparse | |
| from pathlib import Path | |
| from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeout | |
| from openai import OpenAI | |
# Model identifier sent as `model=` on every chat-completion request.
MODEL = "meta-llama/Llama-3.3-70B-Instruct-Turbo"
# Seconds a single API call may take before the caller gives up on it.
API_TIMEOUT = 45
def get_client():
    """Build the OpenAI-compatible client pointed at the Together endpoint.

    The API key is read from the TOGETHER_API_KEY environment variable.
    When the variable is missing or empty, setup instructions are printed
    and the process exits with status 1.
    """
    key = os.environ.get("TOGETHER_API_KEY")
    if key:
        return OpenAI(api_key=key, base_url="https://api.together.xyz/v1")

    # No usable key: tell the user how to get one, then stop.
    help_lines = (
        "No TOGETHER_API_KEY found.",
        "Sign up free at api.together.ai → API Keys → Create key",
        "Then: $env:TOGETHER_API_KEY=\"your-key\"",
    )
    for line in help_lines:
        print(line)
    sys.exit(1)
def _run_with_timeout(fn, *args, timeout=API_TIMEOUT, **kwargs):
    """
    Run fn(*args, **kwargs) in a worker thread and wait at most `timeout` seconds.

    Args:
        fn: Callable to execute.
        *args, **kwargs: Forwarded to `fn` unchanged.
        timeout: Maximum number of seconds to wait for the result.

    Returns:
        Whatever `fn` returns.

    Raises:
        TimeoutError: if no result is available after `timeout` seconds.
        Any exception raised by `fn` is re-raised to the caller.

    The worker thread cannot be killed; after a timeout it keeps running in
    the background until `fn` returns, but the caller is no longer blocked.
    NOTE(review): the interpreter may still wait for such a thread at exit.
    """
    # Deliberately not a `with` block: the executor's context manager calls
    # shutdown(wait=True) on exit, which would block until the hung call
    # finished and silently defeat the timeout.
    ex = ThreadPoolExecutor(max_workers=1)
    try:
        future = ex.submit(fn, *args, **kwargs)
        return future.result(timeout=timeout)
    except FuturesTimeout:
        raise TimeoutError(f"API call hung for {timeout}s — skipping")
    finally:
        # Release the executor without waiting for a still-running worker.
        ex.shutdown(wait=False)
def _parse_json_object(raw):
    """
    Decode the JSON object contained in a model reply.

    Accepts a bare JSON object, a reply wrapped in ``` fences (with or
    without a language tag), or a reply with stray text around the object.

    Raises:
        json.JSONDecodeError: if no JSON object can be decoded from `raw`.
    """
    text = raw.strip()
    if text.startswith("```"):
        # Keep what sits between the first pair of fences, minus a "json" tag.
        text = text.split("```")[1]
        if text.startswith("json"):
            text = text[4:]
        text = text.strip()
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        # Fall back to the outermost {...} span so that a preamble, a
        # trailing remark or an unexpected fence tag does not sink the reply.
        start, end = text.find("{"), text.rfind("}")
        if start == -1 or end <= start:
            raise
        parsed = json.loads(text[start:end + 1])
    if not isinstance(parsed, dict):
        # Callers use dict methods on the result; a list or scalar is unusable.
        raise json.JSONDecodeError("expected a JSON object", text, 0)
    return parsed


def analyze_one(client, fn_name, fn_code, test_code, cov, mut, sta):
    """
    Analyze a single function + test pair.

    Args:
        client: Object exposing `chat.completions.create(...)`.
        fn_name: Name of the function under test (used in the prompt).
        fn_code: Source of the function under test.
        test_code: Source of the current test suite.
        cov: Coverage results dict; an "error" key replaces the metrics.
        mut: Mutation-testing results dict.
        sta: Static-analysis results dict.

    Returns:
        The JSON object decoded from the model reply (the prompt asks for
        score, score_reasoning, problems, suggestions, missing_cases), or
        {"error": "<message>"} when the call times out, fails, or the reply
        cannot be decoded as a JSON object after one retry.

    Never hangs — gives up after API_TIMEOUT seconds per call.
    Importable by build_dataset.py and pipeline.py.
    """
    if "error" in cov:
        cov_text = f"Coverage error: {cov['error']}"
    else:
        cov_text = (
            f"Line coverage : {cov.get('line_coverage_pct', 'N/A')}%\n"
            f"Branch coverage : {cov.get('branch_coverage_pct', 'N/A')}%\n"
            f"Missing lines : {cov.get('missing_lines', [])}\n"
            f"Missing branches: {cov.get('missing_branches', [])}"
        )

    survived = mut.get("survived_mutants", [])
    survived_text = ""
    if survived:
        # str() keeps prompt building from raising when an entry is not a string.
        samples = "\n".join(f" - {str(m)[:80]}" for m in survived[:5])
        survived_text = f"\nSurviving mutants (sample):\n{samples}"
    mut_text = (
        f"Total mutants : {mut.get('total_mutants', 'N/A')}\n"
        f"Killed : {mut.get('killed', 'N/A')}\n"
        f"Survived : {mut.get('survived', 'N/A')}\n"
        f"Mutation score: {mut.get('mutation_score', 'N/A')}%"
        f"{survived_text}"
    )

    sta_text = (
        f"Cyclomatic complexity : {sta.get('cyclomatic_complexity', 'N/A')} (rank {sta.get('complexity_rank', '?')})\n"
        f"Total test functions : {sta.get('total_test_functions', 'N/A')}\n"
        f"Total assertions : {sta.get('total_assertions', 'N/A')}\n"
        f"Assertion density : {sta.get('assertion_density', 'N/A')} per test\n"
        f"Dead tests (0 asserts) : {sta.get('dead_tests', [])}\n"
        f"Coverage adequacy ratio: {sta.get('coverage_adequacy_ratio', 'N/A')}"
    )
    if "maintainability_index" in sta:
        sta_text += f"\nMaintainability index : {sta['maintainability_index']}"

    prompt = f"""You are a senior software engineer reviewing test quality.
You are given a Python function, its test suite, and automated evaluation results.
Your job:
A) Give the test suite a quality SCORE from 0 to 100
B) List AT LEAST 3 specific PROBLEMS with the test suite
C) Write AT LEAST 3 SUGGESTIONS as simple one-line test instructions that a junior
developer could copy directly. Each suggestion must say exactly what to call and
what to assert. Example format: "test that multiply(0, 5) returns 0"
D) List AT LEAST 3 specific INPUT VALUES not yet tested
Be strict. Score above 70 only if coverage, mutation score, and assertion density are all strong.
FUNCTION: {fn_name}
{fn_code}
CURRENT TESTS:
{test_code}
[1] COVERAGE
{cov_text}
[2] MUTATION TESTING
{mut_text}
[3] STATIC ANALYSIS
{sta_text}
Respond ONLY with valid JSON, no extra text or markdown fences:
{{
"score": <integer 0-100>,
"score_reasoning": "<one sentence>",
"problems": ["<problem 1>", "<problem 2>", "<problem 3>"],
"suggestions": [
"test that {fn_name}(<specific input>) returns <expected value>",
"test that {fn_name}(<specific input>) returns <expected value>",
"test that {fn_name}(<specific input>) raises <exception> when <condition>"
],
"missing_cases": ["<input value 1>", "<input value 2>", "<input value 3>"]
}}"""

    messages = [
        {"role": "system", "content": "You are a test quality expert. Respond with valid JSON only — no markdown, no extra text."},
        {"role": "user", "content": prompt},
    ]

    def _call():
        return client.chat.completions.create(
            model=MODEL, messages=messages, max_tokens=1024, temperature=0.2
        )

    # Two attempts: only an undecodable reply is retried; a timeout or any
    # other failure is reported immediately.
    for attempt in range(2):
        try:
            # hard timeout — never hangs forever
            response = _run_with_timeout(_call, timeout=API_TIMEOUT)
            content = response.choices[0].message.content
            # An empty/None reply is treated as undecodable so it gets retried.
            return _parse_json_object(content or "")
        except TimeoutError as e:
            return {"error": f"timeout: {e}"}
        except json.JSONDecodeError as e:
            if attempt == 0:
                time.sleep(1)
            else:
                return {"error": f"JSON parse failed: {e}"}
        except Exception as e:
            return {"error": str(e)}
    return {"error": "failed after 2 attempts"}
if __name__ == "__main__":
    # Script entry point: load evaluation entries, run the LLM analysis on
    # each one, save the enriched entries, then print a summary table.
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", default="evaluation_results.json")
    parser.add_argument("--out", default="analysis_results.json")
    args = parser.parse_args()

    client = get_client()
    print(f"Model : {MODEL}")
    print(f"Timeout : {API_TIMEOUT}s per call\n")

    # Explicit UTF-8: the platform default encoding (notably on Windows,
    # which the setup notes target) may not decode non-ASCII source text.
    entries = json.loads(Path(args.input).read_text(encoding="utf-8"))
    print(f"Loaded {len(entries)} functions from {args.input}\n")

    results = []
    for i, entry in enumerate(entries):
        fn_name = entry.get("function_name", f"function_{i+1}")
        print(f"[{i+1}/{len(entries)}] {fn_name} ...")
        analysis = analyze_one(
            client,
            fn_name,
            entry["function_code"],
            entry["test_code"],
            entry["coverage"],
            entry["mutation"],
            entry["static"],
        )
        if "error" in analysis:
            print(f" ERROR: {analysis['error']}")
        else:
            print(f" Score: {analysis.get('score', '?')}/100 | "
                  f"Problems: {len(analysis.get('problems', []))}")
            for s in analysis.get("suggestions", []):
                print(f" - {s}")
        results.append({**entry, "llm_analysis": analysis})
        # Short pause between consecutive API requests.
        time.sleep(0.5)

    Path(args.out).write_text(json.dumps(results, indent=2), encoding="utf-8")
    print(f"\nSaved -> {args.out}")

    print("\n" + "=" * 65)
    print(f" {'Function':<25} {'Score':>7} {'Problems':>9} {'Suggestions':>12}")
    print(" " + "-" * 63)
    for r in results:
        a = r["llm_analysis"]
        if "error" in a:
            print(f" {r.get('function_name', '?'):<25} ERROR: {str(a['error'])[:40]}")
            continue
        # str() so a null or otherwise non-formattable score from the model
        # cannot raise on the alignment spec.
        print(f" {r.get('function_name', '?'):<25}"
              f" {str(a.get('score', '?')):>5}/100"
              f" {len(a.get('problems', [])):>8}"
              f" {len(a.get('suggestions', [])):>11}")
    print("=" * 65)