""" evaluate.py =========== Evaluates a single function + test pair. Can be run standalone or imported by pipeline.py. Standalone usage: python evaluate.py --csv functions.csv --out evaluation_results.json pip install coverage radon pytest """ import csv, json, sys, ast, copy, subprocess, tempfile, argparse from pathlib import Path ASSERT_METHODS = { "assertEqual", "assertNotEqual", "assertTrue", "assertFalse", "assertIn", "assertAlmostEqual", "assertRaises", "assertGreater", "assertLess", "assertIsNone", "assertIsNotNone", "assertNotIn" } AOR = {ast.Add: ast.Sub, ast.Sub: ast.Add, ast.Mult: ast.Div, ast.Div: ast.Mult, ast.Mod: ast.Add} ROR = {ast.Eq: ast.NotEq, ast.NotEq: ast.Eq, ast.Lt: ast.Gt, ast.Gt: ast.Lt, ast.LtE: ast.GtE, ast.GtE: ast.LtE} LCR = {ast.And: ast.Or, ast.Or: ast.And} def make_mutants(fn_code): tree = ast.parse(fn_code) mutants = [] def mutate(match_fn, replace_fn): for node in ast.walk(copy.deepcopy(tree)): if not match_fn(node): continue m = copy.deepcopy(tree) for n in ast.walk(m): if match_fn(n) and getattr(n, "lineno", -1) == getattr(node, "lineno", -2): replace_fn(n) break ast.fix_missing_locations(m) try: mutants.append(ast.unparse(m)) except Exception: pass mutate(lambda n: isinstance(n, ast.BinOp) and type(n.op) in AOR, lambda n: setattr(n, "op", AOR[type(n.op)]())) mutate(lambda n: isinstance(n, ast.Compare) and n.ops and type(n.ops[0]) in ROR, lambda n: n.ops.__setitem__(0, ROR[type(n.ops[0])]())) mutate(lambda n: isinstance(n, ast.BoolOp) and type(n.op) in LCR, lambda n: setattr(n, "op", LCR[type(n.op)]())) mutate(lambda n: isinstance(n, ast.Return) and n.value is not None, lambda n: setattr(n, "value", ast.Constant(value=None))) mutate(lambda n: isinstance(n, ast.Constant) and n.value in (0, 1), lambda n: setattr(n, "value", 1 - n.value)) return mutants def run_mutant(mutant_src, test_code, timeout=5): script = f""" import sys, unittest, io, types mod = types.ModuleType("target") exec(compile({repr(mutant_src)}, "", "exec"), mod.__dict__) sys.modules["target"] = mod tm = types.ModuleType("_tm") exec(compile({repr(test_code)}, "", "exec"), tm.__dict__) sys.modules["_tm"] = tm suite = unittest.TestLoader().loadTestsFromModule(tm) r = unittest.TextTestRunner(stream=io.StringIO(), verbosity=0).run(suite) print("KILLED" if (r.failures or r.errors) else "SURVIVED") """ try: r = subprocess.run([sys.executable, "-c", script], capture_output=True, text=True, timeout=timeout) if "KILLED" in r.stdout: return "killed" if "SURVIVED" in r.stdout: return "survived" return "killed" except subprocess.TimeoutExpired: return "timeout" def evaluate_one(fn_code, test_code): """ Evaluate a single function + test pair. Returns (cov, mut, sta, score). Importable by pipeline.py. """ with tempfile.TemporaryDirectory() as tmp: Path(tmp, "target.py").write_text(fn_code, encoding="utf-8") Path(tmp, "test_target.py").write_text(test_code, encoding="utf-8") # coverage subprocess.run( [sys.executable, "-m", "coverage", "run", "--branch", "--include=target.py", "-m", "pytest", "test_target.py", "-q", "--tb=no"], capture_output=True, text=True, cwd=tmp ) r2 = subprocess.run( [sys.executable, "-m", "coverage", "json", "-o", "cov.json"], capture_output=True, text=True, cwd=tmp ) cov = {} cov_file = Path(tmp, "cov.json") if not cov_file.exists(): cov["error"] = r2.stderr.strip() or "coverage failed" else: data = json.loads(cov_file.read_text()) fd = next((v for k, v in data["files"].items() if k.endswith("target.py")), None) if fd is None: cov["error"] = "target.py not in coverage report" else: s = fd["summary"] missing_br = fd.get("missing_branches", []) total_br = s.get("num_branches", 0) branch_pct = round(100 * (total_br - len(missing_br)) / total_br, 1) if total_br else 100.0 cov = { "line_coverage_pct": round(s.get("percent_covered", 0), 1), "branch_coverage_pct": branch_pct, "missing_lines": fd.get("missing_lines", []), "missing_branches": missing_br, } # mutation mutants = make_mutants(fn_code) killed, survived_list = 0, [] for mutant in mutants: outcome = run_mutant(mutant, test_code) if outcome in ("killed", "timeout"): killed += 1 else: survived_list.append(mutant) mut = { "total_mutants": len(mutants), "killed": killed, "survived": len(mutants) - killed, "mutation_score": round(100 * killed / len(mutants), 1) if mutants else 100.0, "survived_mutants": survived_list, } # static fn_tree = ast.parse(fn_code) fn_defs = [n for n in ast.walk(fn_tree) if isinstance(n, ast.FunctionDef)] cc_out = subprocess.run( [sys.executable, "-m", "radon", "cc", str(Path(tmp, "target.py")), "-j", "-s"], capture_output=True, text=True ) mi_out = subprocess.run( [sys.executable, "-m", "radon", "mi", str(Path(tmp, "target.py")), "-j"], capture_output=True, text=True ) cc = rank = mi = None try: entries = next(iter(json.loads(cc_out.stdout).values()), []) if entries: cc, rank = entries[0]["complexity"], entries[0]["rank"] except Exception: pass try: val = next(iter(json.loads(mi_out.stdout).values()), {}) mi = round(val.get("mi", 0), 1) except Exception: pass if cc is None: BRANCH_TYPES = (ast.If, ast.For, ast.While, ast.ExceptHandler, ast.comprehension) cc = 1 for fn in fn_defs: for n in ast.walk(fn): if isinstance(n, BRANCH_TYPES): cc += 1 elif isinstance(n, ast.BoolOp): cc += len(n.values) - 1 rank = "A" if cc <= 5 else "B" if cc <= 10 else "C" test_tree = ast.parse(test_code) test_fns = [n for n in ast.walk(test_tree) if isinstance(n, ast.FunctionDef) and n.name.startswith("test")] assertions = {} for fn in test_fns: count = 0 for n in ast.walk(fn): if isinstance(n, ast.Assert): count += 1 elif isinstance(n, ast.Call) and isinstance(n.func, ast.Attribute): if n.func.attr in ASSERT_METHODS: count += 1 assertions[fn.name] = count total_asrt = sum(assertions.values()) sta = { "cyclomatic_complexity": cc, "complexity_rank": rank, "num_parameters": len(fn_defs[0].args.args) if fn_defs else 0, "num_raises": sum(1 for n in ast.walk(fn_tree) if isinstance(n, ast.Raise)), "num_loops": sum(1 for n in ast.walk(fn_tree) if isinstance(n, (ast.For, ast.While))), "total_test_functions": len(test_fns), "total_assertions": total_asrt, "assertion_density": round(total_asrt / len(test_fns), 2) if test_fns else 0.0, "assertions_per_test": assertions, "dead_tests": [name for name, count in assertions.items() if count == 0], "coverage_adequacy_ratio": round(len(test_fns) / cc, 2) if cc else 1.0, } if mi is not None: sta["maintainability_index"] = mi # composite score: line 35% + branch 35% + mutation 20% + assertion density 10% line_pct = cov.get("line_coverage_pct", 0) if "error" not in cov else 0 branch_pct = cov.get("branch_coverage_pct", 0) if "error" not in cov else 0 score = round( line_pct * 0.35 + branch_pct * 0.35 + mut["mutation_score"] * 0.20 + min(sta["assertion_density"] * 10, 10), 1 ) return cov, mut, sta, score if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--csv", default="functions.csv") parser.add_argument("--out", default="evaluation_results.json") args = parser.parse_args() rows = [] with open(args.csv, newline="", encoding="utf-8") as f: for row in csv.DictReader(f): rows.append({ "function_code": row["function_code"].strip(), "test_code": row["test_code"].strip(), }) print(f"Loaded {len(rows)} functions from {args.csv}\n") results = [] for i, row in enumerate(rows): fn_code = row["function_code"] test_code = row["test_code"] try: fn_name = next(n.name for n in ast.walk(ast.parse(fn_code)) if isinstance(n, ast.FunctionDef)) except Exception: fn_name = f"function_{i + 1}" print(f"[{i+1}/{len(rows)}] {fn_name}") cov, mut, sta, score = evaluate_one(fn_code, test_code) if "error" in cov: print(f" coverage : ERROR - {cov['error'][:80]}") else: print(f" coverage : line={cov['line_coverage_pct']}% branch={cov['branch_coverage_pct']}%") print(f" mutation : {mut['mutation_score']}% ({mut['killed']}/{mut['total_mutants']} killed)") print(f" static : CC={sta['cyclomatic_complexity']} rank={sta['complexity_rank']}") print(f" score : {score}/100\n") results.append({ "function_name": fn_name, "function_code": fn_code, "test_code": test_code, "coverage": cov, "mutation": mut, "static": sta, "score": score, }) Path(args.out).write_text(json.dumps(results, indent=2)) print(f"Saved -> {args.out}") print("\n" + "=" * 65) print(f" {'function':<22} {'line%':>6} {'branch%':>8} {'mut%':>6} {'score':>6}") print(" " + "-" * 63) for r in results: c = r["coverage"] line_pct = c.get("line_coverage_pct", "ERR") if "error" not in c else "ERR" branch_pct = c.get("branch_coverage_pct", "ERR") if "error" not in c else "ERR" print(f" {r['function_name']:<22}" f" {line_pct:>5}%" f" {branch_pct:>6}%" f" {r['mutation']['mutation_score']:>5}%" f" {r['score']:>5}/100") print("=" * 65)