Progress Bar in Spring Batch trotz Chunk Processing

Jump Boat
13 min readApr 6, 2023

--

Spring Batch ist ein Spring Projekt, das uns die Erstellung von Batchjobs (aka Stapelverarbeitung) erleichtert. Hierfür stehen Funktionen bereit wie fehlertolerante Verarbeitung, die spontan an Hystrix erinnert: Fehlerhafte Verarbeitungen können wiederholt und nach mehrmaliger Wiederholung übersprungen werden. Außerdem können zur Optimierung der Performance Verarbeitungseinheiten zu größeren Chunks gruppiert und auf verschiedene Weisen Parallelität konfiguriert werden.
Zur Überwachung der Verarbeitung stellt das Spring Batch Projekt eine Vielzahl von Listeners (1) zur Verfügung. Mit Ihnen lässt sich recht einfach eine Progress Bar implementieren.

Problematisch wird diese Fortschrittsanzeige jedoch in Verbindung mit den genannten Features: Fehlertoleranz und transaktionale Verarbeitung in Blöcken. Dann kann sie nur noch den Fortschritt basierend auf den Blöcken anzeigen, nicht jedoch auf der Ebene jedes verarbeiteten Elements. Wie man dennoch eine kontinuierliche Fortschrittsanzeige implementiert, die sich für jedes verarbeitete Einzelelement aktualisiert, ist Thema dieses Beitrags.

Theoretisches zu transaktionaler, fehlertoleranter Spring Batch Verarbeitung, den Listeners und Parallelität

Zu Beginn wollen wir uns in ein wenig dröge Theorie stürzen 🥳
Batchverarbeitungen folgen in Spring Batch einem festen Schema, das allerdings mit Tasklets durchbrochen werden kann. Ein Batchjob besteht aus beliebig vielen Steps, die in einer oder mehreren Step-Executions ausgeführt werden. Steps führen eine Transformation eins Items durch, das verarbeitet werden soll. Daher besteht ein Step aus einem Item-Reader, einem Item-Processor und einem Item-Writer.
Der Item-Reader liest alle Items, die verarbeitet werden sollen — entweder auf einmal oder in Blöcken, wenn ein Partitioner verwendet wird, der je Block eine eigene StepExecution eröffnet.
Zur Performanceverbesserung können die gelesenen Items einer Step-Execution wiederum in Blöcken, Chunks, verarbeitet werden. Dann durchläuft eine bestimmte Itemmenge erst den oder die Item-Processors und wird abschließend im Item-Writer geschrieben.
Tritt hierbei ein Fehler in einem der Item-Processors auf, hat das Auswirkungen auf den gesamten Chunk: Denn per default bricht nicht nur die Verarbeitung des Chunks ab, sondern des Steps und damit des gesamten Jobs.
Um dies zu verhindern, können Toleranzen festgelegt werden. Hierzu werden das Retry- und das Skip-Limit verwendet.
Über das Retry-Limit kann konfiguriert werden, dass ein fehlerhaft verarbeitets Item erneut die Verarbeitung der Item-Processors durchlaufen soll. Eine fehlerhafte Verarbeitung führt dann zwar noch zum Abbruch des gesamten Chunks, doch die Step-Execution bleibt offen. Der gesamte Chunk wird dann zurückgerollt und von Vorne erneut durchlaufen, wobei versucht wieder, das zuvor fehlerhafte Item diesmal erfolgreich zu verarbeiten. Chunks werden also als Transaktion verarbeitet.
Sollte das Retry-Limit erreicht sein, kann mit einem Skip-Limit angegeben werden, dass das Item im Chunk übersprungen werden soll — anstatt nun Step und Job fehlerhaft zu beenden. Erst wenn auch dieses Skip-Limit überschritten ist, brechen Step und Job wieder ab.

Spring Batch Listeners

