Writing a batch is mainly about writing a main(String…​) method which uses Yupiik Batch compoennts.

Create a batch

Implementing a batch is about extending Batch interface:

public class MyBatch implements Batch<MyConf> {
    @Override
    public void accept(final MyConf configuration) {
        // ...
    }
}

What is important to note is that a batch has a configuration and the configuration is "injected" in accept method. This enables the framework to map the main arguments to the configuration in an unified fashion.

Once you have a batch implementation, you can run it using Batch launcher method:

public static void main(final String... args) {
    Batch.run(MyBatch.class, args);
}

Define your batch configuration

A batch configuration is a class with field(s) decorated with @Param:

public class DataSourceConfiguration {
    @Param(description = "Driver to use", required = true)
    private String driver;

    @Param(description = "JDBC URL to use", required = true)
    private String url;

    @Param(description = "Database username.")
    private String username;

    @Param(description = "Database password.")
    private String password;
}
Note
fields are injected so they shouldn’t be final but there is no constraint on having getters/setters.
Tip
io.yupiik.batch.runtime.documentation.ConfigurationParameterCollector class enables to extract all parameters for a batch and then map this extraction to a documentation if needed.
Tip
simple-configuration package enables to use this configuration without all the batch stack.
Tip
passing yupiik.binder.unset enables to make the configuration library to behave as if no configuration was passed to the value.

Reusable batch components

Reusable components are in io.yupiik.batch.runtime.component package. This section highlights a few of them.

io.yupiik.batch.runtime.component.AcceptedLossDiffFilter

Enables to filter a Diff. If the acceptedLoss is not reached, i.e. more row would be deleted than this percentage, the chain will end there.

Goal is to not delete a database if incoming data are corrupted or not properly up to date.

io.yupiik.batch.runtime.component.DatasetDiffComputer

Component which takes as input two iterators representing sorted datasets.

Both dataset will be compared - in streaming mode - using the Comparator passed in the constructor/factory method. It enables to detect deletions and additions and it will be reflected in the resulting Diff instance.

If equals, the BiPredicate will be used to check it is actually equal or not. If not the data will be considered updated, otherwise not changed and ignored from the diff.

io.yupiik.batch.runtime.component.DiffExecutor

Enables to apply a Diff - from a DatasetDiffComputer.

It will apply it in a database represented by the connectionSupplier with the provided commitInterval. The statements are creating using the related factories - insertFactory, updateFactory, deleteFactory.

Finally, dryRun toggle enables to simulate the processing without issuing any modification in the database.

io.yupiik.batch.runtime.component.Mapper

This mapping component enables to convert an input to an output instance by providing an specification instance.

It is a class decorated with @Mapping:

@Mapping(
        from = IncomingModel.class,
        to = OutputModel.class,
        documentation = "Converts an input to an output.",
        properties = {
                @Property(type = CONSTANT, to = "outputFieldUrl", value = "https://foo.bar/"),
                @Property(type = TABLE_MAPPING, from = "inputKeyField", to = "mappedOutput", value = "myLookupTable", onMissedTableLookup = FORWARD)
        },
        tables = {
                @Mapping.MappingTable(
                        name = "myLookupTable",
                        entries = {
                                @Entry(input = "A", output = "1"),
                                @Entry(input = "C", output = "3")
                        }
                )
        })
