Building DataFlow Applications : Customizing and Extending DataFlow Functionality : Writing an Aggregation : Example: Writing a Custom Aggregation
 
Share this page                  
Example: Writing a Custom Aggregation
For this example, we will define an "average" aggregation. This is a simplified version of the standard "average" aggregation that is built into the Aggregation class. Full source code for this example may be found in the training materials.
First, we must define the Aggregator:
Define the Aggregator
import com.pervasive.dataflow.operators.group.Aggregator;
import com.pervasive.dataflow.operators.group.ReadableStorage;
import com.pervasive.dataflow.operators.group.WriteableStorage;
import com.pervasive.dataflow.tokens.scalar.DoubleSettable;
import com.pervasive.dataflow.tokens.scalar.DoubleValued;
import com.pervasive.dataflow.tokens.scalar.ScalarSettable;
import com.pervasive.dataflow.tokens.scalar.ScalarValued;
import com.pervasive.dataflow.types.ScalarTokenType;
import com.pervasive.dataflow.types.TokenTypeConstant;

/*package*/ final class AverageDouble implements Aggregator
{
    //keeps a handle to the current input value; the reference stays constant but the value reflects
    //that of the current row
    private DoubleValued input;   
    //count of non-null values
    private long count;
    //sum of non-null values
    private double sum;
    
    
    //metadata: declare output type
    @Override
    public ScalarTokenType getOutputType() {
        return TokenTypeConstant.DOUBLE;
    }

    //metadata: declare internal types: need one array element for each field of my internal state
    @Override
    public final ScalarTokenType[] getInternalTypes() {
        return new ScalarTokenType[] {
            //corresponds to the "count" field; count is referenced at index 0 ( in combineInternals and storeInternals ) 
            TokenTypeConstant.LONG,
            //corresponds to the "sum" field; sum is referenced at index 1 ( in combineInternals and storeInternals )
            TokenTypeConstant.DOUBLE
        };
    }
    
    //runtime: called once to initialize
    public final void setInputs(ScalarValued[] inputs) {
        if ( inputs.length != 1 ) {
            throw new IllegalArgumentException(getClass().getSimpleName()+" expects a single input.");
        }
        //bind to my input value
        this.input= (DoubleValued)inputs[0];
    }
    
    
    //runtime: called once for each row of input
    @Override
    public final void accumulate() {
        //accumulate internal state from input
        if (!input.isNull()) {
            this.count++;
            this.sum += input.asDouble();            
        }
    }
    
    //runtime: called when switching groups or when combining partials from same group
    @Override
    public final void combineInternals(ReadableStorage internals) {
        //load and accumulate each field
        
        //load "count" from index 0 ( must match the ordering declared by getInternalTypes )
        this.count+= internals.getLong(0);
        //load "sum" from index 1 ( must match the ordering declared by getInternalTypes )
        this.sum+= internals.getDouble(1);
    }
    
    //runtime: called when switching groups or when outputing partial results
    @Override
    public final void storeInternals(WriteableStorage internals) {
        //store each field
        
        //store "count" in index 0 ( must match the ordering declared by getInternalTypes )
        internals.setLong(0,count);
        //store "sum" in index 1 ( must match the ordering declared by getInternalTypes )
        internals.setDouble(1,sum);        
    }

    @Override
    public void updateInternals(ReadableStorage readView, WriteableStorage writeView) {
        //this implementation should always be used as long as all internals are primitive types
        storeInternals(writeView);
    }
    
    //runtime: called to "zero-out" internal state at initialization or to recycle a given
    //aggregator on a different group
    @Override
    public final void reset() {
        //reset my internal state
        this.count= 0;
        this.sum= 0;
    }
        
    //runtime: called once per final group to perform "reduction" step
    @Override
    public final void storeFinalResult(ScalarSettable output) {
        //store my internal state in the output
        if ( count == 0 ) {
            //special case for all null--output is null
            output.setNull();
        }
        else {
            //otherwise compute the average
            double avg = sum / count;
            ((DoubleSettable)output).set(avg);            
        }        
    }
}
Next, we must define the AggregatorFactory:
Define the AggregatorFactory
public final class Average implements AggregatorFactory {

    //required: public constructor required for JSON serialization
    public Average() {
    }
    
    @Override
    public Aggregator newAggregator(List<String> inputNames, ScalarTokenType[] inputs) {
        //validate number of arguments is 1
        if ( inputs.length != 1) {
            throw new InvalidArgumentCountException(getKey(), 1, inputs.length);
        }
        
        //construct an appropriate aggregator for the given input type        
        //  in this case, we have one aggregator that supports double, long, float, or int
        //  but in general, you may require type-specific implementations
        ScalarTokenType inputType= inputs[0];
        if (TokenTypeConstant.DOUBLE.isAssignableFrom(inputType)) {
            //supports double or any types assignable to double
            return new AverageDouble();
        }
        else {
            throw new DRException("average is unsupported for token type " + inputType + " for field " + inputNames.get(0));
        }
    }
    @Override
    public String getKey() {
        return "avg";
    }

}
The custom aggregation may now be used in a Group operator as follows:
Use the custom Aggregation
...
    //add a Group operator to the graph: the first list holds "key" and the second holds
    //the custom Average aggregation applied to "value"
    //NOTE(review): presumably the first list names the grouping key fields -- confirm against the Group API
    Group group= graph.add(new Group(Arrays.asList("key"), 
            Arrays.asList(new Aggregation(new Average(), "value"))));
...
Adding Expression Language Support
You must perform a few additional steps to add support for a newly created aggregation to the expression language. For more information, see Expression Language.
Create a class containing methods that return your aggregation. Each of these methods must be marked with the @Function annotation. You can also specify a custom ArgumentConverter for each argument using the @FunctionArgument annotation. Once your aggregations have been defined, create a subclass of SimpleAggregationProvider that registers each class containing these methods. Finally, add a file named com.pervasive.dataflow.aggregations.AggregationProvider to META-INF/services containing the fully qualified name of your SimpleAggregationProvider subclass.
An example is provided here:
Aggregation expression language support
package com.custom.function.aggregation

public final class CustomAggregations {

   @Function(description="Returns the Average")
   public static Aggregation average(
              @FunctionArgument(name="input", description="The input to average.") ScalarValuedFunction input)
   {
      return new Aggregation(new Average(), input);
   }

}
To register the new aggregation, create a provider class and list it in the services file:
Aggregation provider registration
public class CustomAggregationProvider extends SimpleAggregationProvider {

   public CustomAggregationProvider() {
      register(com.custom.function.aggregation.CustomAggregation.class);
   }

}