Spring Batch kennt eine Vielzahl von Listeners. Zur Implementierung einer Fortschrittsanzeige sind jedoch nur einige von Relevanz.
Der Step-Listener erkennt Beginn und Ende einer Step-Execution. Bei beiden Events hat er Zugriff auf das Objekt StepExecution. Eine Ebene tiefer registriert der Chunk-Listener Beginn, Ende sowie fehlerbedingte Abbrüche eines Chunks. Die beiden Methoden des Read-Listeners werden zu Beginn und Ende des Leseprozesses eines Items aufgerufen. Im Event afterRead wird das gelesene Item übermittelt. Die Verarbeitung eines gelesenen Items wird durch den Item-Listener überwacht. Er beobachtet den Verarbeitungsbeginn, das erfolgreiche sowie das fehlerhafte Verarbeitungsende eines Items, das in allen drei Fällen als Parameter übermittelt wird. Der Retry-Listener umschließt den Item-Listener, lauscht also auch auf dieselben drei Events einer Itemverarbeitung. Allerdings kennt er nicht direkt das verarbeitete Item, sondern einen RetryContext. Der Skip-Listener differenziert einen Skip in den drei Schritten Read, Write und Process. Im Fall eines Skips beim Schreiben oder Verarbeiten eines Items ist ihm das Item bekannt; beim Lesen jedoch nicht, hier wird nur eine Throwable übermittelt. Zum Schluss erkennt der Write-Listener Beginn, das Ende und einen Fehler des Schreibens. Anders als der Read-Listener bezieht er sich dabei jedoch auf eine Liste von Items, die allen drei Events bekannt ist.

+---------------+-----------------+---------------+
| Listenername | Methoden/Events | Datenobjekt |
+---------------+-----------------+---------------+
| StepListener | beforeStep | StepExecution |
| | afterStep | StepExecution |
|-------------------------------------------------|
| ChunkListener | beforeChunk | ChunkContext |
| | afterChunk | ChunkContext |
| | afterChunkError | ChunkContext |
|-------------------------------------------------|
| ReadListener | beforeRead | - |
| | afterRead | Item |
| | onReadError | Exception |
|-------------------------------------------------|
| ItemListener | beforeProcess | Item |
| | afterProcess | Item |
| | onProcessError | Item |
|-------------------------------------------------|
| RetryListener | open | RetryContext |
| | close | RetryContext |
| | onError | RetryContext |
|-------------------------------------------------|
| SkipListener | onSkipInRead | Throwable |
| | onSkipInWrite | Item |
| | onSkipInProcess | Item |
|-------------------------------------------------|
| WriteListener | beforeWrite | Items[] |
| | afterWrite | Items[] |
| | onWriteError | Items[] |
+---------------+-----------------+---------------+

Wir sehen, dass die unterschiedlichen Listener Zugriff auf ganz unterschiedliche Informationen haben. Hierbei genießt der Step-Listener dank das Objekts StepExecutions (2) den größten Informationszugriff. StepExecutions zählt unter anderem gelesene, geschriebene und übersprungene Objekte, Rollbacks und Commits.
Der ChunkContext (3) des Chunk-Listeners ist hingegen bereits erheblich informationsärmer, einzig der Verweis auf den StepContext verbessert dies. Im StepContext befinden sich nämlich zwei interessante Methoden: getStepExecution ermöglicht den Zugriff auf das Objekt StepExecutions, wodurch Chunk und die Step-Execution miteinander verbunden sind. Die andere Funktion ist GetStepExecutionContext. Sie ermöglicht den Zugriff auf einen begrenzten Cache-Speicher in Form einer Map, auf den unter anderem der Partitioner schreibenden Zugriff hat.
Einzig allein steht der RetryContext (4). Doch überraschenderweise offenbart er über eine geerbte Map und den dortigen Key “context.state” das zum RetryContext gehörige Item. Auf den Chunk oder die StepExecution hat der RetryContext jedoch keinen Zugriff.

Parallelität und TaskExecutor

Die Menge aller Items, die verarbeitet werden soll, kann in Partitions (5) aufgesplittet werden. Spring Batch verarbeitet diese Partitions dann nacheinander oder parallel. Die maximale Parallelität wird erreicht, wenn jeder TaskExecutor-Thread eine Partition bearbeitet, also wenn die Poolgröße des TaskExecutors gleich der Anzahl der Partitions ist. Eine Partition führt zu einer Step-Execution, die in beliebig großen und somit in beliebig vielen Chunks stattfindet. Diese Chunks werden sequentiell verarbeitet. Somit werden sowohl Step-Execution (oder Partition) und Chunk von nur einem TaskExecutor-Thread ausgeführt, da jede Step-Execution von einem Thread übernommen wird und die Chunks darin sequentiell durchlaufen werden.

Der Versuchsaufbau

