familienarchiv/docs/superpowers/plans/2026-05-25-personendatei-importer.md
Marcel b37fd1728b docs(importer): add Personendatei importer implementation plan
9-task TDD plan for persons_tree.py — year extraction, name index,
deduplication, SPOUSE_OF/PARENT_OF extraction, CLI + JSON output.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 20:38:14 +02:00

# Personendatei Importer Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Add `tools/import-normalizer/persons_tree.py`, a CLI tool that reads `import/Personendatei 2.xlsx` and writes `out/canonical-persons-tree.json` with 163 normalized person records, SPOUSE_OF/PARENT_OF relationship edges, and an `unresolved[]` list for manual review.

**Architecture:** Two-pass approach: pass 1 parses all rows into person dicts and builds a name-lookup index; pass 2 resolves `verheiratet mit` (SPOUSE_OF) and parses `Bemerkung` for parent/child patterns (PARENT_OF). Reuses `ingest.read_sheet()`, `ingest.build_header_map()`, `dates.parse_date()`, and `persons._strip_accents` from the existing normalizer. No backend required.

**Tech Stack:** Python 3.12, openpyxl (already in `.venv`), pytest (already in `.venv`), `dates.py`/`ingest.py`/`config.py`/`persons.py` from `tools/import-normalizer/`.

---
## Context you need before starting
**Run environment:**
```bash
cd tools/import-normalizer
source .venv/bin/activate # or: .venv/bin/python / .venv/bin/pytest directly
```
**Key existing modules (read these before coding):**
- `config.py`: `PERSON_WORKBOOK`, `PERSON_SHEET`, `PERSON_HEADER_MAP`, `OUT_DIR`
- `ingest.py`: `read_sheet(path, sheet_name) -> list[list[str]]` and `build_header_map(header_row, field_map, required)`
- `dates.py`: `parse_date(raw: str) -> ParsedDate` with `.iso` (ISO string or None) and `.precision`
- `persons.py`: `_strip_accents(s)` (diacritic normalization)

**How ingest works:** `read_sheet()` opens the workbook with openpyxl and converts every cell to a string via `_cell_to_str()`. Date-formatted cells become ISO strings (`"1920-09-20"`). Cells stored as plain numbers (like the date serials in this file) become numeric strings (`"7568"`). All values arrive in `persons_tree.py` as strings.

**PERSON_HEADER_MAP** (already in `config.py`):
```python
{
"generation": "generation",
"familienname": "last_name",
"vorname": "first_name",
"geb als": "maiden_name",
"geburtsdatum": "birth_date",
"geburtsort": "birth_place",
"todesdatum": "death_date",
"sterbeort": "death_place",
"verheiratet mit": "spouse",
"bemerkung": "notes",
}
```
**File structure:**
- Create: `tools/import-normalizer/persons_tree.py`
- Create: `tools/import-normalizer/tests/test_persons_tree.py`
---
## Task 1: Year extraction from cell string
**Files:**
- Create: `tools/import-normalizer/persons_tree.py`
- Create: `tools/import-normalizer/tests/test_persons_tree.py`

This is the trickiest part of the tool. Birth/death cells arrive as strings from `ingest.read_sheet()` in several shapes:
- Date-formatted cells: ISO string `"1920-09-20"` → `parse_date()` handles it
- Plain number cells (the majority): numeric string `"7568"` → `parse_date("7568")` returns UNKNOWN (7568 > 2100, so `expand_year()` rejects it), so we must detect this case and apply the Excel serial conversion: `date(1899, 12, 30) + timedelta(days=7568)` → 1920
- German string dates: `"30.8.1862"` → `parse_date()` handles it
- Year-only: `"1930"` → `parse_date()` handles it
- Free text: `"August 1941"` → `parse_date()` handles it
- Unresolvable: `"2.9.196"`, `"4.3.1023"` → return None
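
The serial arithmetic from the list above can be checked in isolation. This is a minimal sketch, not part of the plan's code: `excel_serial_to_date` and `EXCEL_EPOCH` are hypothetical names, and the 1899-12-30 epoch is only exact for serials of 61 and above, because Excel's 1900 date system counts a non-existent 29 February 1900.

```python
import datetime

# Epoch matching Excel's 1900 date system for serials >= 61.
# (Assumption: the workbook uses the 1900 system, not the 1904 one.)
EXCEL_EPOCH = datetime.date(1899, 12, 30)


def excel_serial_to_date(serial: int) -> datetime.date:
    """Convert an Excel day serial to a calendar date (hypothetical helper)."""
    return EXCEL_EPOCH + datetime.timedelta(days=serial)


# The three serials used in the tests of this task.
for raw in ("7568", "36222", "177"):
    print(raw, excel_serial_to_date(int(raw)).isoformat())
```

Only the year is kept by `_parse_year`, so a one-day uncertainty around the epoch does not change the result for these values.
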
- [ ] **Step 1: Write the failing tests**
Create `tools/import-normalizer/tests/test_persons_tree.py`:
```python
import sys
from pathlib import Path

sys.path.insert(0, str(Path(__file__).parent.parent))

import persons_tree


def test_parse_year_iso_string():
    assert persons_tree._parse_year("1920-09-20") == 1920


def test_parse_year_excel_serial_birth():
    # 7568 days after 1899-12-30 is 1920-09-19
    assert persons_tree._parse_year("7568") == 1920


def test_parse_year_excel_serial_death():
    # 36222 days after 1899-12-30 is 1999-03-03
    assert persons_tree._parse_year("36222") == 1999


def test_parse_year_excel_serial_small():
    # 177 days after 1899-12-30 is 1900-06-25
    assert persons_tree._parse_year("177") == 1900


def test_parse_year_german_date_string():
    assert persons_tree._parse_year("30.8.1862") == 1862


def test_parse_year_year_only():
    assert persons_tree._parse_year("1930") == 1930


def test_parse_year_free_text():
    assert persons_tree._parse_year("August 1941") == 1941


def test_parse_year_none():
    assert persons_tree._parse_year(None) is None


def test_parse_year_empty():
    assert persons_tree._parse_year("") is None


def test_parse_year_unresolvable_truncated():
    # "2.9.196" has no valid 4-digit year, so it returns None
    assert persons_tree._parse_year("2.9.196") is None


def test_parse_year_typo_year():
    # "4.3.1023": year 1023 is outside the 1500-2100 guard, so it returns None
    assert persons_tree._parse_year("4.3.1023") is None
```
- [ ] **Step 2: Run tests — verify they all fail with ImportError or NameError**
```bash
cd tools/import-normalizer
.venv/bin/pytest tests/test_persons_tree.py -v
```
Expected: collection fails with `ModuleNotFoundError: No module named 'persons_tree'` (a subclass of `ImportError`).
- [ ] **Step 3: Create `persons_tree.py` with `_parse_year`**
```python
"""Normalize Personendatei 2.xlsx into canonical-persons-tree.json."""
import argparse
import datetime
import json
import re
import sys
from pathlib import Path

import config
import dates
from persons import _strip_accents


def _parse_year(raw: str | None) -> int | None:
    """Extract a birth/death year from an Excel cell string.

    Handles four cases:
    1. ISO string (openpyxl date-formatted cell) → parse_date()
    2. Numeric string that is an Excel serial (1-80000) → timedelta conversion
    3. Any other string → parse_date()
    4. Unresolvable → None
    """
    if raw is None:
        return None
    s = str(raw).strip()
    if not s:
        return None
    # Try parse_date first (handles ISO, DD.MM.YYYY, year-only, month+year, etc.)
    result = dates.parse_date(s)
    if result.iso:
        return int(result.iso[:4])
    # If it's a pure integer string, try Excel serial conversion.
    # parse_date() returns UNKNOWN for serials like "7568" because 7568 > 2100.
    if re.fullmatch(r"\d+", s):
        n = int(s)
        if 1 <= n <= 80_000:
            d = datetime.date(1899, 12, 30) + datetime.timedelta(days=n)
            if 1500 <= d.year <= 2100:
                return d.year
    return None
```
- [ ] **Step 4: Run tests — verify they pass**
```bash
.venv/bin/pytest tests/test_persons_tree.py -v
```
Expected: all 11 tests PASS.
- [ ] **Step 5: Commit**
```bash
git add tools/import-normalizer/persons_tree.py tools/import-normalizer/tests/test_persons_tree.py
git commit -m "feat(normalizer): add persons_tree skeleton + year extraction"
```
---
## Task 2: Generation number parsing
**Files:**
- Modify: `tools/import-normalizer/persons_tree.py`
- Modify: `tools/import-normalizer/tests/test_persons_tree.py`
Column A has values like `"G 3"`, `"G3"`, `"G 0"`, `"G 2 de Gruyter"`, and variants with repeated spaces such as `"G   0"`. Extract the first digit sequence.
- [ ] **Step 1: Write failing tests**
Append to `tests/test_persons_tree.py`:
```python
def test_parse_generation_space():
    assert persons_tree._parse_generation("G 3") == 3


def test_parse_generation_no_space():
    assert persons_tree._parse_generation("G3") == 3


def test_parse_generation_extra_spaces():
    # Repeated spaces between the letter and the digit
    assert persons_tree._parse_generation("G   0") == 0


def test_parse_generation_trailing_garbage():
    assert persons_tree._parse_generation("G 2 de Gruyter") == 2


def test_parse_generation_empty():
    assert persons_tree._parse_generation("") is None


def test_parse_generation_none():
    assert persons_tree._parse_generation(None) is None
```
- [ ] **Step 2: Run, expecting AttributeError**
```bash
.venv/bin/pytest tests/test_persons_tree.py::test_parse_generation_space -v
```
Expected: `AttributeError: module 'persons_tree' has no attribute '_parse_generation'`
- [ ] **Step 3: Implement `_parse_generation`**
Add to `persons_tree.py` after `_parse_year`:
```python
def _parse_generation(raw: str | None) -> int | None:
    """Extract the generation integer from column A values like 'G 3', 'G3', 'G 0'."""
    if not raw:
        return None
    m = re.search(r"\d+", str(raw))
    return int(m.group()) if m else None
```
- [ ] **Step 4: Run — expect all generation tests pass**
```bash
.venv/bin/pytest tests/test_persons_tree.py -v
```
Expected: all 17 tests PASS.
- [ ] **Step 5: Commit**
```bash
git add tools/import-normalizer/persons_tree.py tools/import-normalizer/tests/test_persons_tree.py
git commit -m "feat(normalizer): add generation parser to persons_tree"
```
---
## Task 3: Name normalization and lookup index
**Files:**
- Modify: `tools/import-normalizer/persons_tree.py`
- Modify: `tools/import-normalizer/tests/test_persons_tree.py`
The lookup index maps normalized name strings to lists of `rowId`s. `_norm_tree` extends `persons._norm` with parenthetical stripping and geographic suffix removal. The index is built with four keys per person: `"first last"`, `"last first"`, `"first maiden"`, and `last` alone (for single-token fallback).
- [ ] **Step 1: Write failing tests**
Append to `tests/test_persons_tree.py`:
```python
def test_norm_tree_basic():
    assert persons_tree._norm_tree("Werner Allemeyer") == "werner allemeyer"


def test_norm_tree_diacritics():
    assert persons_tree._norm_tree("Wöhler") == "woehler"


def test_norm_tree_strips_parens():
    assert persons_tree._norm_tree("Otto (Herbert)") == "otto"


def test_norm_tree_strips_quotes():
    assert persons_tree._norm_tree('"Tante Lolly"') == "tante lolly"


def test_norm_tree_strips_geographic_suffix():
    assert persons_tree._norm_tree("Walter Cram Aachen") == "walter cram"


def test_norm_tree_strips_mexiko():
    assert persons_tree._norm_tree("Hans Cram Mexiko") == "hans cram"


def test_norm_tree_collapses_whitespace():
    assert persons_tree._norm_tree("  Clara   de  Gruyter  ") == "clara de gruyter"


def test_build_index_forward_lookup():
    persons = [{"rowId": "row_002", "firstName": "Werner", "lastName": "Allemeyer", "maidenName": None}]
    idx = persons_tree._build_index(persons)
    assert "werner allemeyer" in idx
    assert idx["werner allemeyer"] == ["row_002"]


def test_build_index_reversed_lookup():
    persons = [{"rowId": "row_002", "firstName": "Werner", "lastName": "Allemeyer", "maidenName": None}]
    idx = persons_tree._build_index(persons)
    # col I uses reversed order: "Allemeyer Werner"
    assert idx.get("allemeyer werner") == ["row_002"]


def test_build_index_maiden_name_lookup():
    persons = [{"rowId": "row_002", "firstName": "Elsgard", "lastName": "Allemeyer", "maidenName": "Wöhler"}]
    idx = persons_tree._build_index(persons)
    # maiden-name form: "Elsgard Wöhler" -> "elsgard woehler"
    assert idx.get("elsgard woehler") == ["row_002"]


def test_build_index_single_token_fallback():
    persons = [{"rowId": "row_028", "firstName": "Herbert", "lastName": "Cram", "maidenName": None}]
    idx = persons_tree._build_index(persons)
    assert idx.get("cram") == ["row_028"]


def test_build_index_ambiguous_single_token():
    persons = [
        {"rowId": "row_028", "firstName": "Herbert", "lastName": "Cram", "maidenName": None},
        {"rowId": "row_019", "firstName": "Clara", "lastName": "Cram", "maidenName": None},
    ]
    idx = persons_tree._build_index(persons)
    # "cram" alone is ambiguous: both rows map to it
    assert set(idx["cram"]) == {"row_028", "row_019"}


def test_resolve_one_found():
    persons = [{"rowId": "row_003", "firstName": "Werner", "lastName": "Allemeyer", "maidenName": None}]
    idx = persons_tree._build_index(persons)
    row_id, reason = persons_tree._resolve_one("Allemeyer Werner", idx)
    assert row_id == "row_003"
    assert reason is None


def test_resolve_one_not_found():
    idx = {}
    row_id, reason = persons_tree._resolve_one("Nobody Unknown", idx)
    assert row_id is None
    assert reason == "not_found"


def test_resolve_one_ambiguous():
    persons = [
        {"rowId": "row_028", "firstName": "Herbert", "lastName": "Cram", "maidenName": None},
        {"rowId": "row_019", "firstName": "Clara", "lastName": "Cram", "maidenName": None},
    ]
    idx = persons_tree._build_index(persons)
    row_id, reason = persons_tree._resolve_one("Cram", idx)
    assert row_id is None
    assert reason == "ambiguous"
```
- [ ] **Step 2: Run — expect failures**
```bash
.venv/bin/pytest tests/test_persons_tree.py -v -k "norm_tree or build_index or resolve_one"
```
Expected: `AttributeError: module 'persons_tree' has no attribute '_norm_tree'`
- [ ] **Step 3: Implement `_norm_tree`, `_build_index`, `_resolve_one`**
Add to `persons_tree.py`:
```python
_GEO_SUFFIXES = {"aachen", "mex", "mexiko", "sen", "jun", "jr"}


def _norm_tree(s: str) -> str:
    """Normalize a name string for tree matching.

    - Lowercase + diacritic → ASCII (uses persons._strip_accents logic)
    - Strip surrounding quote characters
    - Remove parenthetical substrings: "(Herbert)" is dropped entirely
    - Replace dots with spaces (e.g. "Jr." → "Jr ")
    - Remove known geographic/honorific suffix tokens
    - Collapse whitespace
    """
    s = (s or "").strip().strip("\"'")
    s = re.sub(r"\([^)]*\)", "", s)
    s = _strip_accents(s).lower().replace(".", " ")
    tokens = [t for t in s.split() if t and t not in _GEO_SUFFIXES]
    return " ".join(tokens).strip("., ")


def _build_index(persons: list[dict]) -> dict[str, list[str]]:
    """Build a name → [rowId, …] lookup index with four keys per person."""
    index: dict[str, list[str]] = {}

    def _add(key: str, row_id: str) -> None:
        # Skip empty keys, and never list the same row twice under one key
        # (a missing first name would otherwise make a row ambiguous with itself).
        if key and row_id not in index.setdefault(key, []):
            index[key].append(row_id)

    for p in persons:
        row_id = p["rowId"]
        first = p.get("firstName") or ""
        last = p.get("lastName") or ""
        maiden = p.get("maidenName") or ""
        _add(_norm_tree(f"{first} {last}"), row_id)  # "Werner Allemeyer"
        _add(_norm_tree(f"{last} {first}"), row_id)  # "Allemeyer Werner" (col I order)
        if maiden:
            _add(_norm_tree(f"{first} {maiden}"), row_id)  # maiden-name reference
        _add(_norm_tree(last), row_id)  # single-token fallback
    return index


def _resolve_one(raw: str, index: dict[str, list[str]]) -> tuple[str | None, str | None]:
    """Return (row_id, None) on unique match, (None, reason) otherwise."""
    key = _norm_tree(raw)
    if not key:
        return None, "empty"
    hits = index.get(key, [])
    if len(hits) == 1:
        return hits[0], None
    if len(hits) == 0:
        return None, "not_found"
    return None, "ambiguous"
```
- [ ] **Step 4: Run — all tests pass**
```bash
.venv/bin/pytest tests/test_persons_tree.py -v
```
Expected: all 32 tests PASS.
- [ ] **Step 5: Commit**
```bash
git add tools/import-normalizer/persons_tree.py tools/import-normalizer/tests/test_persons_tree.py
git commit -m "feat(normalizer): add name normalization + lookup index to persons_tree"
```
---
## Task 4: Row-level person parsing (pass 1)
**Files:**
- Modify: `tools/import-normalizer/persons_tree.py`
- Modify: `tools/import-normalizer/tests/test_persons_tree.py`
`_parse_row(row_num, fields)` takes a 1-based row number and a field dict (from `build_header_map`) and produces the person record. Unresolvable date raw values are appended to notes. Internal keys `_spouse_raw` and `_bemerkung_raw` carry forward to pass 2 and are stripped before JSON output.
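
The rule for internal keys can be illustrated on its own. The following is a minimal sketch assuming a helper named `strip_internal_keys`, which is hypothetical and not defined in this part of the plan: any key starting with `_` is dropped from a copy of the record just before serialization, so pass 2 can still read `_spouse_raw` and `_bemerkung_raw` from the original dict.

```python
import json


def strip_internal_keys(person: dict) -> dict:
    """Return a copy without keys that start with '_' (hypothetical helper)."""
    return {k: v for k, v in person.items() if not k.startswith("_")}


# Abbreviated record; a real one carries more fields.
person = {
    "rowId": "row_002",
    "firstName": "Elsgard",
    "_spouse_raw": "Allemeyer Werner",
    "_bemerkung_raw": "Nichte von Herbert",
}

# ensure_ascii=False keeps umlauts readable in the JSON output.
print(json.dumps(strip_internal_keys(person), ensure_ascii=False))
```

Returning a copy rather than deleting keys in place keeps the pass 1 records usable for later steps.
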
- [ ] **Step 1: Write failing tests**
Append to `tests/test_persons_tree.py`:
```python
def test_parse_row_serial_dates():
    fields = {
        "generation": "G 3", "last_name": "Allemeyer", "first_name": "Elsgard",
        "maiden_name": "Wöhler", "birth_date": "7568", "birth_place": "Garz",
        "death_date": "36222", "death_place": "Espelkamp",
        "spouse": "Allemeyer Werner", "notes": "Nichte von Herbert",
    }
    p = persons_tree._parse_row(2, fields)
    assert p["rowId"] == "row_002"
    assert p["firstName"] == "Elsgard"
    assert p["lastName"] == "Allemeyer"
    assert p["maidenName"] == "Wöhler"
    assert p["birthYear"] == 1920
    assert p["deathYear"] == 1999
    assert p["birthPlace"] == "Garz"
    assert p["deathPlace"] == "Espelkamp"
    assert p["generation"] == 3
    assert p["familyMember"] is True
    assert p["_spouse_raw"] == "Allemeyer Werner"
    assert p["_bemerkung_raw"] == "Nichte von Herbert"
    # no date annotation in notes because both dates resolved
    assert "[Geburtsdatum" not in (p["notes"] or "")


def test_parse_row_string_birth_date():
    fields = {
        "generation": "G 2", "last_name": "Cram", "first_name": "Herbert",
        "maiden_name": "", "birth_date": "25.6.1890", "birth_place": "Texas",
        "death_date": "", "death_place": "", "spouse": "", "notes": "",
    }
    p = persons_tree._parse_row(28, fields)
    assert p["birthYear"] == 1890
    assert p["deathYear"] is None
    assert p["notes"] is None or p["notes"] == ""


def test_parse_row_unresolvable_date_goes_to_notes():
    fields = {
        "generation": "G 3", "last_name": "Heydrich", "first_name": "Dieter",
        "maiden_name": "", "birth_date": "28.9.", "birth_place": "",
        "death_date": "", "death_place": "", "spouse": "", "notes": "Bruder v Ingrid",
    }
    p = persons_tree._parse_row(96, fields)
    assert p["birthYear"] is None
    assert "[Geburtsdatum: 28.9.]" in p["notes"]
    assert "Bruder v Ingrid" in p["notes"]


def test_parse_row_empty_spouse_and_notes():
    fields = {
        "generation": "G 4", "last_name": "Allemeyer", "first_name": "Jürgen",
        "maiden_name": "", "birth_date": "", "birth_place": "",
        "death_date": "", "death_place": "", "spouse": "", "notes": "",
    }
    p = persons_tree._parse_row(4, fields)
    assert p["_spouse_raw"] is None
    assert p["_bemerkung_raw"] is None
```
- [ ] **Step 2: Run, expecting AttributeError**
```bash
.venv/bin/pytest tests/test_persons_tree.py -k "parse_row" -v
```
Expected: `AttributeError: module 'persons_tree' has no attribute '_parse_row'`
- [ ] **Step 3: Implement `_parse_row`**
Add to `persons_tree.py`:
```python
def _parse_row(row_num: int, fields: dict) -> dict:
    """Produce one person record from a header-mapped row dict.

    Internal keys prefixed with '_' are stripped before JSON output in main().
    """

    def s(key: str) -> str:
        return (fields.get(key) or "").strip()

    birth_raw = s("birth_date")
    death_raw = s("death_date")
    birth_year = _parse_year(birth_raw)
    death_year = _parse_year(death_raw)

    # Keep unresolvable raw date values visible by prepending them to notes.
    notes_parts = []
    if birth_raw and birth_year is None:
        notes_parts.append(f"[Geburtsdatum: {birth_raw}]")
    if death_raw and death_year is None:
        notes_parts.append(f"[Todesdatum: {death_raw}]")
    bemerkung = s("notes")
    if bemerkung:
        notes_parts.append(bemerkung)

    maiden = s("maiden_name") or None
    spouse = s("spouse") or None
    bemerkung_out = bemerkung or None
    return {
        "rowId": f"row_{row_num:03d}",
        "firstName": s("first_name"),
        "lastName": s("last_name"),
        "maidenName": maiden,
        "alias": None,
        "notes": " ".join(notes_parts) or None,
        "birthYear": birth_year,
        "deathYear": death_year,
        "birthPlace": s("birth_place") or None,
        "deathPlace": s("death_place") or None,
        "generation": _parse_generation(s("generation")),
        "familyMember": True,
        "_spouse_raw": spouse,
        "_bemerkung_raw": bemerkung_out,
    }
```
- [ ] **Step 4: Run — all tests pass**
```bash
.venv/bin/pytest tests/test_persons_tree.py -v
```
Expected: all 36 tests PASS.
- [ ] **Step 5: Commit**
```bash
git add tools/import-normalizer/persons_tree.py tools/import-normalizer/tests/test_persons_tree.py
git commit -m "feat(normalizer): add row parser to persons_tree"
```
---
## Task 5: Deduplication
**Files:**
- Modify: `tools/import-normalizer/persons_tree.py`
- Modify: `tools/import-normalizer/tests/test_persons_tree.py`
Two-stage deduplication:
1. Exact `(firstName, lastName, birthYear)` match — catches rows 127/138 (same name + serial).
2. `(firstName, lastName)` match where the later entry has `birthYear=None` and an earlier entry has a birthYear — catches rows 129/139 (one has a date, the other doesn't).
- [ ] **Step 1: Write failing tests**
Append to `tests/test_persons_tree.py`:
```python
def test_deduplicate_no_duplicates():
    persons = [
        {"rowId": "row_002", "firstName": "Elsgard", "lastName": "Allemeyer", "birthYear": 1920},
        {"rowId": "row_003", "firstName": "Werner", "lastName": "Allemeyer", "birthYear": 1923},
    ]
    result, skipped = persons_tree._deduplicate(persons)
    assert len(result) == 2
    assert skipped == []


def test_deduplicate_exact_match():
    # rows 127/138: same firstName, lastName, birthYear
    persons = [
        {"rowId": "row_127", "firstName": "Christa", "lastName": "Schütz", "birthYear": 1951},
        {"rowId": "row_138", "firstName": "Christa", "lastName": "Schütz", "birthYear": 1951},
    ]
    result, skipped = persons_tree._deduplicate(persons)
    assert [p["rowId"] for p in result] == ["row_127"]
    assert len(skipped) == 1
    assert "row_138" in skipped[0]


def test_deduplicate_none_birth_year_after_known():
    # rows 129/139: row 129 has birthYear=1964, row 139 has birthYear=None
    persons = [
        {"rowId": "row_129", "firstName": "Christoph", "lastName": "Seils", "birthYear": 1964},
        {"rowId": "row_139", "firstName": "Christoph", "lastName": "Seils", "birthYear": None},
    ]
    result, skipped = persons_tree._deduplicate(persons)
    assert [p["rowId"] for p in result] == ["row_129"]
    assert len(skipped) == 1


def test_deduplicate_both_none_birth_year_kept():
    # Two people with no birth year but same name: keep first only
    persons = [
        {"rowId": "row_A", "firstName": "Hans", "lastName": "Cram", "birthYear": None},
        {"rowId": "row_B", "firstName": "Hans", "lastName": "Cram", "birthYear": None},
    ]
    result, skipped = persons_tree._deduplicate(persons)
    assert [p["rowId"] for p in result] == ["row_A"]
    assert len(skipped) == 1
```
- [ ] **Step 2: Run, expecting AttributeError**
```bash
.venv/bin/pytest tests/test_persons_tree.py -k "deduplicate" -v
```
Expected: `AttributeError: module 'persons_tree' has no attribute '_deduplicate'`
- [ ] **Step 3: Implement `_deduplicate`**
Add to `persons_tree.py`:
```python
def _deduplicate(persons: list[dict]) -> tuple[list[dict], list[str]]:
    """Remove duplicate rows. Two-stage:

    1. Exact (firstName, lastName, birthYear) match.
    2. (firstName, lastName) where the later entry has birthYear=None and an earlier
       entry already has a known birthYear.
    """
    seen_full: dict[tuple, str] = {}  # (first, last, year) -> rowId
    seen_name: dict[tuple, str] = {}  # (first, last) -> rowId of first entry with a year
    result: list[dict] = []
    skipped: list[str] = []
    for p in persons:
        first, last, year = p["firstName"], p["lastName"], p["birthYear"]
        key_full = (first, last, year)
        key_name = (first, last)
        if key_full in seen_full:
            skipped.append(f"{p['rowId']} duplicates {seen_full[key_full]} ({first} {last}, year={year})")
            continue
        if year is None and key_name in seen_name:
            skipped.append(f"{p['rowId']} duplicates {seen_name[key_name]} ({first} {last}, no birth year)")
            continue
        seen_full[key_full] = p["rowId"]
        if year is not None:
            seen_name[key_name] = p["rowId"]
        result.append(p)
    return result, skipped
```
- [ ] **Step 4: Run — all tests pass**
```bash
.venv/bin/pytest tests/test_persons_tree.py -v
```
Expected: all 40 tests PASS.
- [ ] **Step 5: Commit**
```bash
git add tools/import-normalizer/persons_tree.py tools/import-normalizer/tests/test_persons_tree.py
git commit -m "feat(normalizer): add deduplication to persons_tree"
```
---
## Task 6: SPOUSE_OF relationship extraction
**Files:**
- Modify: `tools/import-normalizer/persons_tree.py`
- Modify: `tools/import-normalizer/tests/test_persons_tree.py`
Walk every person's `_spouse_raw`, resolve via the name index, and emit one `SPOUSE_OF` edge per matched pair. Skip if an identical edge (either direction) already exists. Unresolved entries go to `unresolved[]`.
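
The "either direction" rule amounts to treating each edge as an unordered pair. As a minimal sketch of that idea, the hypothetical helper `unique_undirected` below uses a `frozenset` as the membership key, so `(A, B)` and `(B, A)` compare equal while the first-seen orientation is the one that gets kept.

```python
def unique_undirected(pairs: list[tuple[str, str]]) -> list[tuple[str, str]]:
    """Keep the first occurrence of each pair, ignoring direction."""
    seen: set[frozenset[str]] = set()
    kept: list[tuple[str, str]] = []
    for a, b in pairs:
        edge = frozenset((a, b))  # unordered: {a, b} == {b, a}
        if edge not in seen:
            seen.add(edge)
            kept.append((a, b))
    return kept


# Both spouses reference each other, but only one edge should remain.
print(unique_undirected([("row_002", "row_003"), ("row_003", "row_002")]))
```

Because rows are walked in sheet order, the kept edge always starts at the spouse who appears first in the sheet.
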
- [ ] **Step 1: Write failing tests**
Append to `tests/test_persons_tree.py`:
```python
def _make_persons(*args):
    """Helper: args are (rowId, firstName, lastName, maidenName, spouse_raw) tuples."""
    return [
        {"rowId": a[0], "firstName": a[1], "lastName": a[2], "maidenName": a[3],
         "_spouse_raw": a[4], "_bemerkung_raw": None,
         "birthYear": None, "deathYear": None, "birthPlace": None, "deathPlace": None,
         "generation": None, "familyMember": True, "alias": None, "notes": None}
        for a in args
    ]


def test_resolve_spouses_success():
    persons = _make_persons(
        ("row_002", "Elsgard", "Allemeyer", "Wöhler", "Allemeyer Werner"),
        ("row_003", "Werner", "Allemeyer", None, "Elsgard Wöhler"),
    )
    idx = persons_tree._build_index(persons)
    rels, unres = persons_tree._resolve_spouses(persons, idx)
    # Both rows reference each other, but only ONE edge should be emitted
    assert len(rels) == 1
    assert rels[0]["type"] == "SPOUSE_OF"
    assert set([rels[0]["personId"], rels[0]["relatedPersonId"]]) == {"row_002", "row_003"}
    assert unres == []


def test_resolve_spouses_not_found():
    persons = _make_persons(
        ("row_007", "Charlotte", "Blomquist", "Ruge", '"Tante Lolly"'),
    )
    idx = persons_tree._build_index(persons)
    rels, unres = persons_tree._resolve_spouses(persons, idx)
    assert rels == []
    assert len(unres) == 1
    assert unres[0]["rowId"] == "row_007"
    assert unres[0]["reason"] == "not_found"


def test_resolve_spouses_empty_spouse_field():
    persons = _make_persons(
        ("row_004", "Jürgen", "Allemeyer", None, None),
    )
    idx = persons_tree._build_index(persons)
    rels, unres = persons_tree._resolve_spouses(persons, idx)
    assert rels == [] and unres == []
```
- [ ] **Step 2: Run, expecting AttributeError**
```bash
.venv/bin/pytest tests/test_persons_tree.py -k "resolve_spouses" -v
```
Expected: `AttributeError: module 'persons_tree' has no attribute '_resolve_spouses'`
- [ ] **Step 3: Implement `_resolve_spouses`**
Add to `persons_tree.py`:
```python
def _resolve_spouses(
    persons: list[dict], index: dict[str, list[str]]
) -> tuple[list[dict], list[dict]]:
    """Emit SPOUSE_OF edges from each person's _spouse_raw field."""
    relationships: list[dict] = []
    unresolved: list[dict] = []
    emitted: set[frozenset] = set()  # unordered pairs already emitted
    for p in persons:
        raw = (p.get("_spouse_raw") or "").strip()
        if not raw:
            continue
        row_id = p["rowId"]
        matched_id, reason = _resolve_one(raw, index)
        if matched_id:
            edge = frozenset([row_id, matched_id])
            if edge not in emitted:
                emitted.add(edge)
                relationships.append({
                    "personId": row_id,
                    "relatedPersonId": matched_id,
                    "type": "SPOUSE_OF",
                    "source": "verheiratet_mit",
                })
        else:
            unresolved.append({
                "rowId": row_id,
                "field": "verheiratet_mit",
                "raw": raw,
                "reason": reason,
            })
    return relationships, unresolved
```
- [ ] **Step 4: Run — all tests pass**
```bash
.venv/bin/pytest tests/test_persons_tree.py -v
```
Expected: all 43 tests PASS.
- [ ] **Step 5: Commit**
```bash
git add tools/import-normalizer/persons_tree.py tools/import-normalizer/tests/test_persons_tree.py
git commit -m "feat(normalizer): add SPOUSE_OF resolution to persons_tree"
```
---
## Task 7: PARENT_OF extraction from Bemerkung
**Files:**
- Modify: `tools/import-normalizer/persons_tree.py`
- Modify: `tools/import-normalizer/tests/test_persons_tree.py`
Two patterns anchored at start-of-string:
- `Sohn|Tochter + v(on)? + names` → named persons are parents of this row's person
- `Vater|Mutter + v(on)? + names` → this row's person is parent of named persons
Names after the keyword may be two people joined by ` u ` or ` und `. Each part is resolved independently. Unmatched parts go to `unresolved[]`. The matched portion is stripped from `notes`; the remainder of the Bemerkung stays in `notes`.
Everything that doesn't match any parent pattern goes to `notes` unchanged (no unresolved entry).
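
The two patterns and the name splitting can be tried out without the index. The sketch below reuses the regular expressions defined in Step 3 of this task; the `classify` function is a hypothetical helper for illustration that reports the direction and the raw name parts, and does no name resolution.

```python
import re

# Same patterns as in Step 3 of this task, without the leading underscores.
CHILD_RE = re.compile(r"^(?:Sohn|Tochter)\s+v(?:on)?\s+(.+)", re.I)
PARENT_RE = re.compile(r"^(?:Vater|Mutter)\s+v(?:on)?\s+(.+)", re.I)
AND_RE = re.compile(r"\s+u(?:nd)?\s+", re.I)


def classify(remark: str) -> tuple[str, list[str]]:
    """Return (direction, names); direction is 'child', 'parent' or 'none'."""
    text = remark.strip()
    for pattern, direction in ((CHILD_RE, "child"), (PARENT_RE, "parent")):
        m = pattern.match(text)
        if m:
            names = [n.strip() for n in AND_RE.split(m.group(1)) if n.strip()]
            return direction, names
    return "none", []


for remark in ("Sohn v Clara u Herbert", "Mutter von Hans Cram", "Nichte von Herbert"):
    print(remark, "->", classify(remark))
```

Because the capture group is `(.+)`, a match always consumes the rest of the cell, so any trailing text becomes part of the last name part rather than a separate remainder.
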
- [ ] **Step 1: Write failing tests**
Append to `tests/test_persons_tree.py`:
```python
def _register(*args):
    """Build index from (rowId, first, last, maiden) tuples."""
    persons = [
        {"rowId": a[0], "firstName": a[1], "lastName": a[2], "maidenName": a[3]}
        for a in args
    ]
    return persons, persons_tree._build_index(persons)


def test_parse_bemerkung_sohn_two_parents():
    _, idx = _register(
        ("row_019", "Clara", "Cram", "de Gruyter"),
        ("row_028", "Herbert", "Cram", None),
    )
    # Full names are used because the index has no first-name-only key;
    # a bare "Clara" or "Herbert" would come back as not_found.
    rels, unres, notes = persons_tree._parse_bemerkung(
        "row_021", "Sohn v Clara Cram u Herbert Cram", idx
    )
    assert len(rels) == 2
    assert all(r["type"] == "PARENT_OF" for r in rels)
    # Both parents point to the child
    child_ids = {r["relatedPersonId"] for r in rels}
    parent_ids = {r["personId"] for r in rels}
    assert child_ids == {"row_021"}
    assert "row_019" in parent_ids and "row_028" in parent_ids
    assert unres == []
    assert notes == ""


def test_parse_bemerkung_tochter_von():
    _, idx = _register(("row_019", "Clara", "Cram", None))
    rels, unres, notes = persons_tree._parse_bemerkung(
        "row_036", "Tochter von Clara Cram", idx
    )
    assert len(rels) == 1
    assert rels[0] == {
        "personId": "row_019",
        "relatedPersonId": "row_036",
        "type": "PARENT_OF",
        "source": "bemerkung",
        "rawBemerkung": "Tochter von Clara Cram",
    }
    assert notes == ""


def test_parse_bemerkung_vater():
    _, idx = _register(("row_028", "Herbert", "Cram", None))
    # Full name again: "Herbert" alone is not an index key.
    rels, unres, notes = persons_tree._parse_bemerkung(
        "row_031", "Vater v Herbert Cram", idx
    )
    assert len(rels) == 1
    assert rels[0]["personId"] == "row_031"  # this person is the parent
    assert rels[0]["relatedPersonId"] == "row_028"
    assert rels[0]["type"] == "PARENT_OF"


def test_parse_bemerkung_unmatched_parent_name():
    _, idx = _register()  # empty index
    rels, unres, notes = persons_tree._parse_bemerkung(
        "row_004", "Sohn v Elsgard A.", idx
    )
    assert rels == []
    assert len(unres) == 1
    assert unres[0]["reason"] == "not_found"
    # notes should be empty after stripping the matched pattern
    assert notes == ""


def test_parse_bemerkung_skip_nichte():
    _, idx = _register(("row_028", "Herbert", "Cram", None))
    rels, unres, notes = persons_tree._parse_bemerkung(
        "row_002", "Nichte von Herbert", idx
    )
    assert rels == []
    assert unres == []
    assert notes == "Nichte von Herbert"


def test_parse_bemerkung_skip_bruder():
    _, idx = _register(("row_028", "Herbert", "Cram", None))
    rels, unres, notes = persons_tree._parse_bemerkung(
        "row_033", "Bruder v Herbert", idx
    )
    assert rels == []
    assert unres == []
    assert notes == "Bruder v Herbert"


def test_parse_bemerkung_empty():
    _, idx = _register()
    rels, unres, notes = persons_tree._parse_bemerkung("row_004", "", idx)
    assert rels == [] and unres == [] and notes == ""


def test_parse_bemerkung_plain_remark():
    _, idx = _register()
    rels, unres, notes = persons_tree._parse_bemerkung(
        "row_029", "Verfasserin der Cram-Chronik !!", idx
    )
    assert rels == [] and unres == []
    assert notes == "Verfasserin der Cram-Chronik !!"
```
- [ ] **Step 2: Run, expecting AttributeError**
```bash
.venv/bin/pytest tests/test_persons_tree.py -k "parse_bemerkung" -v
```
Expected: `AttributeError: module 'persons_tree' has no attribute '_parse_bemerkung'`
- [ ] **Step 3: Implement `_parse_bemerkung`**
Add to `persons_tree.py`:
```python
_CHILD_RE = re.compile(r"^(?:Sohn|Tochter)\s+v(?:on)?\s+(.+)", re.I)
_PARENT_RE = re.compile(r"^(?:Vater|Mutter)\s+v(?:on)?\s+(.+)", re.I)
_AND_RE = re.compile(r"\s+u(?:nd)?\s+", re.I)


def _parse_bemerkung(
    row_id: str, bemerkung: str, index: dict[str, list[str]]
) -> tuple[list[dict], list[dict], str]:
    """Extract PARENT_OF edges from a Bemerkung cell.

    Returns (relationships, unresolved, remaining_notes).
    Text that doesn't match a parent pattern goes to remaining_notes unchanged.
    """
    if not bemerkung or not bemerkung.strip():
        return [], [], ""
    s = bemerkung.strip()
    for pattern, direction in ((_CHILD_RE, "child"), (_PARENT_RE, "parent")):
        m = pattern.match(s)
        if not m:
            continue
        name_part = m.group(1).strip().rstrip("!., ")
        parts = [p.strip() for p in _AND_RE.split(name_part) if p.strip()]
        rels: list[dict] = []
        unres: list[dict] = []
        for part in parts:
            part = part.rstrip("!., ")
            matched_id, reason = _resolve_one(part, index)
            if matched_id:
                if direction == "child":
                    # named person is parent of this row
                    rels.append({
                        "personId": matched_id,
                        "relatedPersonId": row_id,
                        "type": "PARENT_OF",
                        "source": "bemerkung",
                        "rawBemerkung": bemerkung,
                    })
                else:
                    # this row is parent of named person
                    rels.append({
                        "personId": row_id,
                        "relatedPersonId": matched_id,
                        "type": "PARENT_OF",
                        "source": "bemerkung",
                        "rawBemerkung": bemerkung,
                    })
            else:
                unres.append({
                    "rowId": row_id,
                    "field": "bemerkung",
                    "raw": bemerkung,
                    "reason": reason,
                })
        remainder = s[m.end():].strip().lstrip(".,! ")
        return rels, unres, remainder
    # No pattern matched: full text goes to notes, nothing to unresolved
    return [], [], s
```
- [ ] **Step 4: Run — all tests pass**
```bash
.venv/bin/pytest tests/test_persons_tree.py -v
```
Expected: all 55 tests PASS.
- [ ] **Step 5: Commit**
```bash
git add tools/import-normalizer/persons_tree.py tools/import-normalizer/tests/test_persons_tree.py
git commit -m "feat(normalizer): add PARENT_OF Bemerkung extraction to persons_tree"
```
---
## Task 8: main() — CLI, two-pass loop, JSON output
**Files:**
- Modify: `tools/import-normalizer/persons_tree.py`
- Modify: `tools/import-normalizer/tests/test_persons_tree.py`
Wire the two passes into `main()`. Pass 1: read sheet → parse rows → deduplicate → build index. Pass 2: resolve spouses + parse Bemerkung → collect relationships + unresolved → strip internal `_` keys → write JSON.
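The "strip internal `_` keys" step can be pictured with the sketch below. It assumes every working key carries a leading underscore (the plan uses `_bemerkung_raw` and `_spouse_raw`); `strip_internal_keys` is a hypothetical helper, and the `main()` in Step 3 pops the two known keys individually instead.

```python
def strip_internal_keys(person: dict) -> dict:
    """Return a copy of person without keys that start with an underscore."""
    return {k: v for k, v in person.items() if not k.startswith("_")}


# Hand-made sample record (not real data).
raw = {"rowId": "row_002", "lastName": "Cram", "_spouse_raw": "Clara", "_bemerkung_raw": ""}
clean = strip_internal_keys(raw)
print(clean)
```

A generic filter like this keeps working if a later task adds another underscore-prefixed working key, at the cost of copying each dict.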
- [ ] **Step 1: Write failing test for dry-run**
Append to `tests/test_persons_tree.py`:
```python
import subprocess


def test_dry_run_exits_zero(tmp_path):
    """dry-run should complete without writing any file and exit 0."""
    input_path = Path(__file__).parent.parent.parent.parent / "import" / "Personendatei 2.xlsx"
    if not input_path.exists():
        import pytest
        pytest.skip("source Excel file not present")
    result = subprocess.run(
        [
            sys.executable, str(Path(__file__).parent.parent / "persons_tree.py"),
            "--input", str(input_path),
            "--output", str(tmp_path / "out.json"),
            "--dry-run",
        ],
        capture_output=True, text=True,
    )
    assert result.returncode == 0, result.stderr
    assert not (tmp_path / "out.json").exists()
    assert "persons parsed" in result.stdout
```
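- [ ] **Step 2: Run (expect AssertionError)**
```bash
.venv/bin/pytest tests/test_persons_tree.py::test_dry_run_exits_zero -v
```
Expected: FAIL. The test launches the script as a subprocess, so a missing `main()` does not surface as an `AttributeError`; the script most likely exits 0 without printing anything and the `"persons parsed" in result.stdout` assertion fails. (If the Excel file is absent, the test is skipped instead.)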
- [ ] **Step 3: Implement `main()`**
Add to `persons_tree.py`:
```python
def main() -> None:
    parser = argparse.ArgumentParser(
        description="Normalize Personendatei 2.xlsx → canonical-persons-tree.json"
    )
    parser.add_argument(
        "--input", default=str(config.PERSON_WORKBOOK),
        help="Path to Personendatei 2.xlsx"
    )
    parser.add_argument(
        "--output", default=str(config.OUT_DIR / "canonical-persons-tree.json"),
        help="Path for output JSON"
    )
    parser.add_argument("--dry-run", action="store_true", help="Print stats, skip write")
    args = parser.parse_args()

    from ingest import read_sheet, build_header_map

    rows = read_sheet(Path(args.input), config.PERSON_SHEET)
    if not rows:
        print("ERROR: sheet is empty", file=sys.stderr)
        sys.exit(1)
    header_row = [str(v) for v in rows[0]]
    fields_map, _ = build_header_map(header_row, config.PERSON_HEADER_MAP, config.PERSON_REQUIRED_FIELDS)

    # --- Pass 1: parse rows ---
    persons_raw: list[dict] = []
    for row_num, row in enumerate(rows[1:], start=2):
        field_dict = {field: (row[col] if col < len(row) else "") for field, col in fields_map.items()}
        # Skip rows without a last name; tolerate None or non-string cells
        if not str(field_dict.get("last_name") or "").strip():
            continue
        persons_raw.append(_parse_row(row_num, field_dict))
    persons, skipped_msgs = _deduplicate(persons_raw)
    for msg in skipped_msgs:
        print(f" SKIP {msg}", file=sys.stderr)
    index = _build_index(persons)

    # --- Pass 2: resolve relationships ---
    all_rels: list[dict] = []
    all_unresolved: list[dict] = []
    spouse_rels, spouse_unres = _resolve_spouses(persons, index)
    all_rels.extend(spouse_rels)
    all_unresolved.extend(spouse_unres)
    for p in persons:
        # Remove the internal working keys so they never reach the JSON output
        bemerkung = p.pop("_bemerkung_raw", None) or ""
        p.pop("_spouse_raw", None)
        rels, unres, remaining = _parse_bemerkung(p["rowId"], bemerkung, index)
        all_rels.extend(rels)
        all_unresolved.extend(unres)
        if remaining:
            existing = p.get("notes") or ""
            # avoid duplicating the bemerkung that was already put in notes during _parse_row
            if remaining not in existing:
                p["notes"] = (existing + " " + remaining).strip() if existing else remaining

    # --- Stats output ---
    spouse_count = sum(1 for r in all_rels if r["type"] == "SPOUSE_OF")
    parent_count = sum(1 for r in all_rels if r["type"] == "PARENT_OF")
    print(f"✓ {len(persons)} persons parsed")
    print(f"✓ {len(all_rels)} relationships emitted ({spouse_count} SPOUSE_OF, {parent_count} PARENT_OF)")
    if all_unresolved:
        print(f"⚠ {len(all_unresolved)} unresolved (see unresolved[] in output)")
    if args.dry_run:
        print("\n--- dry-run: first 5 unresolved ---")
        for u in all_unresolved[:5]:
            print(f" {u}")
        return

    output = {
        "generated_at": datetime.datetime.now().isoformat(),
        "source": Path(args.input).name,
        "stats": {
            "persons": len(persons),
            "relationships": len(all_rels),
            "unresolved": len(all_unresolved),
        },
        "persons": persons,
        "relationships": all_rels,
        "unresolved": all_unresolved,
    }
    out_path = Path(args.output)
    # parents=True so a nested --output directory is created as well
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(json.dumps(output, ensure_ascii=False, indent=2), encoding="utf-8")
    print(f"→ {args.output}")


if __name__ == "__main__":
    main()
```
- [ ] **Step 4: Run dry-run test**
```bash
.venv/bin/pytest tests/test_persons_tree.py::test_dry_run_exits_zero -v
```
Expected: PASS. (If the Excel file is absent the test is skipped, not failed.)
- [ ] **Step 5: Run all tests**
```bash
.venv/bin/pytest tests/test_persons_tree.py -v
```
Expected: all 56 tests PASS (or 55 + 1 skipped if Excel file absent).
- [ ] **Step 6: Commit**
```bash
git add tools/import-normalizer/persons_tree.py tools/import-normalizer/tests/test_persons_tree.py
git commit -m "feat(normalizer): add main() CLI to persons_tree"
```
---
## Task 9: Integration run against the real file
**Files:** no source changes; the run produces `out/canonical-persons-tree.json`, which Step 4 commits
- [ ] **Step 1: Run with `--dry-run` and inspect output**
```bash
cd tools/import-normalizer
.venv/bin/python persons_tree.py --dry-run
```
Expected output (approximate — exact numbers will differ once resolved):
```
✓ ~161 persons parsed (163 rows minus 2 duplicates)
✓ ~N relationships emitted (X SPOUSE_OF, Y PARENT_OF)
⚠ ~Z unresolved (see unresolved[] in output)
--- dry-run: first 5 unresolved ---
{'rowId': '...', 'field': '...', 'raw': '...', 'reason': '...'}
...
```
If you see `ERROR` or a Python traceback, investigate before continuing.
- [ ] **Step 2: Write the output file**
```bash
.venv/bin/python persons_tree.py
```
Expected: `→ out/canonical-persons-tree.json`
- [ ] **Step 3: Spot-check the output**
```bash
python3 -c "
import json
data = json.load(open('out/canonical-persons-tree.json', encoding='utf-8'))
print('persons:', data['stats']['persons'])
print('relationships:', data['stats']['relationships'])
print('unresolved:', data['stats']['unresolved'])
# Check Herbert Cram
herbert = next(p for p in data['persons'] if p['firstName'] == 'Herbert' and p['lastName'] == 'Cram')
print('Herbert:', herbert)
# Check a SPOUSE_OF edge involving Clara and Herbert
clara = next(p for p in data['persons'] if p['firstName'] == 'Clara' and p['lastName'] == 'Cram')
spouse_edge = next((r for r in data['relationships']
                    if r['type'] == 'SPOUSE_OF'
                    and {r['personId'], r['relatedPersonId']} == {herbert['rowId'], clara['rowId']}), None)
print('Herbert-Clara SPOUSE_OF edge:', spouse_edge)
"
```
Verify:
- `persons` ≈ 161 (163 rows minus 2 duplicates)
- Herbert Cram has `birthYear: 1890`, `generation: 2`
- A `SPOUSE_OF` edge exists between Herbert and Clara
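Beyond the named spot-checks, a structural check may be worth running: every relationship endpoint should be the `rowId` of some person in the file. The sketch below shows one way to do that; `dangling_edges` is a hypothetical helper and the sample dict is hand-made, using only the `rowId`, `personId`, and `relatedPersonId` fields described in this plan.

```python
def dangling_edges(data: dict) -> list[dict]:
    """Return relationships whose endpoints are not the rowId of any person."""
    known = {p["rowId"] for p in data["persons"]}
    return [
        r for r in data["relationships"]
        if r["personId"] not in known or r["relatedPersonId"] not in known
    ]


# Tiny hand-made sample in the output's shape (not real data).
sample = {
    "persons": [{"rowId": "row_002"}, {"rowId": "row_003"}],
    "relationships": [
        {"personId": "row_002", "relatedPersonId": "row_003", "type": "SPOUSE_OF"},
        {"personId": "row_002", "relatedPersonId": "row_999", "type": "PARENT_OF"},
    ],
}
bad = dangling_edges(sample)
print(len(bad), "dangling edge(s)")
```

Against the real output, load the file with `json.load` and pass the result to the function; an empty list means every edge points at a known person.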
- [ ] **Step 4: Commit the output file**
```bash
git add out/canonical-persons-tree.json
git commit -m "feat(normalizer): add canonical-persons-tree.json output"
```
---
## Self-Review Checklist
- **§4 date parsing** → Task 1 (`_parse_year`) covers Excel serial, ISO, German string, year-only, free text, unresolvable ✓
- **§5 generation** → Task 2 (`_parse_generation`) covers all format variants ✓
- **§5 notes construction** → Task 4 (`_parse_row`) appends unresolvable date raws and bemerkung ✓
- **§6 name index** → Task 3 (`_norm_tree`, `_build_index`, `_resolve_one`) covers forward, reversed, maiden, single-token, ambiguous ✓
- **§12 OQ-01 deduplication** → Task 5 (`_deduplicate`) handles same-year + no-year cases ✓
- **§7.1 SPOUSE_OF** → Task 6 (`_resolve_spouses`) with dedup of bidirectional edges ✓
- **§7.2 PARENT_OF** → Task 7 (`_parse_bemerkung`) with Sohn/Tochter/Vater/Mutter + multi-parent split ✓
- **§9 CLI** → Task 8 (`main()`) with `--input`, `--output`, `--dry-run`
- **§10 module reuse** → `ingest.read_sheet`, `ingest.build_header_map`, `dates.parse_date`, `persons._strip_accents` all used ✓
- **§11 non-goals** → no API calls, no alias records, no SIBLING_OF, no dedup vs canonical-persons.xlsx ✓
- **§8 JSON schema** → all fields present: `rowId`, `firstName`, `lastName`, `maidenName`, `alias`, `notes`, `birthYear`, `deathYear`, `birthPlace`, `deathPlace`, `generation`, `familyMember`