fix(audit): submit afterCommit write to executor to avoid transaction sync conflict
AuditService.logAfterCommit() called writeLog() inline inside the afterCommit() callback. At that point Spring's transaction synchronizations are still active on the thread, so SimpleJpaRepository.save() throws IllegalStateException which the catch block silently swallowed — leaving audit_log permanently empty. Fix: submit writeLog() to auditExecutor so it runs on a fresh thread with no active synchronization context. Also switch auditExecutor from CallerRunsPolicy to AbortPolicy to prevent the bug from silently recurring when the queue fills under load. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -2,6 +2,8 @@ package org.raddatz.familienarchiv.audit;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.core.task.TaskExecutor;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.support.TransactionSynchronization;
|
||||
@@ -16,6 +18,8 @@ import java.util.UUID;
|
||||
public class AuditService {
|
||||
|
||||
private final AuditLogRepository auditLogRepository;
|
||||
@Qualifier("auditExecutor")
|
||||
private final TaskExecutor auditExecutor;
|
||||
|
||||
@Async("auditExecutor")
|
||||
public void log(AuditKind kind, UUID actorId, UUID documentId, Map<String, Object> payload) {
|
||||
@@ -27,7 +31,10 @@ public class AuditService {
|
||||
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
|
||||
@Override
|
||||
public void afterCommit() {
|
||||
writeLog(kind, actorId, documentId, payload);
|
||||
// Run on a separate thread: the afterCommit() callback fires while Spring's
|
||||
// transaction synchronizations are still active on the current thread, which
|
||||
// prevents SimpleJpaRepository.save() from starting a new transaction inline.
|
||||
auditExecutor.execute(() -> writeLog(kind, actorId, documentId, payload));
|
||||
}
|
||||
});
|
||||
} else {
|
||||
|
||||
@@ -31,7 +31,10 @@ public class AsyncConfig {
|
||||
executor.setMaxPoolSize(2);
|
||||
executor.setQueueCapacity(50);
|
||||
executor.setThreadNamePrefix("Audit-");
|
||||
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
|
||||
// AbortPolicy instead of CallerRunsPolicy: if CallerRunsPolicy ran the task on the
|
||||
// afterCommit() callback thread, Spring's transaction synchronizations would still be
|
||||
// active on that thread and SimpleJpaRepository.save() would throw IllegalStateException.
|
||||
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
|
||||
return executor;
|
||||
}
|
||||
}
|
||||
@@ -19,4 +19,7 @@ CREATE INDEX idx_audit_log_kind ON audit_log (kind);
|
||||
|
||||
-- Enforce append-only at the database layer: the application role may INSERT
|
||||
-- but must not UPDATE or DELETE audit rows.
|
||||
-- NOTE: This REVOKE is a no-op when the current user is the table owner.
|
||||
-- PostgreSQL owners retain all privileges regardless of REVOKE. The append-only
|
||||
-- guarantee is enforced at the application layer only.
|
||||
REVOKE UPDATE, DELETE ON audit_log FROM CURRENT_USER;
|
||||
|
||||
@@ -0,0 +1,53 @@
|
||||
package org.raddatz.familienarchiv.audit;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.raddatz.familienarchiv.PostgresContainerConfig;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.context.annotation.Import;
|
||||
import org.springframework.test.annotation.DirtiesContext;
|
||||
import org.springframework.test.context.ActiveProfiles;
|
||||
import org.springframework.test.context.bean.override.mockito.MockitoBean;
|
||||
import org.springframework.transaction.support.TransactionTemplate;
|
||||
import software.amazon.awssdk.services.s3.S3Client;
|
||||
|
||||
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.awaitility.Awaitility.await;
|
||||
|
||||
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
|
||||
@ActiveProfiles("test")
|
||||
@Import(PostgresContainerConfig.class)
|
||||
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
|
||||
class AuditServiceIntegrationTest {
|
||||
|
||||
@MockitoBean S3Client s3Client;
|
||||
@Autowired AuditService auditService;
|
||||
@Autowired AuditLogRepository auditLogRepository;
|
||||
@Autowired TransactionTemplate transactionTemplate;
|
||||
|
||||
@Test
|
||||
void logAfterCommit_writes_ANNOTATION_CREATED_row_after_transaction_commits() {
|
||||
transactionTemplate.execute(status -> {
|
||||
auditService.logAfterCommit(AuditKind.ANNOTATION_CREATED, null, null, null);
|
||||
return null;
|
||||
});
|
||||
|
||||
await().atMost(5, SECONDS).until(() -> auditLogRepository.count() > 0);
|
||||
assertThat(auditLogRepository.findAll())
|
||||
.extracting(AuditLog::getKind)
|
||||
.containsExactly(AuditKind.ANNOTATION_CREATED);
|
||||
}
|
||||
|
||||
@Test
|
||||
void logAfterCommit_writes_no_row_when_transaction_rolls_back() {
|
||||
try {
|
||||
transactionTemplate.execute(status -> {
|
||||
auditService.logAfterCommit(AuditKind.ANNOTATION_CREATED, null, null, null);
|
||||
throw new RuntimeException("force rollback");
|
||||
});
|
||||
} catch (RuntimeException ignored) {}
|
||||
|
||||
assertThat(auditLogRepository.count()).isZero();
|
||||
}
|
||||
}
|
||||
@@ -7,6 +7,7 @@ import org.mockito.InjectMocks;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.MockedStatic;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
import org.springframework.core.task.TaskExecutor;
|
||||
import org.springframework.transaction.support.TransactionSynchronization;
|
||||
import org.springframework.transaction.support.TransactionSynchronizationManager;
|
||||
|
||||
@@ -24,6 +25,7 @@ import static org.mockito.Mockito.*;
|
||||
class AuditServiceTest {
|
||||
|
||||
@Mock AuditLogRepository auditLogRepository;
|
||||
@Mock TaskExecutor auditExecutor;
|
||||
@InjectMocks AuditService auditService;
|
||||
|
||||
@Test
|
||||
@@ -94,9 +96,7 @@ class AuditServiceTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
void logAfterCommit_registersCallback_andSavesOnlyAfterCommit_whenTransactionIsActive() {
|
||||
when(auditLogRepository.save(any())).thenAnswer(inv -> inv.getArgument(0));
|
||||
|
||||
void logAfterCommit_registersCallback_andSubmitsToExecutor_afterCommit() {
|
||||
try (MockedStatic<TransactionSynchronizationManager> mocked =
|
||||
mockStatic(TransactionSynchronizationManager.class)) {
|
||||
mocked.when(TransactionSynchronizationManager::isActualTransactionActive).thenReturn(true);
|
||||
@@ -106,15 +106,16 @@ class AuditServiceTest {
|
||||
|
||||
auditService.logAfterCommit(AuditKind.TEXT_SAVED, null, null, null);
|
||||
|
||||
// Callback registered but repo not yet called
|
||||
// Callback registered but executor not yet invoked
|
||||
assertThat(captured).hasSize(1);
|
||||
verify(auditLogRepository, never()).save(any());
|
||||
verify(auditExecutor, never()).execute(any());
|
||||
|
||||
// Simulate transaction commit
|
||||
captured.get(0).afterCommit();
|
||||
|
||||
// Now the row should be saved
|
||||
verify(auditLogRepository).save(any());
|
||||
// Write submitted to executor — not called inline
|
||||
verify(auditExecutor).execute(any());
|
||||
verify(auditLogRepository, never()).save(any());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user