Ein kleines Experimentierprojekt hilft dabei, zu verstehen, wie die verschiedenen Listeners zusammenspielen und dient als “Testgelände” für die Progess Bar. Im Experimentierprojekt werden 20 Items verarbeitet. Von Ihnen lösen vier eine fehlerhafte Verarbeitung aus, indem sie eine CustomRetryException werfen. Sowohl die Grid-Size des Partitioners, als auch die Poolsize des TaskExecutors und die Chunksize sind zentral konfigurierbar. Das Retry-Limit steht fest auf 3 und das Skip-Limit fest auf 5. Im gesamten Projekt sind mehrere Listeners verbaut, die ihre Ausgaben loggen und zentral speichern.

Das Projekt in Bild …

… und in Code:

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Autowired
ChunkListener chunkListener;
@Autowired
ItemListener itemListener;
@Autowired
ReadListener readListener;
@Autowired
SkipListener skipListener;
@Autowired
StepListener stepListener;
@Autowired
WriteListener writeListener;
@Autowired
RetryListener retryListener;
@Autowired
JobCompletionNotificationListener jobCompletionNotificationListener;
@Autowired
JobParameters jobParameters;

@Bean
public Partitioner partitioner() {
return new CustomItemInputPartitioner();
}
@Bean
@StepScope
public ItemReader<CustomItemInput> reader(@Value("#{stepExecutionContext['From']}") Integer from,
@Value("#{stepExecutionContext['To']}") Integer to,
CustomItemRepository customItemRepository) {
return new CustomItemInputPartitionReader(from,to,customItemRepository);
}
@Bean
public ItemProcessor<CustomItemInput, CustomItemOutput> processor() {
return new CustomItemProcessor();
}
@Bean
public ItemWriter<CustomItemOutput> writer() {
return new CustomItemOutputItemWriter();
}
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(jobParameters.TASKEXECUTORPOOLSIZE);
executor.setMaxPoolSize(jobParameters.TASKEXECUTORPOOLSIZE);
executor.initialize();
return executor;
}

@Bean
public Job customJob(Step partitionStep) {
return jobBuilderFactory.get("customJob")
.listener(jobCompletionNotificationListener)
.flow(partitionStep)
.end()
.build();
}
@Bean
public Step partitionStep(TaskExecutor taskExecutor) {
return stepBuilderFactory.get("partitionStep")
.partitioner("partitionStep", partitioner())
.step(step1())
.gridSize(jobParameters.GRIDSIZE)
.taskExecutor(taskExecutor)
.build();
}

@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.<CustomItemInput, CustomItemOutput> chunk(jobParameters.CHUNKSIZE)
.faultTolerant()
.retryLimit(3)
.retry(CustomRetryException.class)
.listener(retryListener)
.skipLimit(5)
.skip(CustomRetryException.class)
.listener(skipListener)
.reader(reader(null,null,null))
.listener(readListener)
.processor(processor())
.listener(itemListener)
.writer(writer())
.listener(writeListener)
.listener(chunkListener)
.listener(stepListener)
.build();
}
}

Logvergleich: Den Listeners auf die Finger schauen

Das Experimentierprojekt eignet sich auch ideal, um Logausgaben zu vergleichen. Man sieht gut, welche Listener wie angesprochen werden, wenn ein Item fehlerhaft verarbeitet wurde — und wie die gleiche Ausgabe aussieht, wenn es erfolgreich verarbeitet wird.
Eine normale Verarbeitung eines Items innerhalb eines Chunks sieht folgendermaßen aus (teils gekürzt).

+-----+---------------+---------------+
| Nr. | Listenername | Listenerevent |
+-----+---------------+---------------+
| 1 | StepListener | beforeStep |
| 2 | ChunkListener | beforeChunk |
| 3 | ReadListener | afterRead |
| 4 | RetryListener | open |
| 5 | ItemListener | beforeProcess |
| 6 | ItemListener | afterProcess |
| 7 | RetryListener | close |
+-----+---------------+---------------+

Durch die Konfiguration eines Retry-Limits wird neben Step-Execution und Chunk-Execution eine weitere Klammer, der RetryContext, um die eigentliche Verarbeitung gelegt (Nummer 4 und 7).
Eine fehlerhafte Itemverarbeitung unterscheidet sich nun wie folgt (wieder teils gekürzt).


