Example: Writing a Custom Aggregation
For this example, we will define an "average" aggregation. This is a simplified version of the standard "average" aggregation that is built into the
Aggregation class. Full source code for this example may be found in the training materials.
First, we must define the Aggregator:
Define the Aggregator
import com.pervasive.dataflow.operators.group.Aggregator;
import com.pervasive.dataflow.operators.group.ReadableStorage;
import com.pervasive.dataflow.operators.group.WriteableStorage;
import com.pervasive.dataflow.tokens.scalar.DoubleSettable;
import com.pervasive.dataflow.tokens.scalar.DoubleValued;
import com.pervasive.dataflow.tokens.scalar.ScalarSettable;
import com.pervasive.dataflow.tokens.scalar.ScalarValued;
import com.pervasive.dataflow.types.ScalarTokenType;
import com.pervasive.dataflow.types.TokenTypeConstant;

/*package*/ final class AverageDouble implements Aggregator
{
    //keeps a handle to the current input value; the reference stays constant but the value reflects
    //that of the current row
    private DoubleValued input;
    //count of non-null values
    private long count;
    //sum of non-null values
    private double sum;

    //metadata: declare output type
    @Override
    public ScalarTokenType getOutputType() {
        return TokenTypeConstant.DOUBLE;
    }

    //metadata: declare internal types: need one array element for each field of my internal state
    @Override
    public final ScalarTokenType[] getInternalTypes() {
        return new ScalarTokenType[] {
            //corresponds to the "count" field; count is referenced at index 0 (in combineInternals and storeInternals)
            TokenTypeConstant.LONG,
            //corresponds to the "sum" field; sum is referenced at index 1 (in combineInternals and storeInternals)
            TokenTypeConstant.DOUBLE
        };
    }

    //runtime: called once to initialize
    public final void setInputs(ScalarValued[] inputs) {
        if (inputs.length != 1) {
            throw new IllegalArgumentException(getClass().getSimpleName() + " expects a single input.");
        }
        //bind to my input value
        this.input = (DoubleValued) inputs[0];
    }

    //runtime: called once for each row of input
    @Override
    public final void accumulate() {
        //accumulate internal state from input
        if (!input.isNull()) {
            this.count++;
            this.sum += input.asDouble();
        }
    }

    //runtime: called when switching groups or when combining partials from same group
    @Override
    public final void combineInternals(ReadableStorage internals) {
        //load and accumulate each field
        //load "count" from index 0 (must match the ordering declared by getInternalTypes)
        this.count += internals.getLong(0);
        //load "sum" from index 1 (must match the ordering declared by getInternalTypes)
        this.sum += internals.getDouble(1);
    }

    //runtime: called when switching groups or when outputting partial results
    @Override
    public final void storeInternals(WriteableStorage internals) {
        //store each field
        //store "count" in index 0 (must match the ordering declared by getInternalTypes)
        internals.setLong(0, count);
        //store "sum" in index 1 (must match the ordering declared by getInternalTypes)
        internals.setDouble(1, sum);
    }

    @Override
    public void updateInternals(ReadableStorage readView, WriteableStorage writeView) {
        //this implementation should always be used as long as all internals are primitive types
        storeInternals(writeView);
    }

    //runtime: called to "zero-out" internal state at initialization or to recycle a given
    //aggregator on a different group
    @Override
    public final void reset() {
        //reset my internal state
        this.count = 0;
        this.sum = 0;
    }

    //runtime: called once per final group to perform "reduction" step
    @Override
    public final void storeFinalResult(ScalarSettable output) {
        //store my internal state in the output
        if (count == 0) {
            //special case for all null: output is null
            output.setNull();
        }
        else {
            //otherwise compute the average
            double avg = sum / count;
            ((DoubleSettable) output).set(avg);
        }
    }
}
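To see why the internal state is kept as a (count, sum) pair, the following is a minimal standalone sketch of the accumulate / store / combine / finalize cycle. It does not use the DataFlow API: AverageState and PartialAverageDemo are hypothetical names introduced only for illustration, and a plain Object[] stands in for the ReadableStorage and WriteableStorage views.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the (count, sum) internal state of AverageDouble.
// This is NOT the DataFlow API; it only mirrors the method names used above.
final class AverageState {
    private long count;   // count of non-null values
    private double sum;   // sum of non-null values

    // mirrors accumulate(): null inputs are skipped
    void accumulate(Double value) {
        if (value != null) {
            count++;
            sum += value;
        }
    }

    // mirrors storeInternals(): slot 0 holds count, slot 1 holds sum
    Object[] storeInternals() {
        return new Object[] { count, sum };
    }

    // mirrors combineInternals(): reads the slots in the order they were stored
    void combineInternals(Object[] internals) {
        count += (Long) internals[0];
        sum += (Double) internals[1];
    }

    // mirrors storeFinalResult(): null when no non-null value was seen
    Double finalResult() {
        if (count == 0) {
            return null;
        }
        return sum / count;
    }
}

final class PartialAverageDemo {
    // Aggregates each partition separately, then merges the partial states.
    static Double averageOfPartitions(List<List<Double>> partitions) {
        AverageState merged = new AverageState();
        for (List<Double> partition : partitions) {
            AverageState partial = new AverageState();
            for (Double v : partition) {
                partial.accumulate(v);
            }
            merged.combineInternals(partial.storeInternals());
        }
        return merged.finalResult();
    }

    public static void main(String[] args) {
        List<List<Double>> partitions = Arrays.asList(
            Arrays.asList(1.0, null, 3.0),
            Arrays.asList(8.0));
        System.out.println(averageOfPartitions(partitions)); // prints 4.0
    }
}
```

Averaging the two partial averages directly would give (2.0 + 8.0) / 2 = 5.0, which is wrong. Carrying count and sum through the partial results and dividing only in the final step yields 12.0 / 3 = 4.0.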
Next, we must define the AggregatorFactory:
Define the AggregatorFactory
public final class Average implements AggregatorFactory {

    //required: public constructor required for JSON serialization
    public Average() {
    }

    @Override
    public Aggregator newAggregator(List<String> inputNames, ScalarTokenType[] inputs) {
        //validate number of arguments is 1
        if (inputs.length != 1) {
            throw new InvalidArgumentCountException(getKey(), 1, inputs.length);
        }
        //construct an appropriate aggregator for the given input type
        // in this case, we have one aggregator that supports double, long, float, or int
        // but in general, you may require type-specific implementations
        ScalarTokenType inputType = inputs[0];
        if (TokenTypeConstant.DOUBLE.isAssignableFrom(inputType)) {
            //supports double or any types assignable to double
            return new AverageDouble();
        }
        else {
            throw new DRException("average is unsupported for token type " + inputType + " for field " + inputNames.get(0));
        }
    }

    @Override
    public String getKey() {
        return "avg";
    }
}
The custom aggregation may now be used in a Group operator as follows:
Use the custom Aggregation
...
Group group = graph.add(new Group(Arrays.asList("key"),
        Arrays.asList(new Aggregation(new Average(), "value"))));
...
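As an illustration of the result we would expect from grouping on "key" and averaging "value", here is a minimal standalone sketch in plain Java. It does not use the Group operator or any DataFlow class; GroupedAverageSketch and averageByKey are hypothetical names, and the null handling follows the accumulate and storeFinalResult logic shown earlier (nulls are skipped, and a group with no non-null values yields null).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; not part of the DataFlow API.
final class GroupedAverageSketch {

    // keys[i] and values[i] together form one input row; values[i] may be null
    static Map<String, Double> averageByKey(String[] keys, Double[] values) {
        // per-key state: element 0 is the count, element 1 is the sum
        Map<String, double[]> state = new LinkedHashMap<>();
        for (int i = 0; i < keys.length; i++) {
            double[] s = state.computeIfAbsent(keys[i], k -> new double[2]);
            if (values[i] != null) {
                s[0]++;
                s[1] += values[i];
            }
        }
        // reduction step: one output row per group
        Map<String, Double> result = new LinkedHashMap<>();
        for (Map.Entry<String, double[]> e : state.entrySet()) {
            double[] s = e.getValue();
            result.put(e.getKey(), s[0] == 0 ? null : Double.valueOf(s[1] / s[0]));
        }
        return result;
    }

    public static void main(String[] args) {
        String[] keys = { "a", "a", "b", "b", "c" };
        Double[] values = { 1.0, 2.0, null, 4.0, null };
        System.out.println(averageByKey(keys, values)); // prints {a=1.5, b=4.0, c=null}
    }
}
```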
Adding Expression Language Support
You must perform a few additional steps to add support for a newly created aggregation to the expression language. For more information, see Expression Language.
Create a class containing methods that return your aggregation, and mark each of these methods with the @Function annotation. You can also specify a custom ArgumentConverter for each argument using the @FunctionArgument annotation. Once your aggregations have been defined, create a subclass of SimpleAggregationProvider to register the classes that contain these methods. Finally, add a file to META-INF/services called com.pervasive.dataflow.aggregations.AggregationProvider containing the fully qualified name of your SimpleAggregationProvider subclass.
An example is provided here:
Aggregation expression language support
package com.custom.function.aggregation;

public final class CustomAggregations {

    @Function(description="Returns the Average")
    public static Aggregation average(
            @FunctionArgument(name="input", description="The input to average.") ScalarValuedFunction input)
    {
        return new Aggregation(new Average(), input);
    }
}
To register the new aggregation, create a provider class and list it in the services file:
Aggregation provider registration
public class CustomAggregationProvider extends SimpleAggregationProvider {

    public CustomAggregationProvider() {
        register(com.custom.function.aggregation.CustomAggregations.class);
    }
}
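The services file itself is a plain text file in the standard Java service provider format: one fully qualified class name per line, with # starting a comment. The listing above does not show a package for CustomAggregationProvider, so the sketch below assumes it lives in com.custom.function.aggregation; substitute the actual package of your provider class.

```text
# File: META-INF/services/com.pervasive.dataflow.aggregations.AggregationProvider
# Assumption: the provider is declared in package com.custom.function.aggregation
com.custom.function.aggregation.CustomAggregationProvider
```

The file must be on the classpath (typically packaged inside the same JAR as the provider class) for the provider to be discovered.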