public class MyMapperSpec {
    @Mapping.Custom(description = "This will map X to Y.")
    String outputField(final IncomingModel in[, @Table("myLookupTable") final Map<String, String> myLookupTable) {
        return ...;
    }
}

To get a mapper, you simply call Mapper.mapper(MyMapperSpec.class) and then can insert this mapper in any BatchChain.

The specification API enables static mapping (properties) or custom mapping - @Mapping.Custom - for more advanced logic.

The companion class io.yupiik.batch.runtime.documentation.MapperDocGenerator enables to generate an asciidoctor documentation for a mapper class.

io.yupiik.batch.runtime.component.SQLQuery

Enables to extract data from a SQL query.

A custom mapper will be called for each ResultSet line to convert current row in an object passed to the rest of the BatchChain.

io.yupiik.batch.runtime.component.uship.DatabaseDiffExecutor

Enables to apply a Diff - from a DatasetDiffComputer when the entities (diff model) are Yupiik UShip Persistence models.

It will apply it in a database with the provided commitInterval.

dryRun toggle enables to simulate the processing without issuing any modification in the database.

Combine components

Combining components can be done in a plain main:

final var referenceData = new ReferenceDataSQLQuery(databaseConnection);
final var newInputs = new MyNewInputs();
final var comparingProcessor = new ReferencialRowDatasetDiffComputer();
final var diff = comparingProcessor.apply(referenceData, newInputs);
if (new AcceptedLossDiffFilter(0.10).test(diff)) {
    new ReferenceDataDiffExecutor(databaseConnection, 25).accept(diff);
}

However, for too noisy cases, it can be neat to use a fluent API on the diff to make it more readable and composable. Indeed you can use the Stream or Optional API:

// init
final var diff = new ReferencialRowDatasetDiffComputer()
        .apply(new ReferenceDataSQLQuery(databaseConnection), new MyNewInputs());

// start the flow
Stream.of(diff)
    .filter(new AcceptedLossDiffFilter(0.10))
    .forEach(new ReferenceDataDiffExecutor(databaseConnection, 25));

// or
Optional.of(diff)
    .filter(new AcceptedLossDiffFilter(0.10))
    .ifPresent(new ReferenceDataDiffExecutor(databaseConnection, 25));

This enables to read more explicitly the flow of processing thanks Stream or Optional fluent API. It is also now easier to insert an element or decorate components explicitly.

However, these two API are not designed for that and will quickly hit some limitation. To make it more batch oriented, parent Batch class enables to define a stream like flow but more batch oriented. You have to start your flow by from() or use a specific source such as DatasetDiffComputer. For example:

@Override
public void accept(final MyConfiguration configuration) {
    final var connectionProvider = configuration.datasource.toConnectionProvider();
    referencialRowDatasetDiff()
            .withCustomData(myInput())
            .withReferencialData(referenceData(datasource, configuration.table))
        .diff()
        .filter(withAcceptedLoss(configuration.acceptedLoss))
        .then(applyDiff(connectionProvider, configuration.commitInterval, configuration.dryRun, configuration.table))
        .run(runConfiguration(datasource, getClass().getSimpleName(), systemUTC()));
}

This DSL is more friendly to the batches we write (integrating with default components).

Important
until you hit run() nothing is done.
Tip
some components have a static factory to make it more expressive, don’t hesitate to use it.

Finally, the RunConfiguration enables to intercept any step of the BatchChain defined by the previous DSL. Combined with ExecutionTracer, it will let you store any execution and its steps in a database for audit or monitoring/administration purposes.

Async result

It can be neat to pass a step and its result to next step without it being finished.

It is often the case for reactive bacthes (one "thread" starts to poll data, next step processes it etc.. but you want to keep the polling and processing split in terms of "step" and tracing).

To do that, you can return a BatchPromise which is just a holder of a value (reactive in our example) and a CompletionStage which notifies the batch runtime and tracer when the step is done:

from()
  .map("step1", new CommentifiableFunction<Void, BatchPromise<String>>() {
    @Override
    public BatchPromise<String> apply(final Void o) {
        final var reactiveComponent = runStep1(); // an Observable with RxJava for example
        return BatchPromise.of(reactiveComponent::onItem, reactiveComponent.toCompletionStage());
    }
  })
  .map("step2", new CommentifiableConsumer<BatchPromise<Void>>() {
    @Override
    public BatchPromise<Void> apply(final BatchPromise<Observable> in) {
        final var promise = new CompletableFuture<Void>();
        in.subscribe(
                this::doStep2ItemProcessing,
                error -> {
                    // log etc...
                    promise.completeExceptionally(error);
                },
                () -> promise.complete(null));
        return BatchPromise.of(null, promise);
    }
  })
  .run(tracingConfig);

So overall the step1 starts to read some data and emiiting it when step2 starts and subscribes to it.

When step1 is done it notifies the batch runtime it is ok and the tracer (or runtime) will stop the thread 1 monitoring.

Step2 being asynchronous too (due to its reactive nature, it also emits a BatchPromise leading to the same kind of behavior).

Tip
thanks to this trick, you can run a concurrent job with a flat chain since you can pass in the BatchRuntime a complex value which would represent each branch of your concurrent batch.

Reusable Iterators

Reusable iterators are either provided through FluentIterator or extensions (in this case you must add a dependency to get it).

io.yupiik.batch.iterator.excel.component.ExcelIterator

Reads an excel file sheet row by row.

Factories

<A> ExcelIterator<A> ExcelIterator.of(Path,int,RowMapper<A>)

Creates a new ExcelIterator with a custom row mapper.

ExcelIterator<List<String>> ExcelIterator.ofLines(Path,int)

Creates a new ExcelIterator with a default row mapper mapping lines as List<String>.

Dependency

<dependency>
  <groupId>io.yupiik.batch</groupId>
  <artifactId>excel-iterator</artifactId>
  <version>${yupiik-batch.version}</version>
</dependency>

TIP

  • Use FluentIterator to filter and map your input Iterator, it makes the code more readable and adds some features Stream does not have like a more advanced distinct implementation.

Here is a sample input iterator written using FluentIterator and filtering its data with some business rule on the raw input and post processing the mapped data with PspRolePasswordInjector.

final var iterator = FluentIterator.of(myIterator()) (1)
        .filter(new MyBusinessFilter()) (2)
        .map(new MyBusinessMapper()) (3)
        .unwrap(); (4)
  1. Create a FluentIterator from myIterator() result,

  2. Filter the iterator with a Predicate,

  3. Map the iterator data to another model with a `Mapper>,

  4. Remove the enclosing FluentIterator wrapper which is not needed once the full iterator chain is defined (optional).