File size: 8,138 Bytes
356fd47
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
"""
Guardrails Pipeline - Core Engine
====================================
Pipeline utama yang menggabungkan semua guards.

Author: Jekardah AI Lab
"""

from typing import Dict, List, Optional
from .guards import ToxicDetector, PIIDetector, InjectionDetector, TopicFilter, LanguageDetector


class GuardrailsPipeline:
    """
    Guardrails pipeline for Indonesian-language (Bahasa Indonesia) text.

    Combines several guards to check the safety of the input sent to, and
    the output produced by, an AI model.

    Usage:
        pipeline = GuardrailsPipeline()

        # Check an input
        result = pipeline.check_input("Apa itu fotosintesis?")
        print(result["safe"])  # True

        # Check an output
        result = pipeline.check_output(
            output_text="Nama saya AI assistant.",
            input_text="Siapa namamu?"
        )

        # Full pipeline
        result = pipeline.run(
            input_text="Jelaskan demokrasi",
            output_text="Demokrasi adalah..."
        )
    """

    # Guards whose "unsafe" verdict blocks an input, in the order they run.
    _BLOCKING_INPUT_GUARDS = ("toxic", "injection", "topic")

    # Characters removed from both ends of a token before the relevance
    # comparison, so that "fotosintesis?" and "fotosintesis" count as a match.
    _TOKEN_EDGE_CHARS = ".,;:!?\"'()[]{}…“”‘’"

    def __init__(
        self,
        enable_toxic: bool = True,
        enable_pii: bool = True,
        enable_injection: bool = True,
        enable_topic: bool = True,
        enable_language: bool = True,
        sensitivity: str = "medium",
        language: str = "id",
    ):
        """
        Build the set of enabled guards.

        Args:
            enable_toxic: Register a ``ToxicDetector`` under ``"toxic"``.
            enable_pii: Register a ``PIIDetector`` under ``"pii"``.
            enable_injection: Register an ``InjectionDetector`` under
                ``"injection"``.
            enable_topic: Register a ``TopicFilter`` under ``"topic"``.
            enable_language: Register a ``LanguageDetector`` under
                ``"language"``.
            sensitivity: Forwarded unchanged to the toxic, injection and
                topic guards; also stored on the instance.
            language: Stored on the instance; not otherwise used by this
                class.
        """
        self.guards = {}
        self.sensitivity = sensitivity
        self.language = language

        if enable_toxic:
            self.guards["toxic"] = ToxicDetector(sensitivity=sensitivity)
        if enable_pii:
            self.guards["pii"] = PIIDetector()
        if enable_injection:
            self.guards["injection"] = InjectionDetector(sensitivity=sensitivity)
        if enable_topic:
            self.guards["topic"] = TopicFilter(sensitivity=sensitivity)
        if enable_language:
            self.guards["language"] = LanguageDetector()

    @classmethod
    def _tokens(cls, text: str) -> set:
        """Return the distinct lowercase words of *text*, edge punctuation removed."""
        stripped = (word.strip(cls._TOKEN_EDGE_CHARS) for word in text.lower().split())
        # Drop tokens that consisted of punctuation only.
        return {word for word in stripped if word}

    def check_input(self, text: str) -> Dict:
        """
        Check the safety of user input.

        The toxic, injection and topic guards (when enabled) can block the
        input. The language guard's result is only recorded. PII is scrubbed
        and reported as a warning but never blocks.

        Args:
            text: The raw user input.

        Returns:
            {
                "safe": bool,
                "input": str,
                "sanitized_input": str (PII removed),
                "violations": list,
                "guard_results": dict,
                "summary": str,
            }
        """
        results = {}
        all_violations = []
        is_safe = True

        # Run every enabled blocking guard in a fixed order.
        for guard_name in self._BLOCKING_INPUT_GUARDS:
            if guard_name not in self.guards:
                continue
            r = self.guards[guard_name].check(text)
            results[guard_name] = r
            if not r["safe"]:
                is_safe = False
                all_violations.extend(r["violations"])

        # The language guard is informational: its result never affects "safe".
        if "language" in self.guards:
            results["language"] = self.guards["language"].check(text)

        # PII detection (detect but don't block; just flag)
        sanitized = text
        if "pii" in self.guards:
            r = self.guards["pii"].scrub(text)
            results["pii"] = {
                "has_pii": r["pii_found"],
                "replacements": r["replacements"],
            }
            sanitized = r["scrubbed"]
            if r["pii_found"]:
                all_violations.append({
                    "type": "pii_detected",
                    "severity": "warning",
                    "detail": f"Data pribadi terdeteksi: {len(r['replacements'])} item",
                })

        # Build summary
        if is_safe and not all_violations:
            summary = "✅ Input aman — tidak ada masalah terdeteksi."
        elif is_safe and all_violations:
            summary = "⚠️ Input diterima dengan peringatan."
        else:
            # dict.fromkeys de-duplicates while keeping first-seen order, so
            # the summary text is identical from run to run (iterating a set
            # of strings is not, because of hash randomisation).
            violation_types = list(dict.fromkeys(
                v["type"] for v in all_violations if v.get("severity") != "warning"
            ))
            summary = f"⛔ Input diblokir — {', '.join(violation_types)}"

        # Self-harm gets special handling: append the guard's help message.
        topic_result = results.get("topic")
        if topic_result and topic_result.get("is_self_harm"):
            # Tolerate a guard result without a help message instead of
            # raising KeyError after all checks have already completed.
            help_message = topic_result.get("help_message")
            if help_message:
                summary += "\n\n" + help_message

        return {
            "safe": is_safe,
            "input": text,
            "sanitized_input": sanitized,
            "violations": all_violations,
            "guard_results": results,
            "summary": summary,
        }

    def check_output(self, output_text: str, input_text: str = "") -> Dict:
        """
        Check the safety of AI output.

        Only the toxic guard can mark the output unsafe. PII, relevance and
        length findings are added to the violations list without changing
        "safe".

        Args:
            output_text: The text produced by the model.
            input_text: The prompt that produced it; when non-empty it is
                used for a basic word-overlap relevance check.

        Returns:
            {
                "safe": bool,
                "output": str,
                "sanitized_output": str,
                "violations": list,
                "guard_results": dict,
                "summary": str,
            }
        """
        results = {}
        all_violations = []
        is_safe = True

        # Check toxic content in output
        if "toxic" in self.guards:
            r = self.guards["toxic"].check(output_text)
            results["toxic"] = r
            if not r["safe"]:
                is_safe = False
                all_violations.extend(r["violations"])

        # PII scrubbing in output (important!)
        sanitized = output_text
        if "pii" in self.guards:
            r = self.guards["pii"].scrub(output_text)
            results["pii"] = {
                "has_pii": r["pii_found"],
                "replacements": r["replacements"],
            }
            sanitized = r["scrubbed"]
            if r["pii_found"]:
                all_violations.append({
                    "type": "pii_in_output",
                    "severity": "high",
                    "detail": f"PII ditemukan dalam output: {len(r['replacements'])} item — otomatis di-mask",
                })

        # Relevance check (basic): warn when a prompt of more than three
        # distinct words shares no word at all with the output.
        if input_text and output_text:
            input_words = self._tokens(input_text)
            output_words = self._tokens(output_text)
            if len(input_words) > 3 and input_words.isdisjoint(output_words):
                all_violations.append({
                    "type": "low_relevance",
                    "severity": "warning",
                    "detail": "Output mungkin tidak relevan dengan input",
                })

        # Length check
        if len(output_text) > 5000:
            all_violations.append({
                "type": "output_too_long",
                "severity": "warning",
                "detail": f"Output terlalu panjang ({len(output_text)} chars)",
            })
        elif len(output_text.strip()) < 5:
            all_violations.append({
                "type": "output_too_short",
                "severity": "warning",
                "detail": "Output terlalu pendek",
            })

        # Summary
        if is_safe and not all_violations:
            summary = "✅ Output aman."
        elif is_safe:
            summary = "⚠️ Output diterima dengan peringatan."
        else:
            summary = "⛔ Output memiliki masalah keamanan."

        return {
            "safe": is_safe,
            "output": output_text,
            "sanitized_output": sanitized,
            "violations": all_violations,
            "guard_results": results,
            "summary": summary,
        }

    def run(self, input_text: str, output_text: str = "") -> Dict:
        """
        Full pipeline: check the input and, when given, the output.

        Args:
            input_text: The user input; always checked.
            output_text: The model output; checked only when non-empty.

        Returns:
            {
                "safe": bool (input safe, and output safe if it was checked),
                "input_check": dict (result of check_input),
                "output_check": dict or None (result of check_output),
            }
        """
        input_result = self.check_input(input_text)

        output_result = None
        if output_text:
            output_result = self.check_output(output_text, input_text)

        overall_safe = input_result["safe"]
        if output_result is not None:
            overall_safe = overall_safe and output_result["safe"]

        return {
            "safe": overall_safe,
            "input_check": input_result,
            "output_check": output_result,
        }