|
import re
from typing import Dict, NamedTuple, Optional, Set

import spacy

from .patterns import PATTERNS
from .profile_config import PROFILES, DEFAULT_PROFILE
from .transforms import generalize_money, generalize_dates
from utils.log import logger
| 7 | + |
# A detected sensitive-data span: the matched surface text, its entity
# class (e.g. "PERSON"), and its [start, end) character offsets in the
# source document.
Entity = NamedTuple(
    "Entity",
    [("text", str), ("type", str), ("start", int), ("end", int)],
)
| 13 | + |
# Load the Polish spaCy model once at import time.  On failure `nlp` is left
# as None, and the NER pass in anonymize_text() is skipped (it guards on
# `if nlp:`), so the module still works regex-only.
try:
    nlp = spacy.load("pl_core_news_lg")
except OSError:
    # OSError is what spacy.load raises when the model package is not installed.
    logger.error("Nie znaleziono modelu 'pl_core_news_lg'. Uruchom:\npython -m spacy download pl_core_news_lg")
    nlp = None
| 20 | + |
def safe_substitute(text: str, substitution_dict: Dict[str, str]) -> str:
    """Replace every occurrence of each key in *substitution_dict* with its value.

    Replacement is done in a single regex pass with the alternatives ordered
    longest-first, so:
      * longer keys win over their own substrings (as before), and
      * text produced by one replacement can never be re-matched and corrupted
        by a later key — the original sequential ``str.replace`` loop had that
        bug (e.g. {"Warszawa": "Warsaw City", "Warsaw": "X"} turned
        "Warszawa" into "X City").

    Args:
        text: Input text to transform.
        substitution_dict: Mapping of original substrings to replacements.

    Returns:
        The text with all substitutions applied.
    """
    if not substitution_dict:
        return text
    # Longest keys first so the alternation prefers the most specific match.
    pattern = re.compile(
        "|".join(
            re.escape(key)
            for key in sorted(substitution_dict, key=len, reverse=True)
        )
    )
    return pattern.sub(lambda m: substitution_dict[m.group(0)], text)
| 27 | + |
def _overlaps(start: int, end: int, claimed: Set[range]) -> bool:
    """Return True if the half-open span [start, end) intersects any claimed range.

    Uses proper interval intersection. The previous endpoint-only test
    (``start in r or end - 1 in r``) missed the case where the new span
    fully encloses an already-claimed range.
    """
    return any(start < r.stop and r.start < end for r in claimed)


def anonymize_text(text: str, profile: str = DEFAULT_PROFILE,
                   custom_classes: Optional[list] = None) -> tuple[str, dict]:
    """Anonymize sensitive entities in *text* according to an anonymization profile.

    Entities are found in two passes — regex patterns first, then spaCy NER
    (skipped when the model failed to load) — de-duplicated by character span,
    and replaced according to the profile's per-class transformation:
    "tokenize" (default), "remove", "generalize_loc" or "generalize".

    Args:
        text: Input document to anonymize.
        profile: Key into PROFILES; unknown names fall back to DEFAULT_PROFILE.
        custom_classes: Optional explicit list of entity classes to process;
            overrides the profile's class list when given.

    Returns:
        Tuple ``(anonymized_text, token_map)`` where token_map maps generated
        tokens (values starting with "<", e.g. "<PERSON_0>") back to the
        original surface strings, enabling later de-anonymization.
    """
    profile_config = PROFILES.get(profile, PROFILES[DEFAULT_PROFILE])
    enabled_classes = custom_classes if custom_classes is not None else profile_config["classes"]
    transformations = profile_config.get("transformations", {})

    all_entities: list[Entity] = []

    # Step 1: regex entities; each accepted match claims its character span.
    logger.debug("--- Regex Entity Recognition ---")
    matched_pos: Set[range] = set()

    for class_name, pattern in PATTERNS:
        for match in pattern.finditer(text):
            start, end = match.span()
            # Skip matches that intersect an already-claimed span.
            if not _overlaps(start, end, matched_pos):
                entity = Entity(match.group(0), class_name, start, end)
                all_entities.append(entity)
                matched_pos.add(range(start, end))
                logger.debug(f"Regex found: {entity}")

    # Step 2: NER entities; regex hits take precedence over NER on collision.
    logger.debug("--- NER Entity Recognition ---")
    if nlp:
        doc = nlp(text)
        # Map spaCy pl_core_news labels onto this module's entity classes;
        # labels not listed here are ignored.
        ner_map = {"persName": "PERSON", "placeName": "LOCATION", "orgName": "ORGANIZATION"}
        for ent in doc.ents:
            start, end = ent.start_char, ent.end_char
            if not _overlaps(start, end, matched_pos):
                entity_type = ner_map.get(ent.label_)
                if entity_type:
                    entity = Entity(ent.text.strip(), entity_type, start, end)
                    all_entities.append(entity)
                    matched_pos.add(range(start, end))
                    logger.debug(f"NER found: {entity}")

    # Step 3: sort by start offset so tokens are numbered in document order.
    all_entities.sort(key=lambda x: x.start)
    logger.debug(f"All entities sorted by position: {[e.text for e in all_entities]}")

    # Step 4: build the original-text -> replacement dictionary.
    substitution_dict: Dict[str, str] = {}
    token_counters: Dict[str, int] = {}

    for entity in all_entities:
        if entity.type not in enabled_classes:
            continue
        transform_type = transformations.get(entity.type, "tokenize")

        if transform_type == "tokenize":
            # Reversible replacement: per-class numbered tokens like <PERSON_0>.
            count = token_counters.get(entity.type, 0)
            substitution_dict[entity.text] = f"<{entity.type}_{count}>"
            token_counters[entity.type] = count + 1
        elif transform_type == "remove":
            substitution_dict[entity.text] = ""
        elif transform_type == "generalize_loc":
            substitution_dict[entity.text] = "[MIASTO WOJEWÓDZKIE]"
        elif transform_type == "generalize":
            if entity.type == "MONEY":
                substitution_dict.update(generalize_money(entity.text))
            elif entity.type == "DATE":
                substitution_dict.update(generalize_dates(entity.text))

    if profile == 'gdpr':
        # The GDPR profile additionally generalizes dates/amounts that survived
        # the entity pass, scanning the partially substituted text.
        temp_text = safe_substitute(text, substitution_dict)
        substitution_dict.update(generalize_dates(temp_text))
        substitution_dict.update(generalize_money(temp_text))

    anonymized_text = safe_substitute(text, substitution_dict)
    # Only tokenized replacements (values starting with "<") are reversible;
    # removals and generalizations are intentionally excluded from the map.
    token_map = {v: k for k, v in substitution_dict.items() if v.startswith('<')}
    return anonymized_text, token_map
0 commit comments