+-----+---------------+----------------+
| Nr. | Listenername | Listenerevent |
+-----+---------------+----------------+
| 1 | StepListener | beforeStep |
| 2 | ChunkListener | beforeChunk |
| 3 | ReadListener | afterRead |
| 4 | RetryListener | open |
| 5 | ItemListener | beforeProcess |
| 6 | ItemListener | onProcessError |
| 7 | RetryListener | onError |
| 8 | RetryListener | onClose |
| 9 | ChunkListener | afterCunkError |
| 10 | ChunkListener | beforeChunk |
+-----+---------------+----------------+

Interessant zu sehen ist, dass der Retry-Listener beide Events empfängt: Natürlich onError, denn die Verarbeitung war fehlerhaft, was man im Item-Listener (onProcessError) bereits erkennt. Anschließend wird der RetryContext (des Items) geschlossen, weshalb wir auch hier onClose lesen, während der Item-Listener kein Event afterProcess empfängt.

Wege zur kontinuierlichen Progress Bar

Das Ziel wiederholt: Wir wollen — trotz fehlertoleranter, transaktionaler Verarbeitung, die im Fehlerfall den gesamten Chunk und dort jedes bereits verarbeitete Item wiederholt — eine kontinuierliche Fortschrittsanzeige sehen, die auf Basis der einzelnen verarbeiteten Items innerhalb eines Chunks basiert. Zusätzliche Würze erhält das Vorhaben durch Parallelität, es werden also gleichzeitige mehrere Chunks durchgeführt.

Das Problem

Ohne transaktionaler Verarbeitung, die zurückrollt und wiederholt, könnte man einfach einen Counter im Item-Listener einrichten: Bei jedem verarbeiteten Element zählt der Counter um eins hoch. Durch die Wiederholungen der Chunks würde unsere Progress Bar aber schnell mehr als 100% Fortschritt anzeigen. Diese Wiederanläufe müssen also beachtet werden. Der Chunk-Listener reagiert auf sie mit den Events afterChunkError und beforeChunk anstelle von afterChunk.

Nun könnte man im Item-Listener gezählte Items bei Auftreten des Events afterChunkError wieder von der Fortschrittsanzeige abziehen — sieht zwar komisch aus, funktioniert aber.
Oder der Item-Listener merkt sich alle bereits verarbeiteten Items und zählt sie nicht doppelt. Hier ist offensichtlich, dass es eine große Liste braucht, womöglich gar eine weitere Datenbank. Lesende und schreibende Zugriffe würden auf sie in derselben Geschwindigkeit wie die Itemverarbeitung stattfinden, je weiter diese fortgeschritten ist, desto länger dauern die lesenden Zugriffe. Dies ließe sich mit asynchronen Events und einer speziell für diesen Zweck optimierten Datenbank noch lösen. Zumal sie sich die verarbeiteten Items ja nur solange merken muss, bis der Chunk erfolgreich verarbeitet wurde (afterChunk).

Das Genick bricht beide Ideen aber die parallele Verarbeitung. Plötzlich muss man wissen, welcher Chunk gerade zurückgerollt wurde und welchem Chunk der Item-Listener gerade zuhört.
Hier erscheint ein grundlegendes Problem der Listeners. Sie haben nämlich nicht die gleichen Informationen. Dem Item-Listener fehlt die Informationen zu Chunk und Step-Execution — er kennt nur das einzelne Item. Chunk-Listener und Step-Listener wiederum wissen nicht, welche Items sie im Chunk oder der Step-Execution verarbeiten. Zwar kennt die Step-Execution die Gesamtzahl der gelesenen (und später geschriebenen) Items, jedoch nicht deren Identität. Diese ist aber notwendig, um sie mit den Daten des Item-Listeners und des Retry-Listeners abzugleichen.
Die Items einer Step-Execution (nicht Chunk) durch den Partitioner in den StepExecutionContext schreiben zu lassen ist aufgrund der begrenzten Speichermenge auch nicht möglich. Bestenfalls kann hier eine ID eingetragen werden, die auf eine Itemliste verweist, die der Partitioner parallel geschrieben hat. Diese Liste befindet sich vermutlich auch in einem externen Speicher.

Lösungen mit Datenhaltung

