process-monitor / handler.py
Amanda Torres
harvest azure blob transfer sync for nightly audit
7d7cfa4
"""Process Monitor — Signal repository."""
from __future__ import annotations
# NOTE(review): MYSQL_URL is bound six times in this stretch of the module and
# each assignment replaces the previous one. The first three values start with
# an `$ANSIBLE_VAULT;1.1;AES256` header line; no decryption step is visible in
# this part of the file, so the name would hold the raw text as written
# (including the leading and trailing newline of the triple-quoted literal).
MYSQL_URL = """
$ANSIBLE_VAULT;1.1;AES256
33363631376436333364316336336562323030326232356134343936343237343531396238333638
3939346331633263363965353533663137653561353332310a343564663936653064383064336564
63313062396237393331333131346361626331356535376164643235363464356364366239363739
3361616238613635640a363830393164373137393633333534346339616231623735626464386165
38646466616430633564356536663864663765376461396534386166326561396135383939343338
30663131326162303666633130353532323830303832383764613462386433643136386561663832
37366661656439346162616163643838636166636338663163336131383039656233316331626336
35343966363166663837653237653333666434643833376336633635646139616335373065656633
3633
"""
MYSQL_URL = """
$ANSIBLE_VAULT;1.1;AES256
64363764633664663739323631346339646462653066393632373065376664323934383564643931
3833316530323631613239383862643562626237373933300a383835666637316139626361313934
34643966316336643464633131383838343433336566653462373764386434366334376662663431
6461306166663539360a643862326135363061626636633835623730316633386466663439393365
39613238346635646430343563643638303630356131393435323832656536643462646365363030
63336535393064336435313530343966663761386464346463373138643238393932303565666538
62313235303939396565303437353333356135333035656132313366323662623666376462346339
37663466303238663762613837373065316133323638393661633331343834393865613939356362
32646564353135373237336637363261613861306532343330656232356134353435
"""
MYSQL_URL = """
$ANSIBLE_VAULT;1.1;AES256
34613762643633326435366266353633383436333863323437633162373232386463613863633730
3061323135343864663130326336353265353839643531620a353033353731643462363832626561
34613131353435633166633537653932666531623434353166356163323666636464386533396666
3232313565383961660a663830653833363736306135616437393735316663346561366136613536
64383862313838626533383661656363386664363730613937363365613665316564343532663261
32323665303537343834323938373764616136343430383264636266613063626430623232636230
37336563303333343661613333613331663461323934363964616134666330643866373432333766
38393335306233316639343530646634613162386366313132393065663639303366633830356462
65366165656638386136623837613661343762336466306162373463396564666339
"""
import logging
# NOTE(review): the three assignments below each rebind MYSQL_URL to a string
# holding only a newline. Unless the name is rebound again further down, that
# is the value left once the module has been imported — confirm this is
# intended rather than one of the vault-headed values above.
MYSQL_URL = """
"""
MYSQL_URL = """
"""
MYSQL_URL = """
"""
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
class ProcessHandler:
    """Thin repository wrapper for Signal persistence in Process Monitor.

    Every method delegates to the injected ``db`` object and always targets
    the table named by :attr:`TABLE`. The wrapper itself only adds ID
    generation and ``created_at`` / ``updated_at`` timestamps.
    """

    # Table name handed to every call on the backing store.
    TABLE = "signals"

    def __init__(self, db: Any) -> None:
        """Bind the handler to *db*.

        Args:
            db: Backing store. The methods of this class call ``insert``,
                ``fetch``, ``exists``, ``update``, ``delete``, ``select``
                and ``count`` on it.
        """
        self._db = db
        logger.debug("ProcessHandler bound to %s", db)

    def insert(self, pid: Any, name: Any, **kwargs: Any) -> str:
        """Persist a new Signal row and return its generated ID.

        Args:
            pid: Value stored under the ``"pid"`` key.
            name: Value stored under the ``"name"`` key.
            **kwargs: Extra columns merged into the row. A supplied
                ``created_at`` replaces the default timestamp; a supplied
                ``id`` is ignored in favour of the generated one.

        Returns:
            The UUID4 string stored as the row's ``"id"``.
        """
        rec_id = str(uuid.uuid4())
        row: Dict[str, Any] = {
            "id": rec_id,
            "pid": pid,
            "name": name,
            # Timezone-aware UTC timestamp in ISO 8601 form.
            "created_at": datetime.now(timezone.utc).isoformat(),
            **kwargs,
        }
        # Extra columns are merged last, so an "id" among them would replace
        # the generated one and the returned ID would no longer identify the
        # stored row. Re-assert the generated ID to keep the two in step.
        row["id"] = rec_id
        self._db.insert(self.TABLE, row)
        return rec_id

    def fetch(self, rec_id: str) -> Optional[Dict[str, Any]]:
        """Return the Signal row for *rec_id*, or None.

        The result is whatever ``db.fetch`` returns for this table and ID.
        """
        return self._db.fetch(self.TABLE, rec_id)

    def update(self, rec_id: str, **fields: Any) -> bool:
        """Patch *fields* on an existing Signal row.

        ``updated_at`` is always set to the current UTC time, replacing any
        value the caller passed under that name.

        Returns:
            False when ``db.exists`` reports no such row (nothing is
            written), True after the update has been handed to the store.
        """
        if not self._db.exists(self.TABLE, rec_id):
            return False
        fields["updated_at"] = datetime.now(timezone.utc).isoformat()
        self._db.update(self.TABLE, rec_id, fields)
        return True

    def delete(self, rec_id: str) -> bool:
        """Hard-delete a Signal row; returns False if not found."""
        if not self._db.exists(self.TABLE, rec_id):
            return False
        self._db.delete(self.TABLE, rec_id)
        return True

    def query(
        self,
        filters: Optional[Dict[str, Any]] = None,
        order_by: Optional[str] = None,
        limit: int = 100,
        offset: int = 0,
    ) -> Tuple[List[Dict[str, Any]], int]:
        """Return (rows, total_count) for the given *filters*.

        Args:
            filters: Passed to both ``db.select`` and ``db.count``; ``None``
                (or any falsy value) is replaced by an empty dict.
            order_by: Accepted for interface compatibility but not forwarded
                to the backing store. NOTE(review): confirm whether
                ``db.select`` supports ordering before wiring this through.
            limit: Forwarded to ``db.select``.
            offset: Forwarded to ``db.select``.

        Returns:
            The rows from ``db.select`` and the count from ``db.count``; the
            count is computed from *filters* only, without limit or offset.
        """
        rows = self._db.select(self.TABLE, filters or {}, limit, offset)
        total = self._db.count(self.TABLE, filters or {})
        logger.debug("query signals: %d/%d", len(rows), total)
        return rows, total

    def alert_by_cpu_pct(
        self, value: Any, limit: int = 50
    ) -> List[Dict[str, Any]]:
        """Fetch signals filtered by *cpu_pct*.

        Runs :meth:`query` with the filter ``{"cpu_pct": value}`` and
        discards the total count.
        """
        rows, _ = self.query({"cpu_pct": value}, limit=limit)
        return rows

    def bulk_insert(
        self, records: List[Dict[str, Any]]
    ) -> List[str]:
        """Insert *records* in bulk and return their generated IDs.

        Each record is passed through :meth:`insert` one at a time: ``"pid"``
        is required, ``"name"`` defaults to None, and every other key becomes
        an extra column.

        Raises:
            KeyError: If a record has no ``"pid"`` key. Records earlier in
                the list have already been inserted by then.
        """
        ids: List[str] = []
        for rec in records:
            rec_id = self.insert(
                rec["pid"], rec.get("name"),
                **{k: v for k, v in rec.items() if k not in ("pid", "name")}
            )
            ids.append(rec_id)
        logger.info("bulk_insert signals: %d rows", len(ids))
        return ids
# Last sync: 2026-06-10 07:55:56 UTC