feat(audit): replace hour-trunc dedupe with LAG() session rollup (120-min gap)

Rewrites the activity feed query to group consecutive events on the same
(actor, document, kind) into sessions separated by >120 min gaps. A session
becomes one row with count = events-in-session and happenedAtUntil = last
event timestamp. Singletons keep count=1 / happenedAtUntil=null.

Algorithm: LAG() to get the previous event's timestamp in the same partition,
mark a new session when gap > 7200s, then SUM() over an unbounded preceding
window yields a running session_id. Aggregation groups by session_id.

COMMENT_ADDED and MENTION_CREATED always start a new session — these kinds
never roll up so each event stays its own row.

Also adds BLOCK_REVIEWED to the eligible-kinds WHERE clause (Chronik spec §02)
so reviewed blocks appear in the activity feed.

Five new integration tests cover combine-within-2h, split-at-boundary,
no-hard-cap-on-long-session, never-rolls-up-comments/mentions, and the
count/happenedAtUntil contract on both singletons and rollups.

Part of #285.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Marcel
2026-04-20 16:02:16 +02:00
committed by marcel
parent 22ddf8c12a
commit feefa682b3
2 changed files with 231 additions and 25 deletions

View File

@@ -23,33 +23,77 @@ public interface AuditLogQueryRepository extends JpaRepository<AuditLog, UUID> {
Optional<UUID> findMostRecentDocumentIdByActor(@Param("userId") UUID userId);
@Query(value = """
SELECT * FROM (
SELECT DISTINCT ON (a.actor_id, a.document_id, a.kind, date_trunc('hour', a.happened_at))
a.kind AS kind,
a.actor_id AS actorId,
CASE
WHEN u.first_name IS NOT NULL AND u.last_name IS NOT NULL
THEN UPPER(LEFT(u.first_name, 1)) || UPPER(LEFT(u.last_name, 1))
WHEN u.first_name IS NOT NULL THEN UPPER(LEFT(u.first_name, 1))
WHEN u.last_name IS NOT NULL THEN UPPER(LEFT(u.last_name, 1))
ELSE '?'
END AS actorInitials,
COALESCE(u.color, '') AS actorColor,
CONCAT_WS(' ', u.first_name, u.last_name) AS actorName,
a.document_id AS documentId,
a.happened_at AS happened_at,
(a.kind = 'MENTION_CREATED'
AND a.payload->>'mentionedUserId' = :currentUserId) AS youMentioned,
1 AS count,
CAST(NULL AS TIMESTAMPTZ) AS happenedAtUntil
WITH events AS (
SELECT
a.kind,
a.actor_id,
a.document_id,
a.happened_at,
a.payload,
LAG(a.happened_at) OVER (
PARTITION BY a.actor_id, a.document_id, a.kind
ORDER BY a.happened_at
) AS prev_happened_at
FROM audit_log a
LEFT JOIN users u ON u.id = a.actor_id
WHERE a.kind IN ('TEXT_SAVED','FILE_UPLOADED','ANNOTATION_CREATED','COMMENT_ADDED','MENTION_CREATED')
WHERE a.kind IN ('TEXT_SAVED','FILE_UPLOADED','ANNOTATION_CREATED',
'BLOCK_REVIEWED','COMMENT_ADDED','MENTION_CREATED')
AND a.document_id IS NOT NULL
ORDER BY a.actor_id, a.document_id, a.kind,
date_trunc('hour', a.happened_at), a.happened_at DESC
) deduped
ORDER BY happened_at DESC
),
sessions_marked AS (
SELECT
kind, actor_id, document_id, happened_at, payload,
CASE
WHEN kind IN ('COMMENT_ADDED','MENTION_CREATED') THEN 1
WHEN prev_happened_at IS NULL THEN 1
WHEN EXTRACT(EPOCH FROM (happened_at - prev_happened_at)) > 7200 THEN 1
ELSE 0
END AS is_new_session
FROM events
),
sessions AS (
SELECT
kind, actor_id, document_id, happened_at, payload,
SUM(is_new_session) OVER (
PARTITION BY actor_id, document_id, kind
ORDER BY happened_at
ROWS UNBOUNDED PRECEDING
) AS session_id
FROM sessions_marked
),
aggregated AS (
SELECT
s.kind,
s.actor_id,
s.document_id,
s.session_id,
MIN(s.happened_at) AS happened_at,
CASE WHEN COUNT(*) > 1 THEN MAX(s.happened_at) ELSE NULL END AS happened_at_until,
COUNT(*)::int AS count,
BOOL_OR(s.kind = 'MENTION_CREATED'
AND s.payload->>'mentionedUserId' = :currentUserId) AS you_mentioned
FROM sessions s
GROUP BY s.kind, s.actor_id, s.document_id, s.session_id
)
SELECT
ag.kind AS kind,
ag.actor_id AS actorId,
CASE
WHEN u.first_name IS NOT NULL AND u.last_name IS NOT NULL
THEN UPPER(LEFT(u.first_name, 1)) || UPPER(LEFT(u.last_name, 1))
WHEN u.first_name IS NOT NULL THEN UPPER(LEFT(u.first_name, 1))
WHEN u.last_name IS NOT NULL THEN UPPER(LEFT(u.last_name, 1))
ELSE '?'
END AS actorInitials,
COALESCE(u.color, '') AS actorColor,
CONCAT_WS(' ', u.first_name, u.last_name) AS actorName,
ag.document_id AS documentId,
ag.happened_at AS happened_at,
ag.you_mentioned AS youMentioned,
ag.count AS count,
ag.happened_at_until AS happenedAtUntil
FROM aggregated ag
LEFT JOIN users u ON u.id = ag.actor_id
ORDER BY ag.happened_at DESC
LIMIT :limit
""", nativeQuery = true)
List<ActivityFeedRow> findDedupedActivityFeed(

View File

@@ -0,0 +1,162 @@
package org.raddatz.familienarchiv.dashboard;
import org.junit.jupiter.api.Test;
import org.raddatz.familienarchiv.PostgresContainerConfig;
import org.raddatz.familienarchiv.audit.ActivityFeedRow;
import org.raddatz.familienarchiv.audit.AuditLogQueryRepository;
import org.raddatz.familienarchiv.config.FlywayConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.data.jpa.test.autoconfigure.DataJpaTest;
import org.springframework.boot.jdbc.test.autoconfigure.AutoConfigureTestDatabase;
import org.springframework.context.annotation.Import;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.transaction.annotation.Transactional;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
@DataJpaTest
@AutoConfigureTestDatabase(replace = AutoConfigureTestDatabase.Replace.NONE)
@Import({PostgresContainerConfig.class, FlywayConfig.class})
@Transactional
class AuditLogQueryRepositoryRolledUpTest {
static final UUID USER_ID = UUID.fromString("dddddddd-dddd-dddd-dddd-dddddddddddd");
static final UUID DOC_ID = UUID.fromString("eeeeeeee-eeee-eeee-eeee-eeeeeeeeeeee");
static final UUID OTHER_DOC_ID = UUID.fromString("ffffffff-ffff-ffff-ffff-ffffffffffff");
@Autowired AuditLogQueryRepository auditLogQueryRepository;
@Autowired JdbcTemplate jdbcTemplate;
private NamedParameterJdbcTemplate named() {
return new NamedParameterJdbcTemplate(jdbcTemplate);
}
private void insertUserAndDocs() {
jdbcTemplate.update(
"INSERT INTO users (id, enabled, email, password) VALUES (?, true, ?, 'pw')",
USER_ID, "rollup-" + USER_ID + "@test.com");
jdbcTemplate.update(
"INSERT INTO documents (id, title, original_filename, status) VALUES (?, 'Brief A', 'a.pdf', 'PLACEHOLDER')",
DOC_ID);
jdbcTemplate.update(
"INSERT INTO documents (id, title, original_filename, status) VALUES (?, 'Brief B', 'b.pdf', 'PLACEHOLDER')",
OTHER_DOC_ID);
}
private void insertAuditEvent(UUID actorId, UUID docId, String kind, Instant happenedAt) {
MapSqlParameterSource params = new MapSqlParameterSource()
.addValue("kind", kind)
.addValue("actor", actorId)
.addValue("doc", docId)
.addValue("t", OffsetDateTime.ofInstant(happenedAt, java.time.ZoneOffset.UTC));
named().update(
"INSERT INTO audit_log (kind, actor_id, document_id, happened_at) "
+ "VALUES (:kind, :actor, :doc, :t)",
params);
}
@Test
void rolledUpFeed_combines_same_actor_same_doc_within_2h() {
insertUserAndDocs();
Instant base = Instant.parse("2026-04-20T09:00:00Z");
for (int i = 0; i < 20; i++) {
insertAuditEvent(USER_ID, DOC_ID, "TEXT_SAVED", base.plusSeconds(i * 480L));
}
List<ActivityFeedRow> rows = auditLogQueryRepository.findDedupedActivityFeed(USER_ID.toString(), 40);
assertThat(rows).hasSize(1);
ActivityFeedRow row = rows.get(0);
assertThat(row.getKind()).isEqualTo("TEXT_SAVED");
assertThat(row.getDocumentId()).isEqualTo(DOC_ID);
assertThat(row.getCount()).isEqualTo(20);
assertThat(row.getHappenedAt()).isEqualTo(base);
assertThat(row.getHappenedAtUntil()).isEqualTo(base.plusSeconds(19 * 480L));
}
@Test
void rolledUpFeed_splits_at_2h_boundary() {
insertUserAndDocs();
Instant sessionOneStart = Instant.parse("2026-04-20T08:00:00Z");
Instant sessionOneLast = sessionOneStart.plusSeconds(600);
insertAuditEvent(USER_ID, DOC_ID, "TEXT_SAVED", sessionOneStart);
insertAuditEvent(USER_ID, DOC_ID, "TEXT_SAVED", sessionOneLast);
Instant sessionTwoStart = sessionOneLast.plusSeconds(2L * 60L * 60L + 60L);
insertAuditEvent(USER_ID, DOC_ID, "TEXT_SAVED", sessionTwoStart);
insertAuditEvent(USER_ID, DOC_ID, "TEXT_SAVED", sessionTwoStart.plusSeconds(300));
List<ActivityFeedRow> rows = auditLogQueryRepository.findDedupedActivityFeed(USER_ID.toString(), 40);
assertThat(rows).hasSize(2);
assertThat(rows.get(0).getCount()).isEqualTo(2);
assertThat(rows.get(0).getHappenedAt()).isEqualTo(sessionTwoStart);
assertThat(rows.get(1).getCount()).isEqualTo(2);
assertThat(rows.get(1).getHappenedAt()).isEqualTo(sessionOneStart);
}
@Test
void rolledUpFeed_has_no_hard_cap_on_long_session() {
insertUserAndDocs();
Instant base = Instant.parse("2026-04-20T06:00:00Z");
for (int i = 0; i < 30; i++) {
insertAuditEvent(USER_ID, DOC_ID, "ANNOTATION_CREATED", base.plusSeconds(i * 60L * 30L));
}
List<ActivityFeedRow> rows = auditLogQueryRepository.findDedupedActivityFeed(USER_ID.toString(), 40);
assertThat(rows).hasSize(1);
assertThat(rows.get(0).getCount()).isEqualTo(30);
assertThat(rows.get(0).getHappenedAt()).isEqualTo(base);
assertThat(rows.get(0).getHappenedAtUntil()).isEqualTo(base.plusSeconds(29 * 60L * 30L));
}
@Test
void rolledUpFeed_never_rolls_up_COMMENT_ADDED_or_MENTION_CREATED() {
insertUserAndDocs();
Instant base = Instant.parse("2026-04-20T10:00:00Z");
insertAuditEvent(USER_ID, DOC_ID, "COMMENT_ADDED", base);
insertAuditEvent(USER_ID, DOC_ID, "COMMENT_ADDED", base.plusSeconds(60));
insertAuditEvent(USER_ID, DOC_ID, "COMMENT_ADDED", base.plusSeconds(120));
List<ActivityFeedRow> rows = auditLogQueryRepository.findDedupedActivityFeed(USER_ID.toString(), 40);
assertThat(rows).hasSize(3);
assertThat(rows).allSatisfy(r -> {
assertThat(r.getKind()).isEqualTo("COMMENT_ADDED");
assertThat(r.getCount()).isEqualTo(1);
assertThat(r.getHappenedAtUntil()).isNull();
});
}
@Test
void rolledUpFeed_exposes_count_and_happenedAtUntil_on_singletons_and_rollups() {
insertUserAndDocs();
Instant rollupStart = Instant.parse("2026-04-20T11:00:00Z");
insertAuditEvent(USER_ID, DOC_ID, "FILE_UPLOADED", rollupStart);
insertAuditEvent(USER_ID, DOC_ID, "FILE_UPLOADED", rollupStart.plusSeconds(300));
insertAuditEvent(USER_ID, OTHER_DOC_ID, "FILE_UPLOADED", rollupStart.plusSeconds(900));
List<ActivityFeedRow> rows = auditLogQueryRepository.findDedupedActivityFeed(USER_ID.toString(), 40);
assertThat(rows).hasSize(2);
assertThat(rows).anySatisfy(r -> {
assertThat(r.getDocumentId()).isEqualTo(DOC_ID);
assertThat(r.getCount()).isEqualTo(2);
assertThat(r.getHappenedAt()).isEqualTo(rollupStart);
assertThat(r.getHappenedAtUntil()).isEqualTo(rollupStart.plusSeconds(300));
});
assertThat(rows).anySatisfy(r -> {
assertThat(r.getDocumentId()).isEqualTo(OTHER_DOC_ID);
assertThat(r.getCount()).isEqualTo(1);
assertThat(r.getHappenedAt()).isEqualTo(rollupStart.plusSeconds(900));
assertThat(r.getHappenedAtUntil()).isNull();
});
}
}