Somit ergeben sich bisher zwei Lösungsmöglichkeiten, die beide eine Datenbank voraussetzen.

  1. In der ersten Möglichkeit werden alle Identitäten der gelesenen Items gespeichert und durch den Item-Listener „abgehakt“. So können sie nicht doppelt gezählt werden; Parallelität spielt keine Rolle.
  2. Die zweite Möglichkeit sieht vor, dass der Partitioner im Zusammenspiel mit dem Item-Reader die gleiche Liste persistiert. Jedoch erhält jeder Listenabschnitt (der eine Partition bildet) eine eindeutige ID, die der Step-Execution im StepExecutionContext bekannt gemacht wird. So weiß man, welche Items in welcher Step-Execution verarbeitet werden. Die Items werden im Item-Listener gezählt, beim Zurückrollen eines Chunks kann ermittelt werden, welche Items davon betroffen werden, die im Folgenden nicht erneut gezählt würden.
    Die zweite Möglichkeit modifiziert die Erste, sodass schreibende Datenbankzugriffe reduziert werden. Beide Möglichkeiten benötigen jedoch eine Auflistung der gelesenen Elemente.

Lösung ohne Datenhaltung (mit erheblich weniger)

Um diese Listenbildung zu umgehen, kann man sich das TaskExecutor zum Komplizen machen. Ein TaskExecutor bearbeitet gleichzeitig nur eine Step-Execution und darin sequentiell die Chunks. Daher ist eine Verbindung im Item-Listener zum Chunk beziehungsweise der Step-Execution über den TaskExecutor, den Thread.currentThread(), möglich.
Das Wissen um den TaskExecutor kann einerseits verwendet werden, um die zweite Möglichkeit oben von Item-Reader und Partitioner zu befreien. Dies würde jedoch die Persistenz von Itemlisten nicht verhindern.

Stattdessen bauen wir ein schmales Datenmodell auf: Zu jedem TaskExecutor speichern wir (für den jeweils aktuellen Chunk), wieviele Items im aktuellen Chunkdurchlauf erfolgreich verarbeitet wurden. Zusätzlich wird dieselbe Zahl nochmal gespeichert, allerdings für den letzten fehlerhaften Chunkdurchlauf. Also wieviele Items wurden im Chunk erfolgreich verarbeitet, bis er wegen einer fehlerhaften Itemverarbeitung fehlerhaft beendet (afterChunkError) und wiederholt wurde. Erst wenn im aktuellen Chunkdurchlauf die Anzahl der erfolgreich verarbeiteten Items größer als die Anzahl dieser aus dem vorherigen Lauf ist, wird der Zähler der Progress Bar angesprochen. Diese Lösung benötigt also einen Item-Listener, Skip-Listener und einen Chunk-Listener.

Die folgende Tabelle verdeutlicht, welche Auswirkungen die Events der Listener auf die Datenhaltung haben. Der Bezeichner des zweiten Attributs, lastAmountProcessedItemsBeforeLastError, ist womöglich zu lang, aber irgendwie auch sprechend.
Die Tabelle zeigt zwei Chunkdurchläufe mit drei Items, wobei im ersten Durchlauf das dritte Item fehlerhaft verarbeitet wurde.

+---------------------------+----+----------------------+---------------------+-------------+
| Aktion/Event | ID | amountProcessedItems | lastAmountProcessed | ProgressBar |
| | | | ItemsBeforeLastError| |
+---------------------------+----+----------------------+---------------------+-------------+
| Neuer Chunk | 1 | 0 | 0 | |
| Erfolgreiches Item | 2 | 1 | 0 | +1 |
| Erfolgreiches Item | 3 | 2 | 0 | +1 |
| Fehlerhaftes Item | 4 | 0 | 2 | |
| Erfolgreiches Item | 5 | 1 | 2 | |
| Erfolgreiches Item | 6 | 2 | 2 | |
| Erfolgreiches Item | 7 | 3 | 2 | +1 |
| Chunk erfolgreich beendet | 8 | 3 | 2 | |
+---------------------------+----+----------------------+---------------------+-------------+

Das Ganze im Code

Die beispielhafte Implementierung besteht aus folgenden Klassen.

