Compare commits

18 Commits

Author SHA1 Message Date
Marcel
2e59c0ef5b chore(normalizer): unignore canonical-persons-tree.json from out/ exclusion
All checks were successful
CI / Unit & Component Tests (pull_request) Successful in 3m33s
CI / OCR Service Tests (pull_request) Successful in 22s
CI / Backend Unit Tests (pull_request) Successful in 3m42s
CI / fail2ban Regex (pull_request) Successful in 47s
CI / Semgrep Security Scan (pull_request) Successful in 21s
CI / Compose Bucket Idempotency (pull_request) Successful in 1m3s
2026-05-25 21:19:02 +02:00
Marcel
309436b9a4 feat(normalizer): generate canonical-persons-tree.json from Personendatei 2.xlsx
157 persons, 43 relationships (29 SPOUSE_OF + 14 PARENT_OF), 89 unresolved references.
6 duplicate rows skipped (Seils family block + Christa Schütz).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 21:18:24 +02:00
Marcel
e326630318 feat(normalizer): add main() CLI to persons_tree
Wires the two-pass pipeline (parse → deduplicate → index → resolve)
into a runnable CLI with --input, --output, and --dry-run flags.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 21:16:21 +02:00
Marcel
34c40cb0ee fix(normalizer): preserve trailing Bemerkung text after parent pattern
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 21:12:45 +02:00
Marcel
ace41ad209 fix(normalizer): remove unauthorized first-name index key from _build_index
Remove the 5th unauthorized index key (_norm_tree(first)) from _build_index.
The spec requires exactly 4 keys per person:
1. forward (first last)
2. reversed (last first)
3. maiden name (first maiden) if maiden set
4. lastName only (last)

Update test data to use full names in Bemerkung fields (e.g., 'Clara Cram'
instead of 'Clara') since single first names alone are no longer resolvable.
All 52 tests pass.
2026-05-25 21:08:49 +02:00
Marcel
6f55489ec2 feat(normalizer): add PARENT_OF Bemerkung extraction to persons_tree 2026-05-25 21:06:24 +02:00
Marcel
fa4b6b5fc2 feat(normalizer): add SPOUSE_OF resolution to persons_tree 2026-05-25 21:03:46 +02:00
Marcel
1f2351e3c0 feat(normalizer): add _deduplicate() to persons_tree 2026-05-25 21:02:02 +02:00
Marcel
7012234e6a feat(normalizer): add row parser to persons_tree 2026-05-25 20:59:49 +02:00
Marcel
306f3b6fe6 feat(normalizer): add name normalization + lookup index to persons_tree 2026-05-25 20:56:47 +02:00
Marcel
47a0770758 feat(normalizer): add generation parser to persons_tree 2026-05-25 20:54:38 +02:00
Marcel
889d301f16 fix(normalizer): correct _MIN_YEAR comment in test (1700 not 1500) 2026-05-25 20:53:16 +02:00
Marcel
443c7a48db fix(normalizer): don't convert plausible typo years as Excel serials 2026-05-25 20:46:42 +02:00
Marcel
9ae1196d1c feat(normalizer): add persons_tree skeleton + year extraction 2026-05-25 20:41:25 +02:00
Marcel
b37fd1728b docs(importer): add Personendatei importer implementation plan
9-task TDD plan for persons_tree.py — year extraction, name index,
deduplication, SPOUSE_OF/PARENT_OF extraction, CLI + JSON output.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 20:38:14 +02:00
Marcel
6103d5d229 docs(importer): resolve open questions in Personendatei importer spec
OQ-01: tool deduplicates rows with identical (firstName, lastName, birthYear)
OQ-02: birthPlace/deathPlace kept as separate JSON fields
OQ-03: multi-name firstName stored verbatim

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 20:28:45 +02:00
Marcel
7b483d357a docs(importer): add Personendatei importer design spec
Two-pass Python tool (persons_tree.py) that normalizes import/Personendatei 2.xlsx
into canonical-persons-tree.json with persons, SPOUSE_OF/PARENT_OF relationships,
and an unresolved[] list for manual review.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 20:26:30 +02:00
Marcel
94a40237f4 feat(normalizer): generate structured tags from Schlagwort + Inhalt fields
Adds tags.py module implementing a three-outcome heuristic:
- Individual-to-individual correspondence tags ("Clara an Herbert") → dropped
- Group/collective correspondence ("Clara an Kinder", "Walter an Geschwister") → Briefwechsel/<value>
- Semantic/event tags ("Brautbriefe", "Alltag", "zur Hochzeit") → Themen/<value>

Three correspondence patterns detected: space-an-space, starts-with-"an ",
and abbreviated-sender form ("Maria W.an Clara").

COLLECTIVE_TERMS in config.py extended with 17 plural/group relational terms
(söhne, brüder, schwiegereltern, cousinen, etc.) confirmed against the full Excel.

Also adds two-phase summary mining: every run emits review/tag-candidates.csv;
subsequent runs apply keywords from overrides/approved-themes.csv as Themen tags.

Outputs: canonical-documents.xlsx gets pipe-separated "Parent/Child" tag paths;
canonical-tag-tree.xlsx provides the full tag hierarchy for backend pre-import.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 19:47:36 +02:00
15 changed files with 5912 additions and 6 deletions

File diff suppressed because it is too large

View File

@@ -0,0 +1,292 @@
# Personendatei Importer — Design Spec
**Date:** 2026-05-25
**Source file:** `import/Personendatei 2.xlsx`
**Output:** `tools/import-normalizer/out/canonical-persons-tree.json`
**Tool location:** `tools/import-normalizer/persons_tree.py`
---
## 1. Purpose
Normalize the 163-person family register in `Personendatei 2.xlsx` into a machine-readable JSON file that a future backend importer can consume to seed the `persons` and `person_relationships` tables. The tool is offline (no backend required) and produces a reviewable artifact with an explicit `unresolved[]` list for manual follow-up.
---
## 2. Source Data — Column Map
Sheet: `Tabelle1` (rows 2-164; row 1 is the header).
| Col | Header | Content | Notes |
|-----|--------|---------|-------|
| A | Generation | `G 0` to `G 5` | Generation relative to Herbert & Clara Cram (G 2). Inconsistent formatting: `"G3"`, `"G 0"`, `"G 2 de Gruyter"`; extract the first digit sequence and parse it as an integer (see §5). |
| B | Familienname | Last name | Sometimes compound: `"de Gruyter"`, `"Cram Heydrich"`, `"Burkhard- Meier"` |
| C | Vorname | First name | Sometimes multiple: `"Charlotte,Meta,Jacobi"`, nicknames in parens: `"Otto (Herbert)"` |
| D | geb als | Maiden name | Used as a name alias for matching |
| E | Geburtsdatum | Birth date | **Mixed types** — see §4 |
| F | Geburtsort | Birth place | Free-text string, stored verbatim |
| G | Todesdatum | Death date | Same mixed types as col E |
| H | Sterbeort | Death place | Free-text string, stored verbatim |
| I | verheiratet mit | Spouse name | Partial name in either `"Firstname Lastname"` or `"Lastname Firstname"` order |
| J | Bemerkung | German relationship notes | `"Sohn v Clara u Herbert"`, `"Nichte v Herbert"`, free text |
---
## 3. Two-Pass Architecture
### Pass 1 — Parse & Normalize (rows → person records)
For each row:
1. Read all 10 columns.
2. Assign a stable `rowId`: `"row_{i:03d}"` where `i` is the 1-based row number (e.g. `row_002`).
3. Normalize fields per §4 and §5.
4. Build the **name-lookup index** (see §6).
5. Emit a person record.
### Pass 2 — Resolve Relationships
Walk every person record:
1. Resolve col I (spouse) → emit `SPOUSE_OF` edge or `unresolved` entry.
2. Parse col J (Bemerkung) for parent/child patterns → emit `PARENT_OF` edges or `unresolved` entries.
3. Append unmatched Bemerkung text to `person.notes`.
---
## 4. Date Parsing
Both col E (birth) and col G (death) arrive as either an Excel numeric serial or a string.
### Excel serial conversion
When the cell value is an integer (or a float with no string representation):
```python
from datetime import datetime, timedelta

date = datetime(1899, 12, 30) + timedelta(days=int(value))
year = date.year
```
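The base date is 1899-12-30 rather than 1899-12-31 because Excel inherits the Lotus 1-2-3 leap-year bug and counts a nonexistent 1900-02-29. With this base the conversion is exact for serials from 61 (1900-03-01) onward.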
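
As a worked example of the serial conversion, here is a minimal sketch. The helper name `serial_to_year` and the constant `EXCEL_BASE` are assumptions made for illustration; the spec only fixes the base date and the `timedelta` arithmetic.

```python
from datetime import datetime, timedelta

# Base date from the spec (name chosen for this sketch).
EXCEL_BASE = datetime(1899, 12, 30)


def serial_to_year(value: float) -> int:
    """Convert an Excel date serial to a calendar year (hypothetical helper)."""
    return (EXCEL_BASE + timedelta(days=int(value))).year


print(serial_to_year(7568))   # a serial that falls in 1920
print(serial_to_year(36526))  # the serial for 2000-01-01
```

Truncating with `int()` discards any time-of-day fraction, which is harmless here because only the year is kept.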
### String fallback — reuse existing `dates.parse_date()`
Pass the raw string to the existing `tools/import-normalizer/dates.parse_date()`. It already handles:
- `DD.MM.YYYY` and `D.M.YY`
- Year-only (`1930`)
- Month + year (`August 1941`, `Sept. 1913`)
- Partial/approximate markers
Extract `.year` from the returned `ParsedDate.iso` if `iso` is not `None`.
### Unresolvable dates
If both paths yield `None` (e.g. `"2.9.196"`, `"4.3.1023"`, `".12.1955"`):
- Set `birthYear`/`deathYear` to `null`.
- Append the raw value to `person.notes` as `"[Geburtsdatum: <raw>]"` or `"[Todesdatum: <raw>]"` for human review.
---
## 5. Person Record Normalization
### Name fields
- **lastName** = col B, stripped.
- **firstName** = col C. Keep as-is (including multi-name strings and parenthetical nicknames) — the backend can split later.
- **maidenName** = col D, stripped. Stored in the JSON; the backend maps this to a `PersonNameAlias` of type `BIRTH_NAME`.
- **alias** = `null` (the tool does not invent aliases; maiden name is the alias).
### Generation
Extract the first digit sequence from col A:
```python
import re
m = re.search(r"\d+", raw_generation)
generation = int(m.group()) if m else None
```
Handles all observed variants: `"G 3"`, `"G3"`, `"G 0"`, `"G 2 de Gruyter"`.
Stored as `generation: int | null` in the JSON (informational; not mapped to a backend field directly).
### familyMember
Set `true` for all records. Every person in this register is part of the family network. The backend can refine this.
### notes
Constructed by concatenation:
1. Unmatched Bemerkung text (after relationship pattern is stripped).
2. Unresolvable date raw values (prefixed with field name).
---
## 6. Name Lookup Index
After pass 1, build a `dict[str, list[str]]` mapping normalized name keys → list of `rowId`s.
### Normalization function `_norm(s) -> str`
1. Lowercase.
2. Strip surrounding `"` and `'`.
3. Remove parenthetical substrings: `r"\([^)]*\)"`.
4. Collapse internal whitespace.
5. Strip geographic/honorific suffixes: `aachen`, `mex.`, `mexiko`, `sen`, `jun`, `jr`.
6. Strip trailing commas, dots.
### Keys indexed per person
For a person with firstName `F`, lastName `L`, maidenName `M`:
- `_norm(f"{F} {L}")` — canonical order
- `_norm(f"{L} {F}")` — reversed order (col I uses this heavily)
- `_norm(f"{F} {M}")` if maidenName is set — maiden-name reference
- `_norm(L)` alone — single-token fallback
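
A sketch of how the four keys might be collected into the index. The `_norm` here is a simplified stand-in (lowercase plus whitespace collapse) so the block runs on its own, and `build_index` is a hypothetical name.

```python
from collections import defaultdict


def _norm(s: str) -> str:
    # Simplified stand-in for the spec's normalizer.
    return " ".join(s.lower().split())


def build_index(persons: list) -> dict:
    """Map each normalized name key to the rowIds that carry it."""
    index = defaultdict(list)
    for p in persons:
        first, last, maiden = p["firstName"], p["lastName"], p.get("maidenName")
        keys = [_norm(f"{first} {last}"), _norm(f"{last} {first}"), _norm(last)]
        if maiden:
            keys.append(_norm(f"{first} {maiden}"))
        # dict.fromkeys drops repeated keys for one person while keeping order,
        # so a record with an empty first name is not listed twice under one key.
        for key in dict.fromkeys(keys):
            if key:
                index[key].append(p["rowId"])
    return dict(index)


index = build_index([
    {"rowId": "row_002", "firstName": "Elsgard", "lastName": "Allemeyer", "maidenName": "Wöhler"},
])
print(sorted(index))
```

Deduplicating keys per person is a design choice of this sketch: without it, one person can appear twice under the same key and be reported as ambiguous against itself.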
### Match resolution
Given a raw name string from col I or col J:
1. `_norm(raw)` → look up in index.
2. **Exactly one hit** → match confirmed, use that `rowId`.
3. **Zero hits** → `reason: "not_found"` → `unresolved[]`.
4. **Multiple hits** → `reason: "ambiguous"` → `unresolved[]`.
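
The three outcomes can be illustrated with a small lookup function. This is a sketch only: `resolve` is a hypothetical name, the index contents are made up, and normalization is reduced to lowercasing and whitespace collapse.

```python
from typing import Optional, Tuple


def resolve(raw: str, index: dict) -> Tuple[Optional[str], Optional[str]]:
    """Return (rowId, None) on a unique hit, else (None, reason)."""
    key = " ".join(raw.lower().split())  # simplified normalization
    hits = index.get(key, [])
    if len(hits) == 1:
        return hits[0], None
    return None, ("not_found" if not hits else "ambiguous")


# Illustrative index: two rows share the last name "cram".
index = {"herbert cram": ["row_019"], "cram": ["row_019", "row_020"]}

print(resolve("Herbert  Cram", index))
print(resolve("Cram", index))
print(resolve("Tante Lolly", index))
```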
---
## 7. Relationship Extraction
### 7.1 SPOUSE_OF (col I — `verheiratet mit`)
1. Normalize col I value.
2. Resolve via name index (§6).
3. If matched: emit one edge `{ personId, relatedPersonId, type: "SPOUSE_OF", source: "verheiratet_mit" }`.
- Skip if an identical edge (regardless of direction) already exists in the relationship list.
4. If unresolved: add to `unresolved[]`.
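
One way to implement the direction-independent duplicate check in step 3 is to key each edge by an unordered pair. The sketch below assumes spouse matches arrive as `(personId, relatedPersonId)` tuples; `spouse_edges` is a hypothetical helper.

```python
def spouse_edges(pairs: list) -> list:
    """Emit one SPOUSE_OF edge per couple, whichever row names the other first."""
    seen = set()
    edges = []
    for person_id, related_id in pairs:
        couple = frozenset((person_id, related_id))  # order-insensitive key
        if couple in seen:
            continue
        seen.add(couple)
        edges.append({
            "personId": person_id,
            "relatedPersonId": related_id,
            "type": "SPOUSE_OF",
            "source": "verheiratet_mit",
        })
    return edges


# Both spouses list each other in col I; only the first direction is kept.
edges = spouse_edges([("row_002", "row_003"), ("row_003", "row_002")])
print(edges)
```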
### 7.2 PARENT_OF (col J — `Bemerkung`)
Apply these regex patterns in order, case-insensitive, with optional whitespace:
| Pattern | Direction | Note |
|---------|-----------|------|
| `(Sohn\|Tochter)\s+v(?:on)?\s+(.+)` | Named person(s) → this person | "Sohn v Clara u Herbert" |
| `(Vater\|Mutter)\s+v(?:on)?\s+(.+)` | This person → named person(s) | "Vater v Herbert" |
**Multi-parent extraction:** The parent string may contain two parents joined by `\s+u(?:nd)?\s+`. Split on this pattern, resolve each part independently.
**Emit** one `PARENT_OF` edge per resolved parent:
```json
{
"personId": "<parent_rowId>",
"relatedPersonId": "<child_rowId>",
"type": "PARENT_OF",
"source": "bemerkung",
"rawBemerkung": "<original col J value>"
}
```
**Skip** (do not emit, do not add to `unresolved[]`, leave in notes):
- Patterns starting with `Neffe`, `Nichte`, `Enkel`, `Enkelin`, `Urenkel`, `Urenkelin` — too indirect.
- Patterns starting with `Bruder`, `Schwester` — SIBLING_OF is out of scope for this tool.
- Any other Bemerkung text that does not match the parent patterns.
**After extraction:** the matched portion of the Bemerkung is removed; the remainder goes into `person.notes`.
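
The pattern table and the multi-parent split can be sketched as below. The regexes follow the table, anchored at the start of the cell; `extract_names` is a hypothetical helper that stops before name resolution, so it returns raw name strings rather than `rowId`s.

```python
import re

CHILD_RE = re.compile(r"^(?:Sohn|Tochter)\s+v(?:on)?\s+(.+)", re.I)
PARENT_RE = re.compile(r"^(?:Vater|Mutter)\s+v(?:on)?\s+(.+)", re.I)
AND_RE = re.compile(r"\s+u(?:nd)?\s+", re.I)


def extract_names(bemerkung: str):
    """Return (direction, [names]) for a parent pattern, or None if nothing matches.

    direction "child": the named persons are parents of this row's person.
    direction "parent": this row's person is the parent of the named persons.
    """
    text = bemerkung.strip()
    for direction, pattern in (("child", CHILD_RE), ("parent", PARENT_RE)):
        m = pattern.match(text)
        if m:
            names = [n.strip() for n in AND_RE.split(m.group(1)) if n.strip()]
            return direction, names
    return None


print(extract_names("Sohn v Clara u Herbert"))
print(extract_names("Vater von Herbert"))
print(extract_names("Nichte v Herbert"))
```

Texts such as `"Nichte v Herbert"` fall through to `None`, which matches the skip rule: they stay in the notes and produce neither an edge nor an `unresolved[]` entry.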
---
## 8. Output JSON Schema
File: `tools/import-normalizer/out/canonical-persons-tree.json`
```json
{
"generated_at": "<ISO-8601 timestamp>",
"source": "Personendatei 2.xlsx",
"stats": {
"persons": 163,
"relationships": 87,
"unresolved": 12
},
"persons": [
{
"rowId": "row_002",
"firstName": "Elsgard",
"lastName": "Allemeyer",
"maidenName": "Wöhler",
"alias": null,
"notes": "Nichte von Herbert",
"birthYear": 1920,
"deathYear": 1999,
"birthPlace": "Garz",
"deathPlace": "Espelkamp",
"generation": 3,
"familyMember": true
}
],
"relationships": [
{
"personId": "row_002",
"relatedPersonId": "row_003",
"type": "SPOUSE_OF",
"source": "verheiratet_mit"
},
{
"personId": "row_019",
"relatedPersonId": "row_021",
"type": "PARENT_OF",
"source": "bemerkung",
"rawBemerkung": "Tochter v Clara u Herbert"
}
],
"unresolved": [
{
"rowId": "row_007",
"field": "verheiratet_mit",
"raw": "\"Tante Lolly\"",
"reason": "not_found"
},
{
"rowId": "row_042",
"field": "bemerkung",
"raw": "Zwillingsbruder v Herbert",
"reason": "not_found"
}
]
}
```
---
## 9. CLI Interface
```
python3 persons_tree.py [--input PATH] [--output PATH] [--dry-run]
```
| Flag | Default | Description |
|------|---------|-------------|
| `--input` | `../../import/Personendatei 2.xlsx` | Source Excel file |
| `--output` | `out/canonical-persons-tree.json` | Output JSON file |
| `--dry-run` | off | Print stats + first 5 unresolved entries; do not write file |
On success, print:
```
✓ 163 persons parsed
✓ 87 relationships emitted (52 SPOUSE_OF, 35 PARENT_OF)
⚠ 12 unresolved (see unresolved[] in output)
→ out/canonical-persons-tree.json
```
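
The flag table maps directly onto `argparse`. The following is a sketch using the defaults listed in the table as plain strings; `build_parser` is a name chosen for this example.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="persons_tree.py")
    parser.add_argument("--input", default="../../import/Personendatei 2.xlsx",
                        help="Source Excel file")
    parser.add_argument("--output", default="out/canonical-persons-tree.json",
                        help="Output JSON file")
    parser.add_argument("--dry-run", action="store_true",
                        help="Print stats and the first 5 unresolved entries; do not write")
    return parser


# argparse exposes --dry-run as the attribute dry_run.
args = build_parser().parse_args(["--dry-run"])
print(args.input, args.output, args.dry_run)
```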
---
## 10. Module Reuse
| Existing module | What we reuse |
|-----------------|---------------|
| `dates.parse_date()` | String date parsing — handles DD.MM.YYYY, year-only, month+year, approximate markers |
| `config.MONTHS` | Month name → integer mapping (German + Spanish month names already present) |
The Excel serial conversion is new logic added directly in `persons_tree.py` (3 lines).
---
## 11. What This Tool Does NOT Do
- Does not call the backend API or touch the database.
- Does not create `PersonNameAlias` records — it emits `maidenName` as a field; the future backend importer maps it.
- Does not infer SIBLING_OF edges (requires symmetric lookup across multiple rows — deferred).
- Does not deduplicate persons that appear in both this file and `canonical-persons.xlsx` — deduplication is the backend importer's responsibility.
- Does produce `birthPlace` / `deathPlace` as top-level fields in the JSON (see §8) — they are free-text strings and informational only. The `Person` entity has no corresponding columns; the future backend importer decides whether to add columns or fold the values into `notes`.
---
## 12. Resolved Decisions
| OQ | Question | Decision |
|----|----------|----------|
| OQ-01 | Duplicate rows (127/138 — Christa Schütz; 129/139 — Christoph Seils). | **Tool deduplicates.** On pass 1, after building the person list, detect rows with identical `(firstName, lastName, birthYear)` and keep only the first occurrence. Log skipped row ids to stdout. |
| OQ-02 | `birthPlace` / `deathPlace` absent from `Person` entity. | **Keep as separate top-level fields** in the JSON (`birthPlace`, `deathPlace`). The future backend importer may add columns to the `persons` table; the field is preserved here to avoid data loss. |
| OQ-03 | `firstName` = `"Charlotte,Meta,Jacobi"` (multi-name comma string). | **Store verbatim as `firstName`.** No splitting. |
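
The OQ-01 decision can be illustrated with a short sketch. The function name `deduplicate` and the birth years in the sample rows are made up for the example; only the `(firstName, lastName, birthYear)` key and the keep-first rule come from the decision above.

```python
def deduplicate(persons: list) -> tuple:
    """Keep the first row per (firstName, lastName, birthYear); report the rest."""
    seen = {}
    kept, skipped = [], []
    for p in persons:
        key = (p["firstName"], p["lastName"], p["birthYear"])
        if key in seen:
            skipped.append(f"{p['rowId']} duplicates {seen[key]}")
            continue
        seen[key] = p["rowId"]
        kept.append(p)
    return kept, skipped


rows = [
    {"rowId": "row_127", "firstName": "Christa", "lastName": "Schütz", "birthYear": 1950},
    {"rowId": "row_138", "firstName": "Christa", "lastName": "Schütz", "birthYear": 1950},
    {"rowId": "row_140", "firstName": "Christa", "lastName": "Schütz", "birthYear": 1975},
]
kept, skipped = deduplicate(rows)
for msg in skipped:
    print("SKIP", msg)
```

Rows that share a name but differ in birth year are kept as separate persons, since the key includes `birthYear`.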

View File

@@ -1,5 +1,6 @@
 .venv/
 out/
+!out/canonical-persons-tree.json
 review/
 __pycache__/
 *.pyc

View File

@@ -116,6 +116,10 @@ RELATIONAL_TERMS = {
 COLLECTIVE_TERMS = {
     "familie", "fam", "kinder", "eltern", "geschwister", "großeltern",
     "grosseltern", "alle", "diverse", "div", "gebrüder", "gebr",
+    # Plural/group relational terms — added for tag generation heuristic
+    "söhne", "töchter", "brüder", "schwestern", "schwiegereltern",
+    "vettern", "kusinen", "cousinen", "nichten", "neffen", "tanten",
+    "freunde", "bekannte", "geschw", "enkelkinder", "jungens", "verwandten",
 }
 # Markers of an unknown/illegible name (the literal "?" is handled separately in code).
 # All long enough to be safe as SUBSTRING matches — do NOT add short tokens like "nn"

View File

@@ -3,6 +3,7 @@ from dataclasses import dataclass, field
 from enum import Enum, auto
 import dates as _dates
+import tags as _tags
 class Triage(Enum):
@@ -88,7 +89,7 @@ def index_file_mismatch(index: str, file_path: str) -> bool:
     return stem != index
-def to_canonical(raw, ctx, date_overrides: dict) -> CanonicalDocument:
+def to_canonical(raw, ctx, date_overrides: dict, approved_themes: frozenset = frozenset()) -> CanonicalDocument:
     pd = _dates.parse_date(raw.date, date_overrides)
     flags = []
@@ -113,6 +114,6 @@ def to_canonical(raw, ctx, date_overrides: dict) -> CanonicalDocument:
         receiver_person_ids=[r[0] for r in receivers],
         receiver_names=[r[1] for r in receivers],
         date_iso=pd.iso or "", date_raw=raw.date, date_precision=str(pd.precision),
-        location=raw.location, tags=[raw.tags] if raw.tags else [], summary=raw.summary,
+        location=raw.location, tags=_tags.generate_tags(raw.tags, raw.summary, approved_themes), summary=raw.summary,
         source_row=raw.source_row, needs_review=flags,
     )

View File

@@ -8,13 +8,17 @@ import ingest
 import persons
 import documents
 import overrides as overrides_mod
+import tags as _tags
 import writers
 def run(*, document_workbook, document_sheet, person_workbook, person_sheet,
-        out_dir, review_dir, date_overrides, name_overrides) -> dict:
+        out_dir, review_dir, date_overrides, name_overrides,
+        approved_themes_path=None) -> dict:
     out_dir, review_dir = Path(out_dir), Path(review_dir)
+    approved_themes = _tags.load_approved_themes(Path(approved_themes_path)) if approved_themes_path else set()
     # --- persons ---
     person_rows = ingest.read_sheet(person_workbook, person_sheet)
     p_fields, _ = ingest.build_header_map(person_rows[0], config.PERSON_HEADER_MAP, config.PERSON_REQUIRED_FIELDS)
@@ -52,7 +56,7 @@ def run(*, document_workbook, document_sheet, person_workbook, person_sheet,
         seen_index[raw.index] += 1
         if raw.date.strip() and raw.date.strip() in date_overrides:
             dates_by_override += 1
-        doc = documents.to_canonical(raw, ctx, date_overrides)
+        doc = documents.to_canonical(raw, ctx, date_overrides, frozenset(approved_themes))
         if "unparsed_date" in doc.needs_review:
             unparsed_by_raw.setdefault(raw.date, []).append(source_row)
         if "index_file_mismatch" in doc.needs_review:
@@ -74,6 +78,9 @@ def run(*, document_workbook, document_sheet, person_workbook, person_sheet,
     writers.write_documents_xlsx(canon_docs, out_dir / "canonical-documents.xlsx")
     writers.write_persons_xlsx(all_people, out_dir / "canonical-persons.xlsx")
+    all_tag_paths = [path for doc in canon_docs for path in doc.tags]
+    writers.write_tag_tree_xlsx(_tags.build_tag_tree(all_tag_paths), out_dir / "canonical-tag-tree.xlsx")
     # --- review files ---
     # unparsed dates: most-frequent first, with example source rows + blank override cells so a
     # corrected row can be pasted straight into overrides/dates.csv (same raw,iso,precision shape).
@@ -97,6 +104,11 @@ def run(*, document_workbook, document_sheet, person_workbook, person_sheet,
                              ["category", "raw", "count", "example_rows"], unresolved_rows)
     writers.write_review_csv(review_dir / "index-file-mismatch.csv", ["source_row", "index", "file"], mismatches)
+    all_summaries = [doc.summary for doc in canon_docs if doc.summary]
+    candidates = _tags.mine_summary_candidates(all_summaries)
+    writers.write_review_csv(review_dir / "tag-candidates.csv", ["candidate", "count"],
+                             [[c, n] for c, n in candidates])
     dated = sum(1 for d in canon_docs if d.date_raw.strip())
     unknown = sum(1 for d in canon_docs if d.date_raw.strip() and d.date_precision == "UNKNOWN")
     unknown_rate = f"{(100 * unknown / dated):.1f}%" if dated else "0.0%"
@@ -148,7 +160,8 @@ def main():
         document_workbook=config.DOCUMENT_WORKBOOK, document_sheet=config.DOCUMENT_SHEET,
         person_workbook=config.PERSON_WORKBOOK, person_sheet=config.PERSON_SHEET,
         out_dir=config.OUT_DIR, review_dir=config.REVIEW_DIR,
-        date_overrides=date_overrides, name_overrides=name_overrides)
+        date_overrides=date_overrides, name_overrides=name_overrides,
+        approved_themes_path=config.OVERRIDES_DIR / "approved-themes.csv")
     print("Normalization complete:")
     for k, v in stats.items():
         print(f" {k}: {v}")

File diff suppressed because it is too large

View File

@@ -0,0 +1 @@
+candidate

View File

@@ -0,0 +1,409 @@
"""Normalize Personendatei 2.xlsx into canonical-persons-tree.json."""
import argparse
import datetime
import json
import re
import sys
from pathlib import Path
import config
import dates
from persons import _strip_accents
_MIN_YEAR = 1700
_MAX_YEAR = 2100
# Threshold: if parse_date parses a pure-digit string as a year outside [_MIN_YEAR, _MAX_YEAR],
# but the year is a plausible typo (1000-3000), don't try serial conversion.
# Years outside this range (e.g., 7568) are implausible and should try serial conversion.
_PLAUSIBLE_TYPO_MIN = 1000
_PLAUSIBLE_TYPO_MAX = 3000

def _parse_year(raw: str | None) -> int | None:
    """Extract a birth/death year from an Excel cell string.

    Handles three cases:
    1. ISO / German / text string parseable by parse_date() → extract year if in range
    2. Pure-integer string (out-of-range or unparseable) → try Excel serial conversion
       (unless it's a plausible typo year, e.g., "1023" for "1923")
    3. Mixed-format or unresolvable → None

    Serial conversion only fires for pure-digit strings and implausible years,
    preventing typo years like "1023" from being mis-converted as serials.
    """
    if raw is None:
        return None
    s = str(raw).strip()
    if not s:
        return None

    # Check if it's a pure-digit string (candidate for serial conversion)
    is_pure_digit = re.fullmatch(r"\d+", s) is not None

    # Try parse_date first (handles ISO, DD.MM.YYYY, year-only, month+year, etc.)
    result = dates.parse_date(s)
    if result.iso:
        year = int(result.iso[:4])
        if _MIN_YEAR <= year <= _MAX_YEAR:
            return year
        # Year is out of range. Only try serial conversion if it's an implausible year.
        # Plausible typos (e.g., 1023 for 1923) should not be converted as serials.
        if is_pure_digit and not (_PLAUSIBLE_TYPO_MIN <= year <= _PLAUSIBLE_TYPO_MAX):
            n = int(s)
            if 1 <= n <= 80_000:
                d = datetime.date(1899, 12, 30) + datetime.timedelta(days=n)
                if _MIN_YEAR <= d.year <= _MAX_YEAR:
                    return d.year
        return None

    # parse_date() found nothing. Try serial conversion only for pure-digit strings.
    if is_pure_digit:
        n = int(s)
        if 1 <= n <= 80_000:
            d = datetime.date(1899, 12, 30) + datetime.timedelta(days=n)
            if _MIN_YEAR <= d.year <= _MAX_YEAR:
                return d.year
    return None


def _parse_generation(raw: str | None) -> int | None:
    """Extract the generation integer from column A values like 'G 3', 'G3', 'G 0'."""
    if not raw:
        return None
    m = re.search(r"\d+", str(raw))
    return int(m.group()) if m else None


_GEO_SUFFIXES = {"aachen", "mex", "mexiko", "sen", "jun", "jr"}


def _norm_tree(s: str) -> str:
    """Normalize a name string for tree matching.

    - Strip surrounding quotes, remove parenthetical substrings
    - Diacritic → ASCII (ä→ae etc.), lowercase, dots → spaces
    - Remove known geographic/honorific suffix tokens
    - Collapse whitespace
    """
    s = (s or "").strip().strip("\"'")
    s = re.sub(r"\([^)]*\)", "", s)
    s = _strip_accents(s).lower().replace(".", " ")
    tokens = [t for t in s.split() if t and t not in _GEO_SUFFIXES]
    return " ".join(tokens).strip("., ")


def _build_index(persons: list[dict]) -> dict[str, list[str]]:
    """Build a name → [rowId, …] lookup index with four keys per person."""
    index: dict[str, list[str]] = {}

    def _add(key: str, row_id: str) -> None:
        if key:
            index.setdefault(key, []).append(row_id)

    for p in persons:
        row_id = p["rowId"]
        first = p.get("firstName") or ""
        last = p.get("lastName") or ""
        maiden = p.get("maidenName") or ""
        _add(_norm_tree(f"{first} {last}"), row_id)
        _add(_norm_tree(f"{last} {first}"), row_id)
        if maiden:
            _add(_norm_tree(f"{first} {maiden}"), row_id)
        _add(_norm_tree(last), row_id)
    return index


def _resolve_one(raw: str, index: dict[str, list[str]]) -> tuple[str | None, str | None]:
    """Return (row_id, None) on unique match, (None, reason) otherwise."""
    key = _norm_tree(raw)
    if not key:
        return None, "empty"
    hits = index.get(key, [])
    if len(hits) == 1:
        return hits[0], None
    if len(hits) == 0:
        return None, "not_found"
    return None, "ambiguous"


def _parse_row(row_num: int, fields: dict) -> dict:
    """Produce one person record from a header-mapped row dict.

    Internal keys prefixed with '_' are stripped before JSON output in main().
    """
    def s(key: str) -> str:
        return (fields.get(key) or "").strip()

    birth_raw = s("birth_date")
    death_raw = s("death_date")
    birth_year = _parse_year(birth_raw)
    death_year = _parse_year(death_raw)

    notes_parts = []
    if birth_raw and birth_year is None:
        notes_parts.append(f"[Geburtsdatum: {birth_raw}]")
    if death_raw and death_year is None:
        notes_parts.append(f"[Todesdatum: {death_raw}]")
    bemerkung = s("notes")
    if bemerkung:
        notes_parts.append(bemerkung)

    maiden = s("maiden_name") or None
    spouse = s("spouse") or None
    bemerkung_out = bemerkung or None

    return {
        "rowId": f"row_{row_num:03d}",
        "firstName": s("first_name"),
        "lastName": s("last_name"),
        "maidenName": maiden,
        "alias": None,
        "notes": " ".join(notes_parts) or None,
        "birthYear": birth_year,
        "deathYear": death_year,
        "birthPlace": s("birth_place") or None,
        "deathPlace": s("death_place") or None,
        "generation": _parse_generation(s("generation")),
        "familyMember": True,
        "_spouse_raw": spouse,
        "_bemerkung_raw": bemerkung_out,
    }


def _deduplicate(persons: list[dict]) -> tuple[list[dict], list[str]]:
    """Remove duplicate rows. Two-stage:

    1. Exact (firstName, lastName, birthYear) match.
    2. (firstName, lastName) where the later entry has birthYear=None and an earlier
       entry already has a known birthYear.
    """
    seen_full: dict[tuple, str] = {}  # (first, last, year) -> rowId
    seen_name: dict[tuple, str] = {}  # (first, last) -> rowId of first entry with a year
    result: list[dict] = []
    skipped: list[str] = []
    for p in persons:
        first, last, year = p["firstName"], p["lastName"], p["birthYear"]
        key_full = (first, last, year)
        key_name = (first, last)
        if key_full in seen_full:
            skipped.append(f"{p['rowId']} duplicates {seen_full[key_full]} ({first} {last}, year={year})")
            continue
        if year is None and key_name in seen_name:
            skipped.append(f"{p['rowId']} duplicates {seen_name[key_name]} ({first} {last}, no birth year)")
            continue
        seen_full[key_full] = p["rowId"]
        if year is not None:
            seen_name[key_name] = p["rowId"]
        result.append(p)
    return result, skipped


def _resolve_spouses(
    persons: list[dict], index: dict[str, list[str]]
) -> tuple[list[dict], list[dict]]:
    """Emit SPOUSE_OF edges from each person's _spouse_raw field."""
    relationships: list[dict] = []
    unresolved: list[dict] = []
    emitted: set[frozenset] = set()
    for p in persons:
        raw = (p.get("_spouse_raw") or "").strip()
        if not raw:
            continue
        row_id = p["rowId"]
        matched_id, reason = _resolve_one(raw, index)
        if matched_id:
            edge = frozenset([row_id, matched_id])
            if edge not in emitted:
                emitted.add(edge)
                relationships.append({
                    "personId": row_id,
                    "relatedPersonId": matched_id,
                    "type": "SPOUSE_OF",
                    "source": "verheiratet_mit",
                })
        else:
            unresolved.append({
                "rowId": row_id,
                "field": "verheiratet_mit",
                "raw": raw,
                "reason": reason,
            })
    return relationships, unresolved

_CHILD_RE = re.compile(r"^(?:Sohn|Tochter)\s+v(?:on)?\s+(.+)", re.I)
_PARENT_RE = re.compile(r"^(?:Vater|Mutter)\s+v(?:on)?\s+(.+)", re.I)
_AND_RE = re.compile(r"\s+u(?:nd)?\s+", re.I)

def _parse_bemerkung(
    row_id: str, bemerkung: str, index: dict[str, list[str]]
) -> tuple[list[dict], list[dict], str]:
    """Extract PARENT_OF edges from a Bemerkung cell.

    Returns (relationships, unresolved, remaining_notes).
    Text that doesn't match a parent pattern goes to remaining_notes unchanged.
    """
    if not bemerkung or not bemerkung.strip():
        return [], [], ""
    s = bemerkung.strip()
    for pattern, direction in ((_CHILD_RE, "child"), (_PARENT_RE, "parent")):
        m = pattern.match(s)
        if not m:
            continue
        # Split the captured group on the first comma to separate the name part from
        # any trailing description (e.g. ", nach Mexiko emigriert"); if that leaves
        # no trailing text, fall back to the first semicolon.
        raw_names, _, trailing = m.group(1).strip().partition(",")
        if not trailing:
            raw_names, _, trailing = raw_names.partition(";")
        name_part = raw_names.strip().rstrip("!., ")
        remainder = trailing.strip().lstrip(".,! ")
        parts = [p.strip() for p in _AND_RE.split(name_part) if p.strip()]
        rels: list[dict] = []
        unres: list[dict] = []
        for part in parts:
            part = part.rstrip("!., ")
            matched_id, reason = _resolve_one(part, index)
            if matched_id:
                if direction == "child":
                    rels.append({
                        "personId": matched_id,
                        "relatedPersonId": row_id,
                        "type": "PARENT_OF",
                        "source": "bemerkung",
                        "rawBemerkung": bemerkung,
                    })
                else:
                    rels.append({
                        "personId": row_id,
                        "relatedPersonId": matched_id,
                        "type": "PARENT_OF",
                        "source": "bemerkung",
                        "rawBemerkung": bemerkung,
                    })
            else:
                unres.append({
                    "rowId": row_id,
                    "field": "bemerkung",
                    "raw": bemerkung,
                    "reason": reason,
                })
        return rels, unres, remainder
    # No pattern matched — full text goes to notes, nothing to unresolved
    return [], [], s



def main() -> None:
    parser = argparse.ArgumentParser(
        description="Normalize Personendatei 2.xlsx → canonical-persons-tree.json"
    )
    parser.add_argument(
        "--input", default=str(config.PERSON_WORKBOOK),
        help="Path to Personendatei 2.xlsx"
    )
    parser.add_argument(
        "--output", default=str(config.OUT_DIR / "canonical-persons-tree.json"),
        help="Path for output JSON"
    )
    parser.add_argument("--dry-run", action="store_true", help="Print stats, skip write")
    args = parser.parse_args()

    from ingest import read_sheet, build_header_map

    rows = read_sheet(Path(args.input), config.PERSON_SHEET)
    if not rows:
        print("ERROR: sheet is empty", file=sys.stderr)
        sys.exit(1)
    header_row = [str(v) for v in rows[0]]
    fields_map, _ = build_header_map(header_row, config.PERSON_HEADER_MAP, config.PERSON_REQUIRED_FIELDS)

    # --- Pass 1: parse rows ---
    persons_raw: list[dict] = []
    for row_num, row in enumerate(rows[1:], start=2):
        field_dict = {field: (row[col] if col < len(row) else "") for field, col in fields_map.items()}
        # Skip rows without a last name; str()/or guards against None or non-string cells
        if not str(field_dict.get("last_name") or "").strip():
            continue
        persons_raw.append(_parse_row(row_num, field_dict))
    persons, skipped_msgs = _deduplicate(persons_raw)
    for msg in skipped_msgs:
        print(f" SKIP {msg}", file=sys.stderr)
    index = _build_index(persons)

    # --- Pass 2: resolve relationships ---
    all_rels: list[dict] = []
    all_unresolved: list[dict] = []
    spouse_rels, spouse_unres = _resolve_spouses(persons, index)
    all_rels.extend(spouse_rels)
    all_unresolved.extend(spouse_unres)
    for p in persons:
        # Drop the private working fields so they do not leak into the output JSON
        bemerkung = p.pop("_bemerkung_raw", None) or ""
        p.pop("_spouse_raw", None)
        rels, unres, remaining = _parse_bemerkung(p["rowId"], bemerkung, index)
        all_rels.extend(rels)
        all_unresolved.extend(unres)
        if remaining:
            existing = p.get("notes") or ""
            if remaining not in existing:
                p["notes"] = (existing + " " + remaining).strip() if existing else remaining

    # --- Stats output ---
    spouse_count = sum(1 for r in all_rels if r["type"] == "SPOUSE_OF")
    parent_count = sum(1 for r in all_rels if r["type"] == "PARENT_OF")
    print(f"{len(persons)} persons parsed")
    print(f"{len(all_rels)} relationships emitted ({spouse_count} SPOUSE_OF, {parent_count} PARENT_OF)")
    if all_unresolved:
        print(f"{len(all_unresolved)} unresolved (see unresolved[] in output)")
    if args.dry_run:
        print("\n--- dry-run: first 5 unresolved ---")
        for u in all_unresolved[:5]:
            print(f" {u}")
        return

    output = {
        "generated_at": datetime.datetime.now().isoformat(),
        "source": Path(args.input).name,
        "stats": {
            "persons": len(persons),
            "relationships": len(all_rels),
            "unresolved": len(all_unresolved),
        },
        "persons": persons,
        "relationships": all_rels,
        "unresolved": all_unresolved,
    }
    out_path = Path(args.output)
    # parents=True so a nested, not-yet-existing output directory is created too
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(json.dumps(output, ensure_ascii=False, indent=2), encoding="utf-8")
    print(args.output)


if __name__ == "__main__":
    main()
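The name handling in `_parse_bemerkung` can be tried in isolation. The following is a minimal sketch, not the PR's code: `split_names_and_remark` is a hypothetical helper that reuses the `_AND_RE` pattern from the diff and assumes the text after the first comma or semicolon is a free-text remark.

```python
import re

# Same separator pattern as _AND_RE in the diff: "u" or "und" between two names.
AND_RE = re.compile(r"\s+u(?:nd)?\s+", re.I)


def split_names_and_remark(captured: str) -> tuple[list[str], str]:
    """Hypothetical helper: split 'A u B, remark' into (['A', 'B'], 'remark')."""
    # Cut at the first comma or semicolon, whichever comes first.
    pieces = re.split(r"[,;]", captured.strip(), maxsplit=1)
    name_part = pieces[0].strip().rstrip("!., ")
    remark = pieces[1].strip().lstrip(".,! ") if len(pieces) > 1 else ""
    # Split the name part on "u"/"und" and trim stray punctuation from each name.
    names = [p.strip().rstrip("!., ") for p in AND_RE.split(name_part) if p.strip()]
    return names, remark


names, remark = split_names_and_remark(
    "Clara Cram u Herbert Cram, nach Mexiko emigriert"
)
print(names, "|", remark)
```

Each returned name would then go through `_resolve_one`, and the remark would be appended to the person's notes.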

@@ -0,0 +1,119 @@
import csv
import re
from collections import Counter
from pathlib import Path

import config

_COLLECTIVE = config.COLLECTIVE_TERMS

_GERMAN_STOP_WORDS = {
    "der", "die", "das", "ein", "eine", "einer", "einen", "einem", "eines",
    "und", "oder", "aber", "an", "in", "auf", "für", "mit", "von", "zu",
    "bei", "nach", "vor", "aus", "ist", "sind", "war", "waren", "hat",
    "haben", "wird", "werden", "ich", "du", "er", "sie", "es", "wir",
    "ihr", "ihn", "ihm", "ihnen", "mich", "mir", "dich", "dir",
    "ihre", "ihren", "seinem", "seinen", "seiner", "seine",
    "auch", "nicht", "noch", "dann", "durch", "dem", "den",
    "des", "als", "wie", "dass", "um", "über", "unter", "zwischen",
    "all", "alle", "was", "wer", "wo", "wann", "welche", "welcher",
    "mehr", "sehr", "nur", "schon", "dabei", "dazu",
    "bis", "seit", "gegen", "ohne", "doch", "wenn", "weil",
    "ob", "so", "da", "dort", "hier", "nun", "ja", "nein",
    "ihrer", "ihrem",
    # Contracted prepositions, plus reflexive, auxiliary, modal and
    # demonstrative forms common in German Inhalt summaries
    "im", "am", "ans", "ins", "zum", "zur", "vom", "beim", "sich",
    "hatte", "wurde", "wurden", "worden",
    "kann", "konnte", "soll", "sollte", "will", "wollte",
    "dieses", "dieser", "diesem", "diesen",
}


def _is_correspondence(raw: str) -> bool:
    # "X an Y", "an Y" and abbreviated "X.an Y" all mark a sender/receiver Schlagwort
    lower = raw.lower()
    return " an " in lower or lower.startswith("an ") or ".an " in lower


def _tokenize(text: str) -> list[str]:
    return [t.lower() for t in re.findall(r"[a-zA-ZäöüÄÖÜß]+", text)]


def _has_collective(tokens: list[str]) -> bool:
    return any(t in _COLLECTIVE for t in tokens)


def classify_schlagwort(raw: str) -> list[str]:
    if not raw or not raw.strip():
        return []
    # Anything that is not correspondence is kept as a topic tag
    if not _is_correspondence(raw):
        return [f"Themen/{raw}"]
    # Correspondence is kept only when a collective term (group sender/receiver) appears
    if _has_collective(_tokenize(raw)):
        return [f"Briefwechsel/{raw}"]
    # Person-to-person correspondence yields no tag
    return []


def mine_summary_candidates(summaries: list[str]) -> list[tuple[str, int]]:
    counter: Counter = Counter()
    for summary in summaries:
        for token in re.split(r"[,;\s]+", summary.lower()):
            token = re.sub(r"[^a-zA-ZäöüÄÖÜß]", "", token)
            if len(token) >= 2 and token not in _GERMAN_STOP_WORDS:
                counter[token] += 1
    return counter.most_common()


def load_approved_themes(path: Path) -> set[str]:
    if not path.exists():
        return set()
    themes: set[str] = set()
    with open(path, newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        for row in reader:
            if row.get("candidate"):
                themes.add(row["candidate"].strip().lower())
    return themes
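

def apply_approved_themes(summary: str, themes: set[str]) -> list[str]:
    lower = summary.lower()
    # Iterate in sorted order: set iteration order is not stable across runs,
    # and the tag order ends up in the output files
    return [
        f"Themen/{theme}"
        for theme in sorted(themes)
        if re.search(r"\b" + re.escape(theme) + r"\b", lower)
    ]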


def generate_tags(schlagwort: str, summary: str, themes: set[str]) -> list[str]:
    result = classify_schlagwort(schlagwort or "")
    if summary and themes:
        result = result + apply_approved_themes(summary, themes)
    return result


def encode_tags(tag_list: list[str]) -> str:
    return "|".join(tag_list)


def build_tag_tree(all_tag_paths: list[str]) -> list[dict]:
    unique_paths = list(dict.fromkeys(all_tag_paths))
    roots: dict[str, None] = {}
    children: dict[str, tuple[str, str]] = {}
    for path in unique_paths:
        if "/" in path:
            parent, child = path.split("/", 1)
            roots[parent] = None
            children[path] = (parent, child)
        else:
            roots[path] = None
    rows: list[dict] = []
    seen: set[str] = set()
    for root in roots:
        if root not in seen:
            rows.append({"tag_path": root, "parent_name": "", "tag_name": root})
            seen.add(root)
    for path, (parent, child) in children.items():
        if path not in seen:
            rows.append({"tag_path": path, "parent_name": parent, "tag_name": child})
            seen.add(path)
    return rows
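The whole-word matching in `apply_approved_themes` is easiest to see on a compound noun. This is a self-contained sketch under stated assumptions: `apply_themes` is a hypothetical stand-in that mirrors the function above, and the `sorted()` call is an assumption added here to make the result order reproducible.

```python
import re


def apply_themes(summary: str, themes: set[str]) -> list[str]:
    """Hypothetical stand-in for apply_approved_themes, for illustration only."""
    lower = summary.lower()
    return [
        f"Themen/{theme}"
        # sorted() is an assumption of this sketch: it keeps the order reproducible
        for theme in sorted(themes)
        # \b is Unicode-aware for str patterns, so umlauts count as word characters
        if re.search(r"\b" + re.escape(theme) + r"\b", lower)
    ]


# "reise" does not match inside the compound "geschäftsreise"
print(apply_themes("Geschäftsreise nach Berlin", {"reise", "geschäftsreise"}))
# both themes appear as whole words here
print(apply_themes("Reise zur Hochzeit", {"reise", "hochzeit"}))
```

The practical consequence is that an approved theme only tags summaries containing it as a standalone word. Compounds need their own entry in the approved-themes CSV.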

@@ -51,7 +51,7 @@ def test_to_canonical_resolves_and_flags():
     assert doc.sender_person_id == "de-gruyter-walter"
     assert doc.receiver_person_ids == ["de-gruyter-eugenie"]  # matched via maiden alias
     assert doc.date_iso == "1888-02-15" and doc.date_precision == "DAY"
-    assert doc.tags == ["Brautbriefe"]
+    assert doc.tags == ["Themen/Brautbriefe"]
     assert doc.needs_review == []


 def test_to_canonical_unmatched_and_unparsed():

@@ -62,3 +62,60 @@ def test_run_end_to_end(tmp_path):
     assert _matrix(out_dir / "canonical-persons.xlsx") == persons1
     assert (review_dir / "unparsed-dates.csv").read_text(encoding="utf-8") == unparsed1
     assert len(docs1) == 4  # header + 3 docs
+
+
+def test_tag_tree_output_emitted(tmp_path):
+    out_dir = tmp_path / "out"; review_dir = tmp_path / "review"
+    normalize.run(
+        document_workbook=_doc_wb(tmp_path), document_sheet="Familienarchiv",
+        person_workbook=_person_wb(tmp_path), person_sheet="Tabelle1",
+        out_dir=out_dir, review_dir=review_dir,
+        date_overrides={}, name_overrides={})
+    assert (out_dir / "canonical-tag-tree.xlsx").exists()
+
+
+def test_tag_candidates_review_emitted(tmp_path):
+    out_dir = tmp_path / "out"; review_dir = tmp_path / "review"
+    normalize.run(
+        document_workbook=_doc_wb(tmp_path), document_sheet="Familienarchiv",
+        person_workbook=_person_wb(tmp_path), person_sheet="Tabelle1",
+        out_dir=out_dir, review_dir=review_dir,
+        date_overrides={}, name_overrides={})
+    assert (review_dir / "tag-candidates.csv").exists()
+    text = (review_dir / "tag-candidates.csv").read_text(encoding="utf-8")
+    assert "candidate" in text and "count" in text
+
+
+def test_schlagwort_encoded_as_themen_in_documents(tmp_path):
+    out_dir = tmp_path / "out"; review_dir = tmp_path / "review"
+    normalize.run(
+        document_workbook=_doc_wb(tmp_path), document_sheet="Familienarchiv",
+        person_workbook=_person_wb(tmp_path), person_sheet="Tabelle1",
+        out_dir=out_dir, review_dir=review_dir,
+        date_overrides={}, name_overrides={})
+    wb = openpyxl.load_workbook(out_dir / "canonical-documents.xlsx")
+    ws = wb.active
+    header = [c.value for c in ws[1]]
+    tag_col = header.index("tags")
+    tag_values = [ws.cell(row=r, column=tag_col + 1).value for r in range(2, ws.max_row + 1)]
+    assert any(v and "Themen/Brautbriefe" in v for v in tag_values)
+    assert not any(v and v.strip() == "Brautbriefe" for v in tag_values)
+
+
+def test_approved_themes_applied(tmp_path):
+    themes_file = tmp_path / "approved-themes.csv"
+    themes_file.write_text("candidate\ngeschäftsreise\n", encoding="utf-8")
+    out_dir = tmp_path / "out"; review_dir = tmp_path / "review"
+    normalize.run(
+        document_workbook=_doc_wb(tmp_path), document_sheet="Familienarchiv",
+        person_workbook=_person_wb(tmp_path), person_sheet="Tabelle1",
+        out_dir=out_dir, review_dir=review_dir,
+        date_overrides={}, name_overrides={},
+        approved_themes_path=themes_file)
+    wb = openpyxl.load_workbook(out_dir / "canonical-documents.xlsx")
+    ws = wb.active
+    header = [c.value for c in ws[1]]
+    tag_col = header.index("tags")
+    tag_values = [ws.cell(row=r, column=tag_col + 1).value for r in range(2, ws.max_row + 1)]
+    # W-0001 has Inhalt "Geschäftsreise" — should get an extra Themen/geschäftsreise tag
+    assert any(v and "Themen/geschäftsreise" in v for v in tag_values)

@@ -0,0 +1,457 @@
import sys
from pathlib import Path

sys.path.insert(0, str(Path(__file__).parent.parent))

import persons_tree


def test_parse_year_iso_string():
    assert persons_tree._parse_year("1920-09-20") == 1920


def test_parse_year_excel_serial_birth():
    # 7568 days after 1899-12-30 is 1920-09-19
    assert persons_tree._parse_year("7568") == 1920


def test_parse_year_excel_serial_death():
    # 36222 days after 1899-12-30 is 1999-03-03
    assert persons_tree._parse_year("36222") == 1999


def test_parse_year_excel_serial_small():
    # 177 days after 1899-12-30 is 1900-06-25
    assert persons_tree._parse_year("177") == 1900


def test_parse_year_german_date_string():
    assert persons_tree._parse_year("30.8.1862") == 1862


def test_parse_year_year_only():
    assert persons_tree._parse_year("1930") == 1930


def test_parse_year_free_text():
    assert persons_tree._parse_year("August 1941") == 1941


def test_parse_year_none():
    assert persons_tree._parse_year(None) is None


def test_parse_year_empty():
    assert persons_tree._parse_year("") is None


def test_parse_year_unresolvable_truncated():
    # "2.9.196" has no valid 4-digit year — returns None
    assert persons_tree._parse_year("2.9.196") is None


def test_parse_year_typo_year():
    # "4.3.1023" — year 1023 outside 1700-2100 guard — returns None
    assert persons_tree._parse_year("4.3.1023") is None


def test_parse_year_bare_out_of_range_year_is_none():
    # "1023" is a plausible typo for "1923" but is NOT an Excel serial.
    # parse_date("1023") parses it as year 1023 (out of 1700-2100 guard).
    # The serial branch must NOT re-interpret it as a serial.
    assert persons_tree._parse_year("1023") is None


def test_parse_generation_space():
    assert persons_tree._parse_generation("G 3") == 3


def test_parse_generation_no_space():
    assert persons_tree._parse_generation("G3") == 3


def test_parse_generation_extra_spaces():
    assert persons_tree._parse_generation("G  0") == 0


def test_parse_generation_trailing_garbage():
    assert persons_tree._parse_generation("G 2 de Gruyter") == 2


def test_parse_generation_empty():
    assert persons_tree._parse_generation("") is None


def test_parse_generation_none():
    assert persons_tree._parse_generation(None) is None


def test_norm_tree_basic():
    assert persons_tree._norm_tree("Werner Allemeyer") == "werner allemeyer"


def test_norm_tree_diacritics():
    assert persons_tree._norm_tree("Wöhler") == "woehler"


def test_norm_tree_strips_parens():
    assert persons_tree._norm_tree("Otto (Herbert)") == "otto"


def test_norm_tree_strips_quotes():
    assert persons_tree._norm_tree('"Tante Lolly"') == "tante lolly"


def test_norm_tree_strips_geographic_suffix():
    assert persons_tree._norm_tree("Walter Cram Aachen") == "walter cram"


def test_norm_tree_strips_mexiko():
    assert persons_tree._norm_tree("Hans Cram Mexiko") == "hans cram"


def test_norm_tree_collapses_whitespace():
    assert persons_tree._norm_tree("  Clara   de  Gruyter  ") == "clara de gruyter"


def test_build_index_forward_lookup():
    persons = [{"rowId": "row_002", "firstName": "Werner", "lastName": "Allemeyer", "maidenName": None}]
    idx = persons_tree._build_index(persons)
    assert "werner allemeyer" in idx
    assert idx["werner allemeyer"] == ["row_002"]


def test_build_index_reversed_lookup():
    persons = [{"rowId": "row_002", "firstName": "Werner", "lastName": "Allemeyer", "maidenName": None}]
    idx = persons_tree._build_index(persons)
    assert idx.get("allemeyer werner") == ["row_002"]


def test_build_index_maiden_name_lookup():
    persons = [{"rowId": "row_002", "firstName": "Elsgard", "lastName": "Allemeyer", "maidenName": "Wöhler"}]
    idx = persons_tree._build_index(persons)
    assert idx.get("elsgard woehler") == ["row_002"]


def test_build_index_single_token_fallback():
    persons = [{"rowId": "row_028", "firstName": "Herbert", "lastName": "Cram", "maidenName": None}]
    idx = persons_tree._build_index(persons)
    assert idx.get("cram") == ["row_028"]


def test_build_index_ambiguous_single_token():
    persons = [
        {"rowId": "row_028", "firstName": "Herbert", "lastName": "Cram", "maidenName": None},
        {"rowId": "row_019", "firstName": "Clara", "lastName": "Cram", "maidenName": None},
    ]
    idx = persons_tree._build_index(persons)
    assert set(idx["cram"]) == {"row_028", "row_019"}


def test_resolve_one_found():
    persons = [{"rowId": "row_003", "firstName": "Werner", "lastName": "Allemeyer", "maidenName": None}]
    idx = persons_tree._build_index(persons)
    row_id, reason = persons_tree._resolve_one("Allemeyer Werner", idx)
    assert row_id == "row_003"
    assert reason is None


def test_resolve_one_not_found():
    idx = {}
    row_id, reason = persons_tree._resolve_one("Nobody Unknown", idx)
    assert row_id is None
    assert reason == "not_found"


def test_resolve_one_ambiguous():
    persons = [
        {"rowId": "row_028", "firstName": "Herbert", "lastName": "Cram", "maidenName": None},
        {"rowId": "row_019", "firstName": "Clara", "lastName": "Cram", "maidenName": None},
    ]
    idx = persons_tree._build_index(persons)
    row_id, reason = persons_tree._resolve_one("Cram", idx)
    assert row_id is None
    assert reason == "ambiguous"


def test_parse_row_serial_dates():
    fields = {
        "generation": "G 3", "last_name": "Allemeyer", "first_name": "Elsgard",
        "maiden_name": "Wöhler", "birth_date": "7568", "birth_place": "Garz",
        "death_date": "36222", "death_place": "Espelkamp",
        "spouse": "Allemeyer Werner", "notes": "Nichte von Herbert",
    }
    p = persons_tree._parse_row(2, fields)
    assert p["rowId"] == "row_002"
    assert p["firstName"] == "Elsgard"
    assert p["lastName"] == "Allemeyer"
    assert p["maidenName"] == "Wöhler"
    assert p["birthYear"] == 1920
    assert p["deathYear"] == 1999
    assert p["birthPlace"] == "Garz"
    assert p["deathPlace"] == "Espelkamp"
    assert p["generation"] == 3
    assert p["familyMember"] is True
    assert p["_spouse_raw"] == "Allemeyer Werner"
    assert p["_bemerkung_raw"] == "Nichte von Herbert"
    assert "[Geburtsdatum" not in (p["notes"] or "")


def test_parse_row_string_birth_date():
    fields = {
        "generation": "G 2", "last_name": "Cram", "first_name": "Herbert",
        "maiden_name": "", "birth_date": "25.6.1890", "birth_place": "Texas",
        "death_date": "", "death_place": "", "spouse": "", "notes": "",
    }
    p = persons_tree._parse_row(28, fields)
    assert p["birthYear"] == 1890
    assert p["deathYear"] is None
    assert p["notes"] is None or p["notes"] == ""


def test_parse_row_unresolvable_date_goes_to_notes():
    fields = {
        "generation": "G 3", "last_name": "Heydrich", "first_name": "Dieter",
        "maiden_name": "", "birth_date": "28.9.", "birth_place": "",
        "death_date": "", "death_place": "", "spouse": "", "notes": "Bruder v Ingrid",
    }
    p = persons_tree._parse_row(96, fields)
    assert p["birthYear"] is None
    assert "[Geburtsdatum: 28.9.]" in p["notes"]
    assert "Bruder v Ingrid" in p["notes"]


def test_parse_row_empty_spouse_and_notes():
    fields = {
        "generation": "G 4", "last_name": "Allemeyer", "first_name": "Jürgen",
        "maiden_name": "", "birth_date": "", "birth_place": "",
        "death_date": "", "death_place": "", "spouse": "", "notes": "",
    }
    p = persons_tree._parse_row(4, fields)
    assert p["_spouse_raw"] is None
    assert p["_bemerkung_raw"] is None


def test_deduplicate_no_duplicates():
    persons = [
        {"rowId": "row_002", "firstName": "Elsgard", "lastName": "Allemeyer", "birthYear": 1920},
        {"rowId": "row_003", "firstName": "Werner", "lastName": "Allemeyer", "birthYear": 1923},
    ]
    result, skipped = persons_tree._deduplicate(persons)
    assert len(result) == 2
    assert skipped == []


def test_deduplicate_exact_match():
    # rows 127/138: same firstName, lastName, birthYear
    persons = [
        {"rowId": "row_127", "firstName": "Christa", "lastName": "Schütz", "birthYear": 1951},
        {"rowId": "row_138", "firstName": "Christa", "lastName": "Schütz", "birthYear": 1951},
    ]
    result, skipped = persons_tree._deduplicate(persons)
    assert [p["rowId"] for p in result] == ["row_127"]
    assert len(skipped) == 1
    assert "row_138" in skipped[0]


def test_deduplicate_none_birth_year_after_known():
    # rows 129/139: row 129 has birthYear=1964, row 139 has birthYear=None
    persons = [
        {"rowId": "row_129", "firstName": "Christoph", "lastName": "Seils", "birthYear": 1964},
        {"rowId": "row_139", "firstName": "Christoph", "lastName": "Seils", "birthYear": None},
    ]
    result, skipped = persons_tree._deduplicate(persons)
    assert [p["rowId"] for p in result] == ["row_129"]
    assert len(skipped) == 1


def test_deduplicate_both_none_birth_year_kept():
    # Two rows with the same name and no birth year count as duplicates:
    # the first row is kept, the second is skipped
    persons = [
        {"rowId": "row_A", "firstName": "Hans", "lastName": "Cram", "birthYear": None},
        {"rowId": "row_B", "firstName": "Hans", "lastName": "Cram", "birthYear": None},
    ]
    result, skipped = persons_tree._deduplicate(persons)
    assert [p["rowId"] for p in result] == ["row_A"]
    assert len(skipped) == 1


def _make_persons(*args):
    """Helper: args are (rowId, firstName, lastName, maidenName, spouse_raw) tuples."""
    return [
        {"rowId": a[0], "firstName": a[1], "lastName": a[2], "maidenName": a[3],
         "_spouse_raw": a[4], "_bemerkung_raw": None,
         "birthYear": None, "deathYear": None, "birthPlace": None, "deathPlace": None,
         "generation": None, "familyMember": True, "alias": None, "notes": None}
        for a in args
    ]


def test_resolve_spouses_success():
    persons = _make_persons(
        ("row_002", "Elsgard", "Allemeyer", "Wöhler", "Allemeyer Werner"),
        ("row_003", "Werner", "Allemeyer", None, "Elsgard Wöhler"),
    )
    idx = persons_tree._build_index(persons)
    rels, unres = persons_tree._resolve_spouses(persons, idx)
    assert len(rels) == 1
    assert rels[0]["type"] == "SPOUSE_OF"
    assert {rels[0]["personId"], rels[0]["relatedPersonId"]} == {"row_002", "row_003"}
    assert unres == []


def test_resolve_spouses_not_found():
    persons = _make_persons(
        ("row_007", "Charlotte", "Blomquist", "Ruge", '"Tante Lolly"'),
    )
    idx = persons_tree._build_index(persons)
    rels, unres = persons_tree._resolve_spouses(persons, idx)
    assert rels == []
    assert len(unres) == 1
    assert unres[0]["rowId"] == "row_007"
    assert unres[0]["reason"] == "not_found"


def test_resolve_spouses_empty_spouse_field():
    persons = _make_persons(
        ("row_004", "Jürgen", "Allemeyer", None, None),
    )
    idx = persons_tree._build_index(persons)
    rels, unres = persons_tree._resolve_spouses(persons, idx)
    assert rels == [] and unres == []


def _register(*args):
    """Build index from (rowId, first, last, maiden) tuples."""
    persons = [
        {"rowId": a[0], "firstName": a[1], "lastName": a[2], "maidenName": a[3]}
        for a in args
    ]
    return persons, persons_tree._build_index(persons)


def test_parse_bemerkung_sohn_two_parents():
    _, idx = _register(
        ("row_019", "Clara", "Cram", "de Gruyter"),
        ("row_028", "Herbert", "Cram", None),
    )
    rels, unres, notes = persons_tree._parse_bemerkung(
        "row_021", "Sohn v Clara Cram u Herbert Cram", idx
    )
    assert len(rels) == 2
    assert all(r["type"] == "PARENT_OF" for r in rels)
    child_ids = {r["relatedPersonId"] for r in rels}
    parent_ids = {r["personId"] for r in rels}
    assert child_ids == {"row_021"}
    assert "row_019" in parent_ids and "row_028" in parent_ids
    assert unres == []
    assert notes == ""


def test_parse_bemerkung_tochter_von():
    _, idx = _register(("row_019", "Clara", "Cram", None))
    rels, unres, notes = persons_tree._parse_bemerkung(
        "row_036", "Tochter von Clara Cram", idx
    )
    assert len(rels) == 1
    assert rels[0] == {
        "personId": "row_019",
        "relatedPersonId": "row_036",
        "type": "PARENT_OF",
        "source": "bemerkung",
        "rawBemerkung": "Tochter von Clara Cram",
    }
    assert notes == ""


def test_parse_bemerkung_vater():
    _, idx = _register(("row_028", "Herbert", "Cram", None))
    rels, unres, notes = persons_tree._parse_bemerkung(
        "row_031", "Vater v Herbert Cram", idx
    )
    assert len(rels) == 1
    assert rels[0]["personId"] == "row_031"
    assert rels[0]["relatedPersonId"] == "row_028"
    assert rels[0]["type"] == "PARENT_OF"


def test_parse_bemerkung_unmatched_parent_name():
    _, idx = _register()  # empty index
    rels, unres, notes = persons_tree._parse_bemerkung(
        "row_004", "Sohn v Elsgard A.", idx
    )
    assert rels == []
    assert len(unres) == 1
    assert unres[0]["reason"] == "not_found"
    assert notes == ""


def test_parse_bemerkung_skip_nichte():
    _, idx = _register(("row_028", "Herbert", "Cram", None))
    rels, unres, notes = persons_tree._parse_bemerkung(
        "row_002", "Nichte von Herbert", idx
    )
    assert rels == []
    assert unres == []
    assert notes == "Nichte von Herbert"


def test_parse_bemerkung_skip_bruder():
    _, idx = _register(("row_028", "Herbert", "Cram", None))
    rels, unres, notes = persons_tree._parse_bemerkung(
        "row_033", "Bruder v Herbert", idx
    )
    assert rels == []
    assert unres == []
    assert notes == "Bruder v Herbert"


def test_parse_bemerkung_empty():
    _, idx = _register()
    rels, unres, notes = persons_tree._parse_bemerkung("row_004", "", idx)
    assert rels == [] and unres == [] and notes == ""


def test_parse_bemerkung_plain_remark():
    _, idx = _register()
    rels, unres, notes = persons_tree._parse_bemerkung(
        "row_029", "Verfasserin der Cram-Chronik !!", idx
    )
    assert rels == [] and unres == []
    assert notes == "Verfasserin der Cram-Chronik !!"


def test_parse_bemerkung_sohn_with_trailing_remark():
    _, idx = _register(
        ("row_019", "Clara", "Cram", "de Gruyter"),
        ("row_028", "Herbert", "Cram", None),
    )
    rels, unres, notes = persons_tree._parse_bemerkung(
        "row_021", "Sohn v Clara Cram u Herbert Cram, nach Mexiko emigriert", idx
    )
    assert len(rels) == 2
    assert unres == []
    assert notes == "nach Mexiko emigriert"


import subprocess


def test_dry_run_exits_zero(tmp_path):
    """dry-run should complete without writing any file and exit 0."""
    input_path = Path(__file__).parent.parent.parent.parent / "import" / "Personendatei 2.xlsx"
    if not input_path.exists():
        import pytest
        pytest.skip("source Excel file not present")
    result = subprocess.run(
        [
            sys.executable, str(Path(__file__).parent.parent / "persons_tree.py"),
            "--input", str(input_path),
            "--output", str(tmp_path / "out.json"),
            "--dry-run",
        ],
        capture_output=True, text=True,
    )
    assert result.returncode == 0, result.stderr
    assert not (tmp_path / "out.json").exists()
    assert "persons parsed" in result.stdout
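The serial-date tests above count days from 1899-12-30. The arithmetic can be checked with the standard library alone. This is a sketch: `serial_to_date` is a hypothetical helper, not part of `persons_tree`, and it says nothing about how `_parse_year` decides whether a bare number is a serial or a year.

```python
import datetime

# Epoch named in the test comments; commonly used for Excel's 1900 date system.
EXCEL_EPOCH = datetime.date(1899, 12, 30)


def serial_to_date(serial: int) -> datetime.date:
    """Hypothetical helper: convert an Excel day serial to a calendar date."""
    return EXCEL_EPOCH + datetime.timedelta(days=serial)


# The three serials used in the tests
for serial in (7568, 36222, 177):
    print(serial, serial_to_date(serial).isoformat())
```

With this epoch the three serials fall in 1920, 1999 and 1900, which matches the years the tests expect.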

@@ -0,0 +1,191 @@
import tags


# --- classify_schlagwort ---

def test_semantic_tag_kept_as_themen():
    assert tags.classify_schlagwort("Brautbriefe") == ["Themen/Brautbriefe"]


def test_everyday_tag_kept_as_themen():
    assert tags.classify_schlagwort("Alltag in Ruhrort") == ["Themen/Alltag in Ruhrort"]


def test_event_tag_kept_as_themen():
    assert tags.classify_schlagwort("zur Hochzeit") == ["Themen/zur Hochzeit"]


def test_individual_correspondence_dropped():
    assert tags.classify_schlagwort("Clara an Herbert") == []


def test_individual_correspondence_with_year_dropped():
    assert tags.classify_schlagwort("Herbert an Clara 1918") == []


def test_individual_with_role_dropped():
    assert tags.classify_schlagwort("Vater Juan an Herbert") == []


def test_relational_receiver_dropped():
    assert tags.classify_schlagwort("Clara an ihre Mutter") == []


def test_group_receiver_kinder_kept_as_briefwechsel():
    assert tags.classify_schlagwort("Clara an Kinder") == ["Briefwechsel/Clara an Kinder"]


def test_group_receiver_eltern_kept():
    assert tags.classify_schlagwort("Herbert an seine Eltern") == ["Briefwechsel/Herbert an seine Eltern"]


def test_group_receiver_geschwister_kept():
    assert tags.classify_schlagwort("Walter an Geschwister") == ["Briefwechsel/Walter an Geschwister"]


def test_group_receiver_schwiegereltern_kept():
    assert tags.classify_schlagwort("Clara an Schwiegereltern") == ["Briefwechsel/Clara an Schwiegereltern"]


def test_group_receiver_soehne_kept():
    assert tags.classify_schlagwort("Mutter Cram an ihre Söhne") == ["Briefwechsel/Mutter Cram an ihre Söhne"]


def test_group_receiver_brueder_kept():
    assert tags.classify_schlagwort("Hans an Brüder") == ["Briefwechsel/Hans an Brüder"]


def test_group_receiver_cousinen_kept():
    assert tags.classify_schlagwort("Clara an Cousinen in Göttingen") == ["Briefwechsel/Clara an Cousinen in Göttingen"]


def test_group_receiver_freunde_kept():
    assert tags.classify_schlagwort("Freunde an Herbert") == ["Briefwechsel/Freunde an Herbert"]


def test_group_sender_geschwister_kept():
    # collective on the LEFT side of "an"
    assert tags.classify_schlagwort("Geschwister Cram an Herbert") == ["Briefwechsel/Geschwister Cram an Herbert"]


def test_receiver_only_individual_dropped():
    # starts with "an " — single individual receiver
    assert tags.classify_schlagwort("an Walter de Gruyter") == []


def test_receiver_only_group_kept():
    # starts with "an " — collective receiver
    assert tags.classify_schlagwort("an ihre Geschwister") == ["Briefwechsel/an ihre Geschwister"]


def test_abbreviated_sender_individual_dropped():
    # "Maria W.an Clara" — abbreviated name + ".an"
    assert tags.classify_schlagwort("Maria W.an Clara") == []


def test_abbreviated_sender_group_kept():
    assert tags.classify_schlagwort("Eugenie sen.an Kinder") == ["Briefwechsel/Eugenie sen.an Kinder"]


def test_empty_schlagwort_returns_empty():
    assert tags.classify_schlagwort("") == []


def test_enkelkinder_kept():
    assert tags.classify_schlagwort("Enkelkinder an Clara") == ["Briefwechsel/Enkelkinder an Clara"]


def test_geschw_abbreviation_kept():
    # "Geschw." abbreviation for Geschwister — appears after "u" in receiver side
    assert tags.classify_schlagwort("Bruder Hans an Herbert u Geschw.") == ["Briefwechsel/Bruder Hans an Herbert u Geschw."]


# --- mine_summary_candidates ---

def test_mine_candidates_counts_words():
    summaries = ["Reise, Hochzeit", "Reise", "Krieg"]
    candidates = dict(tags.mine_summary_candidates(summaries))
    assert candidates["reise"] == 2
    assert candidates["hochzeit"] == 1
    assert candidates["krieg"] == 1


def test_mine_candidates_filters_stop_words():
    summaries = ["und die Reise", "das ist eine Reise"]
    candidates = dict(tags.mine_summary_candidates(summaries))
    assert "reise" in candidates
    assert "und" not in candidates
    assert "die" not in candidates
    assert "das" not in candidates
    assert "ist" not in candidates
    assert "eine" not in candidates


def test_mine_candidates_filters_contracted_prepositions():
    # im=in+dem, zum=zu+dem, zur=zu+der, vom=von+dem, sich, am, beim
    summaries = ["im Sommer zum Besuch, zur Hochzeit vom Vater, sich gefreut am Morgen beim Fest"]
    candidates = dict(tags.mine_summary_candidates(summaries))
    for stop in ("im", "zum", "zur", "vom", "sich", "am", "beim", "ans"):
        assert stop not in candidates, f"stop word '{stop}' leaked through"
    assert "besuch" in candidates
    assert "hochzeit" in candidates


def test_mine_candidates_filters_single_chars():
    summaries = ["x Reise y"]
    candidates = dict(tags.mine_summary_candidates(summaries))
    assert "x" not in candidates
    assert "y" not in candidates


def test_mine_candidates_sorted_descending():
    summaries = ["Reise", "Reise", "Hochzeit", "Reise", "Hochzeit", "Krieg"]
    result = tags.mine_summary_candidates(summaries)
    counts = [count for _, count in result]
    assert counts == sorted(counts, reverse=True)


def test_mine_candidates_empty_summaries():
    assert tags.mine_summary_candidates([]) == []
    assert tags.mine_summary_candidates([""]) == []


# --- apply_approved_themes ---

def test_apply_themes_match_found():
    themes = {"reise", "hochzeit"}
    result = tags.apply_approved_themes("Reise nach Berlin", themes)
    assert "Themen/reise" in result


def test_apply_themes_case_insensitive():
    themes = {"reise"}
    result = tags.apply_approved_themes("REISE", themes)
    assert "Themen/reise" in result


def test_apply_themes_no_match():
    themes = {"krieg"}
    result = tags.apply_approved_themes("Alltag in Ruhrort", themes)
    assert result == []


def test_apply_themes_multiple_matches():
    themes = {"reise", "hochzeit"}
    result = tags.apply_approved_themes("Reise zur Hochzeit", themes)
    assert len(result) == 2
    assert "Themen/reise" in result
    assert "Themen/hochzeit" in result


# --- encode_tags ---

def test_encode_tags_single():
    assert tags.encode_tags(["Themen/Brautbriefe"]) == "Themen/Brautbriefe"


def test_encode_tags_multiple():
    result = tags.encode_tags(["Themen/Brautbriefe", "Briefwechsel/Clara an Kinder"])
    assert result == "Themen/Brautbriefe|Briefwechsel/Clara an Kinder"


def test_encode_tags_empty():
    assert tags.encode_tags([]) == ""


# --- build_tag_tree ---

def test_build_tag_tree_includes_roots():
    paths = ["Themen/Brautbriefe", "Briefwechsel/Clara an Kinder"]
    tree = tags.build_tag_tree(paths)
    tag_paths = [row["tag_path"] for row in tree]
    assert "Themen" in tag_paths
    assert "Briefwechsel" in tag_paths


def test_build_tag_tree_includes_children():
    paths = ["Themen/Brautbriefe"]
    tree = tags.build_tag_tree(paths)
    child = next(r for r in tree if r["tag_path"] == "Themen/Brautbriefe")
    assert child["parent_name"] == "Themen"
    assert child["tag_name"] == "Brautbriefe"


def test_build_tag_tree_root_has_empty_parent():
    paths = ["Themen/Brautbriefe"]
    tree = tags.build_tag_tree(paths)
    root = next(r for r in tree if r["tag_path"] == "Themen")
    assert root["parent_name"] == ""
    assert root["tag_name"] == "Themen"


def test_build_tag_tree_no_duplicates():
    paths = ["Themen/Brautbriefe", "Themen/Alltag", "Themen/Brautbriefe"]
    tree = tags.build_tag_tree(paths)
    tag_paths = [row["tag_path"] for row in tree]
    assert len(tag_paths) == len(set(tag_paths))

@@ -47,6 +47,19 @@ def write_documents_xlsx(docs, path: Path):
     _write_xlsx(docs, DOC_COLUMNS, path)


+def write_tag_tree_xlsx(tree: list[dict], path: Path):
+    columns = ["tag_path", "parent_name", "tag_name"]
+    wb = openpyxl.Workbook()
+    ws = wb.active
+    ws.append(columns)
+    for row in tree:
+        ws.append([row.get(col, "") for col in columns])
+    wb.properties.created = _FIXED_TS
+    wb.properties.modified = _FIXED_TS
+    Path(path).parent.mkdir(parents=True, exist_ok=True)
+    wb.save(path)
+
+
 def write_persons_xlsx(people, path: Path):
     _write_xlsx(people, PERSON_COLUMNS, path)