ProgressMonitoring bietet den Listenern Methoden an, die aufgerufen werden, wenn eine neue Chunkverarbeitung beginnt, eine Chunkverarbeitung beendet, ein Item erfolgreich oder fehlerhaft verarbeitet und wenn ein Item nach mehrfachen fehlerhaften Verarbeitungen übersprungen wurde.
Je TaskExecutor-Thread hält ProgressLedgerManagement ein Objekt der Klasse ProgressLedger. Jedes ProgressLedger besteht aus einer Liste von ProgressLedgerEntry-Objekten. Ein solches hat drei Attribute: eine fortlaufende Nummer, die Anzahl der verarbeiteten Items (erfolgreich wie auch übersprungen) und die Anzahl der verarbeiteten Items, bevor der letzte Fehler innerhalb der Chunkverarbeitung passiert ist.
Sofern ein neuer Eintrag hinzukommt und die Anzahl der bearbeiteten Items höher ist als die Anzahl der bereits bearbeiteten Items des letzten Chunkdurchlaufs, bevor dort der Verarbeitungsfehler aufgetreten ist, wird diese neue Itemverarbeitung an die Progress Bar weitergegeben, die einen halben Tannenbaum zeichnet.

[ taskExecutor-1] d.d.s.monitor.ProgressBar                : ** (Items: 1/20)
[ taskExecutor-2] d.d.s.monitor.ProgressBar : ***** (Items: 2/20)
[ taskExecutor-1] d.d.s.monitor.ProgressBar : ******* (Items: 3/20)
[ taskExecutor-2] d.d.s.monitor.ProgressBar : ********** (Items: 4/20)
[ taskExecutor-1] d.d.s.monitor.ProgressBar : ************ (Items: 5/20)
[ taskExecutor-2] d.d.s.monitor.ProgressBar : *************** (Items: 6/20)
[ taskExecutor-1] d.d.s.monitor.ProgressBar : ***************** (Items: 7/20)
[ taskExecutor-1] d.d.s.monitor.ProgressBar : ******************** (Items: 8/20)
[ taskExecutor-1] d.d.s.monitor.ProgressBar : ********************** (Items: 9/20)
[ taskExecutor-1] d.d.s.monitor.ProgressBar : ************************* (Items: 10/20)
[ taskExecutor-1] d.d.s.monitor.ProgressBar : *************************** (Items: 11/20)
[ taskExecutor-1] d.d.s.monitor.ProgressBar : ****************************** (Items: 12/20)
[ taskExecutor-1] d.d.s.monitor.ProgressBar : ******************************** (Items: 13/20)
[ taskExecutor-2] d.d.s.monitor.ProgressBar : *********************************** (Items: 14/20)
[ taskExecutor-2] d.d.s.monitor.ProgressBar : ************************************* (Items: 15/20)
[ taskExecutor-2] d.d.s.monitor.ProgressBar : **************************************** (Items: 16/20)
[ taskExecutor-2] d.d.s.monitor.ProgressBar : ****************************************** (Items: 17/20)
[ taskExecutor-2] d.d.s.monitor.ProgressBar : ********************************************* (Items: 18/20)
[ taskExecutor-2] d.d.s.monitor.ProgressBar : *********************************************** (Items: 19/20)
[ taskExecutor-2] d.d.s.monitor.ProgressBar : ************************************************** (Items: 20/20)
@Data
@NoArgsConstructor
public class ProgressLedgerEntry {
private int id;
private int amountProcessedItems = 0;
private int lastAmountProcessedItemsBeforeLastError = 0;
}
public class ProgressLedger {
private List<ProgressLedgerEntry> entries = new ArrayList<>();
private int entryId;

@Synchronized
private int getNextEntryId() {
entryId = entryId + 1;
return entryId;
}
private void addListEntry(ProgressLedgerEntry progressLedgerEntry) {
progressLedgerEntry.setId(getNextEntryId());
entries.add(progressLedgerEntry);
updateProgressBar(progressLedgerEntry);
}
private void updateProgressBar(ProgressLedgerEntry progressLedgerEntry) {
if(progressLedgerEntry.getAmountProcessedItems() > progressLedgerEntry.getLastAmountProcessedItemsBeforeLastError()) {
ProgressBar.increase();
}
}
public void addEntry(ProgressLedgerEntry entry) {
continueLastEntry(entry, getLastEntry());
addListEntry(entry);
}
public void addEntryNewCounter(ProgressLedgerEntry entry) {
continueLastEntry(entry, getLastEntry());
entry.setAmountProcessedItems(0);
addListEntry(entry);
}
public void addEntryObliqueStroke(ProgressLedgerEntry entry) {
switchFromLastEntry(entry, getLastEntry());
addListEntry(entry);
}
private void continueLastEntry(ProgressLedgerEntry newEntry, ProgressLedgerEntry lastEntry) {
if(newEntry.getLastAmountProcessedItemsBeforeLastError() == 0) {
newEntry.setLastAmountProcessedItemsBeforeLastError(lastEntry.getLastAmountProcessedItemsBeforeLastError());
}
if(newEntry.getAmountProcessedItems() == 0) {
newEntry.setAmountProcessedItems(lastEntry.getAmountProcessedItems() + 1);
}
}
private void switchFromLastEntry (ProgressLedgerEntry newEntry, ProgressLedgerEntry lastEntry) {
newEntry.setLastAmountProcessedItemsBeforeLastError(lastEntry.getAmountProcessedItems());
}
private ProgressLedgerEntry getLastEntry() {
return entries.stream()
.max(Comparator.comparing(row -> row.getId()))
.orElseGet(() -> new ProgressLedgerEntry());
}
}
@Component
public class ProgressLedgerManagement {
private Map<String, ProgressLedger> ledgersMap = new HashMap<>();

public ProgressLedger getProgressLedger(String threadName) {
if(!ledgersMap.containsKey(threadName)) {
ledgersMap.put(threadName, new ProgressLedger());
}
return ledgersMap.get(threadName);
}
public void removeProgressLedger(String threadName) {
if(ledgersMap.containsKey(threadName)) {
ledgersMap.remove(threadName);
}
}
}
@Component
public class ProgressMonitoring {
@Autowired
ProgressLedgerManagement progressLedgerManagement;

public void newChunkExecution() {
String taskExecutorThread = Thread.currentThread().getName();
ProgressLedger progressLedger = progressLedgerManagement.getProgressLedger(taskExecutorThread);
progressLedger.addEntryNewCounter(new ProgressLedgerEntry());
}
public void finishedChunk() {
String taskExecutorThread = Thread.currentThread().getName();
progressLedgerManagement.removeProgressLedger(taskExecutorThread);
}
public void itemProcessed() {
String taskExecutorThread = Thread.currentThread().getName();
ProgressLedger progressLedger = progressLedgerManagement.getProgressLedger(taskExecutorThread);
progressLedger.addEntry(new ProgressLedgerEntry());
}
public void itemError() {
String taskExecutorThread = Thread.currentThread().getName();
ProgressLedger progressLedger = progressLedgerManagement.getProgressLedger(taskExecutorThread);
progressLedger.addEntryObliqueStroke(new ProgressLedgerEntry());
}
}

ProgressMonitoring wird von den drei Listeners verwendet:

@Component
public class ChunkListener implements org.springframework.batch.core.ChunkListener {
@Autowired
ProgressMonitoring progressMonitoring;
@Override
public void beforeChunk(ChunkContext chunkContext) {
progressMonitoring.newChunkExecution();
}
@Override
public void afterChunk(ChunkContext chunkContext) {
progressMonitoring.finishedChunk();
}
}

@Component
public class ItemListener implements ItemProcessListener<CustomItemInput, CustomItemOutput> {
@Autowired
ProgressMonitoring progressMonitoring;

@Override
public void afterProcess(CustomItemInput customItemInput, CustomItemOutput customItemOutput) {
progressMonitoring.itemProcessed();
}
@Override
public void onProcessError(CustomItemInput customItemInput, Exception e) {
progressMonitoring.itemError();
}
}

@Component
public class SkipListener implements org.springframework.batch.core.SkipListener<CustomItemInput, CustomItemOutput> {
@Autowired
ProgressMonitoring progressMonitoring;

@Override
public void onSkipInProcess(CustomItemInput customItemInput, Throwable throwable) {
progressMonitoring.itemProcessed();
}
}

Der dargestellte Weg ist eine ressourcenschonende Möglichkeit, eine kontinuierliche Fortschrittsanzeige innerhalb einer blockorientierten Spring Batch Verarbeitung zu verwirklichen. Weitere Möglichkeiten würden sicherlich offensichtlicher, wenn die unterschiedlichen Listeners Zugriff auf die „Session“, den Verarbeitungsrahmen (wie Chunk oder StepExecution) hätten, zu welchem sie ein Event empfangen haben.
Ein Hinweis zum Schluss: Die Progess Bar ist nur für einen Step ausgelegt. In Spring Batch ist aber auch möglich, Steps parallel auszuführen. In diesem Fall sollte man die Progress Bars und Listeners weitesgehend isolieren.

--

--

Jump Boat
Jump Boat

Written by Jump Boat

Ich schreibe hier über Software Development und alles, was damit zu tun hat.

No responses yet