Writing an IterativeOperator
Overview
IterativeOperators are mainly used for analytics operations that must make multiple passes over the input data. The framework provides services for staging (writing to temporary storage) the input data such that it can be iterated multiple times.
An IterativeOperator executes by dynamically composing and running a series of subgraphs that can interact with the top-level graph. The top-level graph is the LogicalGraph to which the IterativeOperator was added by calling LogicalGraph.add(). A subgraph is a separate LogicalGraph that the IterativeOperator composes dynamically at run time. Subgraphs have access to the input ports of the top-level graph.
An IterativeOperator consists of two components: a composition component and a run time component.
The composition component, computeMetadata(), performs validation, declares requirements for how inputs are to be staged, and declares guarantees for how outputs will be produced at the end of execution. Essentially, it allows the IterativeOperator to validate its contract with upstream nodes in the top-level graph and define its contract with downstream nodes in the top-level graph.
The run time component, the CompositionIterator, is responsible for coordinating run time execution. Its execute() method is passed an IterativeExecutionContext, which is primarily a factory for creating subgraphs that the CompositionIterator can then compose and execute. The subgraphs see input data staged according to the requirements stated in computeMetadata().
Finally, the CompositionIterator must define a finalComposition() method, in which the iterator connects all of the operator's outputs. The sources for those outputs must produce data whose metadata is consistent with what was declared in computeMetadata().
An IterativeOperator is similar to a CompositeOperator in that it delegates to other operators to perform the actual "work" of processing data. The key difference, though, is that it has a run time component that allows for multiple passes and composition that can depend on values gathered from the input data.
Like all operators, IterativeOperators define ports and properties and must be JSON serializable (see General Requirements and Best Practices).
The overall life cycle for IterativeOperators is:
1. IterativeOperator.computeMetadata() is called once during graph compilation. This gives operators a chance to validate and declare parallelizability, input metadata, and output metadata.
2. IterativeOperator.createIterator() is called once during graph execution to return a CompositionIterator, which is then used in the next two steps.
3. CompositionIterator.execute() is invoked once during graph execution. The body of execute will typically launch one or more subgraphs.
4. CompositionIterator.finalComposition() is invoked once during graph execution. At this point the operator must connect sources to each of its output ports.
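A minimal skeleton illustrating this life cycle (the class name is hypothetical and the method bodies are elided; port and property declarations follow the same pattern as the full ZScore example later on this page):

public final class MySkeleton extends IterativeOperator {

    private final RecordPort input = newRecordInput("input");
    private final RecordPort output = newRecordOutput("output");

    @Override
    protected void computeMetadata(IterativeMetadataContext ctx) {
        // 1. validate; declare parallelizability, input staging
        //    requirements, and output metadata
    }

    @Override
    protected CompositionIterator createIterator(MetadataContext ctx) {
        // 2. called once at execution time
        return new CompositionIterator() {
            @Override
            public void execute(IterativeExecutionContext ctx) {
                // 3. compose and run one or more subgraphs, possibly in a loop
            }

            @Override
            public void finalComposition(OperatorComposable ctx) {
                // 4. connect a source to every output port
            }
        };
    }
}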
Using Metadata
Implementation of computeMetadata() is similar to that for ExecutableOperators. The key difference is that an IterativeOperator must declare metadata for the operators in its subgraphs rather than itself.
When an IterativeOperator declares required metadata, it declares how the data is to be staged when accessed by its subgraphs. For efficiency, it is important that the iterator advertise any partitioning requirements of the operators in its subgraphs; otherwise, data may be repartitioned in each iteration.
When an IterativeOperator declares output metadata, it declares the metadata that will be produced by CompositionIterator.finalComposition(). A mismatch in the declared output metadata will result in a run time exception.
Finally, unlike ExecutableOperators, IterativeOperators themselves are never parallel. Rather, an IterativeOperator is a single-threaded coordinator of parallel operations.
Thus, they declare parallelizability per-port rather than having a single setting for the whole operator. Input ports should be declared as parallelizable if any of the operators in the graph connected to those ports are parallelizable. Output ports should be declared as parallelizable if the source for the output port (the one added in finalComposition()) is parallelizable.
Implementations of computeMetadata() must adhere to the contracts in the following sections.
General
Regardless of input and output port types, all implementations must do the following:
1. Validation. Validating configuration should always be performed first.
2. Declare operator parallelizability. Implementations must declare this by calling IterativeMetadataContext.parallelize().
3. Declare output port parallelizability. Implementations must declare this by calling IterativeMetadataContext.setOutputParallelizable().
4. Declare input port parallelizability. Implementations must declare this by calling IterativeMetadataContext.setIterationParallelizable().
Note: There is a convenience method that performs steps 2-4 for the common case where all record ports are parallelizable and parallelism is to be determined by the source:

MetadataUtil.negotiateParallelismBasedOnSourceAssumingParallelizableRecords
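For illustration, a hedged sketch of what performing steps 2-4 manually inside computeMetadata() might look like; the ParallelismStrategy constant and the exact setter signatures are assumptions, so consult the IterativeMetadataContext javadoc before relying on them:

// Hedged sketch; the constant name and setter signatures are assumptions
ctx.parallelize(ParallelismStrategy.NEGOTIATE_BASED_ON_SOURCE); // step 2
ctx.setOutputParallelizable(output, true);                      // step 3
ctx.setIterationParallelizable(input, true);                    // step 4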
Input Record Ports
Implementations with input record ports must declare the following:
1. Required data ordering: Implementations that have data ordering requirements must declare them by calling RecordPort.setRequiredDataOrdering(); otherwise, iteration will proceed on an input data set whose order is undefined.
2. Required data distribution (applies only to parallelizable input ports): Implementations that have data distribution requirements must declare them by calling RecordPort.setRequiredDataDistribution(); otherwise, iteration will proceed on an input data set whose distribution is unspecified.
Note that if the upstream operator's output distribution or ordering is already compatible with the declared requirements, the framework avoids a re-sort and redistribution, which is generally a large performance savings.
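For example, an operator whose subgraph operators are insensitive to ordering and partitioning can state that explicitly, ensuring the staged data is never re-sorted or re-partitioned (this mirrors the ZScore example later on this page):

input.setRequiredDataOrdering(ctx, DataOrdering.UNSPECIFIED);
input.setRequiredDataDistribution(ctx, UnspecifiedPartialDistribution.INSTANCE);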
Output Record Ports (Static Metadata)
Implementations with output record ports must declare the following:
1. Type: Implementations must declare their output type by calling RecordPort.setType().
Implementations with output record ports may declare the following:
1. Output data ordering: Implementations that can make guarantees as to their output ordering may do so by calling RecordPort.setOutputDataOrdering().
2. Output data distribution (only applies to parallelizable output ports): Implementations that can make guarantees as to their output distribution may do so by calling RecordPort.setOutputDataDistribution().
Note that both of these properties are optional; if unspecified, performance may suffer since the framework may unnecessarily re-sort or re-distribute the data.
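As an illustration, an operator that merely appends a field to its input can declare its output type and pass its input's ordering and distribution guarantees through unchanged, as the ZScore example below does:

output.setType(ctx, outputType);
output.setOutputDataOrdering(ctx, input.getSourceDataOrdering(ctx));
output.setOutputDataDistribution(ctx, input.getSourceDataDistribution(ctx));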
Input Model Ports
In general, iterative operators tend not to have input model ports; if they do, there is nothing special to declare for them. Models are implicitly duplicated to all partitions when going from non-parallel to parallel ports.
Output Model Ports (Static Metadata)
SimpleModelPorts have no associated metadata and therefore there is never any output metadata to declare. PMMLPorts, on the other hand, do have associated metadata. For all PMMLPorts, implementations must declare the following:
1. pmmlModelSpec: Implementations must declare the PMML model specification by calling PMMLPort.setPMMLModelSpec().
Output Ports with Dynamic Metadata
If an output port has dynamic metadata, implementations can declare this by calling IterativeMetadataContext.setMetadataDynamic(). When metadata is dynamic, calls to RecordPort.setType(), RecordPort.setOutputDataOrdering(), and so forth are not allowed; the declarations described in the previous sections, Output Record Ports (Static Metadata) and Output Model Ports (Static Metadata), must be skipped.
Note that, if possible, dynamic metadata should be avoided (see IterativeMetadataContext.setMetadataDynamic()).
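For illustration only, a hedged one-line sketch of opting a port into dynamic metadata; the exact signature is an assumption based on the per-port setters above, so verify it against the javadoc:

// Hedged sketch: signature assumed by analogy with the other per-port setters
ctx.setMetadataDynamic(output, true);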
Example
The following example is the ZScore class, which can be found in the training material. The operator computes the z-score of a given field. The z-score for a given value x is calculated by the formula:
zscore = (x - mean(X)) / stddev(X)
Z-score calculation proceeds in two passes over the data:
1. In the first pass, mean and stddev are calculated.
2. In the second pass, the z-score of each value is calculated.
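For example, if mean(X) = 10 and stddev(X) = 2, then the value x = 14 has a z-score of (14 - 10) / 2 = 2.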
Using the IterativeOperator
public final class ZScore extends IterativeOperator implements RecordPipelineOperator {

    //required: declare any input ports
    private final RecordPort input = newRecordInput("input");
    //required: declare any output ports
    private final RecordPort output = newRecordOutput("output");
    //required: declare any properties
    private String fieldName;

    /**
     * Default constructor.
     */
    //required: all operators must have a default constructor
    public ZScore() {
    }

    /**
     * Convenience constructor, specifies the field for which to
     * calculate the z-score
     * @param fieldName the field for which to calculate the z-score
     */
    //optional: convenience constructor
    public ZScore(String fieldName) {
        setFieldName(fieldName);
    }

    ///////////////////////////////////////////////////
    //
    // Required: getters and setters for each property
    //
    ///////////////////////////////////////////////////
    /**
     * Returns the field name of field for which to calculate the z-score
     * @return the field for which to calculate the z-score
     */
    public String getFieldName() {
        return fieldName;
    }

    /**
     * Sets the field name of field for which to calculate the z-score
     * @param fieldName the field for which to calculate the z-score
     */
    public void setFieldName(String fieldName) {
        this.fieldName = fieldName;
    }

    ///////////////////////////////////////////////////
    //
    // Required: getters for each port
    //
    ///////////////////////////////////////////////////
    /**
     * Returns the input port
     * @return the input port.
     */
    @Override
    public RecordPort getInput() {
        return input;
    }

    /**
     * Returns the output port containing input data plus an additional
     * field <fieldName>_zscore containing the calculated zscore for the given
     * input field
     * @return the output port
     */
    @Override
    public RecordPort getOutput() {
        return output;
    }

    @Override
    protected void computeMetadata(IterativeMetadataContext ctx) {

        //best practice: perform validation first
        // make sure field is non-null 
        PropertyUtil.checkNotEmpty(fieldName, "fieldName");

        // make sure field exists in the input
        input.getType(ctx).verifyNames(fieldName);

        //Required: declare parallelizability of this operator and of each input and output port.
        //  In this case, we can use the convenience method. This method does the following:
        //  1) Specifies that the input will be accessed iteratively in parallel (distributed)
        //  2) Specifies that the output record port is parallel (distributed)
        //  3) Specifies that our parallelism is to be determined by the source parallelism
        MetadataUtil.negotiateParallelismBasedOnSourceAssumingParallelizableRecords(ctx);

        //required: declare sort order and data distribution of each 
        //  input port. data is staged *once* according to these parameters and each
        //  iteration then uses that dataset. incorrectly defining these parameters
        //  means that data may be redistributed with each iteration
        //
        //  in this case, we want input to be staged in a parallel fashion since both the Group and the DeriveFields
        //  operators that we use during execution are parallelizable. 
        //
        //  in this case, we don't care about distribution or ordering since neither Group nor DeriveFields care
        input.setRequiredDataDistribution(ctx, UnspecifiedPartialDistribution.INSTANCE);
        input.setRequiredDataOrdering(ctx, DataOrdering.UNSPECIFIED);

        //required: declare each output type
        //
        //  in this case, outputType is input plus the new column appended
        RecordTokenType outputType = TypeUtil.mergeTypes(input.getType(ctx), record(DOUBLE(outputFieldName()))); 
        output.setType(ctx, outputType);

        //best practice: declare output data ordering and distribution if possible
        //
        //  in this case, since we're merely appending a field, output data ordering
        //  and partitioning are the same as input. this is consistent with the 
        //  metadata produced by DeriveFields
        output.setOutputDataOrdering(ctx, input.getSourceDataOrdering(ctx));        
        output.setOutputDataDistribution(ctx, input.getSourceDataDistribution(ctx));
    }

    @Override
    protected CompositionIterator createIterator(MetadataContext ctx) {
        return new ZScoreIterator();
    }

    /**
     * required: iterator -- this is the master execution piece that
     * handles execution of subgraphs. CompositionIterators are normally
     * inner classes of the primary operator since they always need to
     * reference input and output ports
     */
    private class ZScoreIterator implements CompositionIterator {

        //constants
        private static final String FIELD_STDDEV = "stddev";
        private static final String FIELD_AVG = "avg";

        //required: iteration state. all iterators
        // have some internal state that they capture in their
        // primary *execute* method and then reference in *finalComposition*
        private double avg;
        private double stddev;

        public ZScoreIterator() {
            super();
        }

        @Override
        public void execute(IterativeExecutionContext ctx) {        

            //create a new subgraph (typically execute methods
            //will create one or more subgraphs)
            LogicalSubgraph sub = ctx.newSubgraph();

            //add a group operator to calculate mean and stddev
            Group group = sub.add(new Group());
            group.setAggregations(new Aggregation[]{Aggregation.avg(ZScore.this.fieldName).as(FIELD_AVG),
                    Aggregation.stddev(ZScore.this.fieldName,true).as(FIELD_STDDEV)});

            //create a connection in the subgraph from the main input to the Group input
            sub.connect(ZScore.this.input, group.getInput());

            //create a placeholder to capture the results
            CollectRecords placeholder = sub.add(new CollectRecords());
            sub.connect(group.getOutput(), placeholder.getInput());

            //run the subgraph
            sub.run();

            //extract avg and stddev from results (results from
            //Group consist of a single row of two fields "stddev" and "avg")
            RecordTokenList list = placeholder.getOutput();
            assert list.size() == 1;
            RecordValued row = list.getToken(0);                   
            this.avg = ((DoubleValued)row.getField(FIELD_AVG)).asDouble();
            this.stddev = ((DoubleValued)row.getField(FIELD_STDDEV)).asDouble();
        }

        @Override
        public void finalComposition(OperatorComposable ctx) {

            //create a DeriveFields to derive the zscore
            FieldDerivation zscore =
                FieldDerivation.derive(outputFieldName(), 
                        StatsFunctions.zscore(ZScore.this.fieldName, this.avg, this.stddev));
            DeriveFields convert =
                ctx.add(new DeriveFields(zscore));

            //connect from the main operator's input to the converter's input
            ctx.connect(ZScore.this.input, convert.getInput());

            //required: implementations of finalComposition must connect
            //  all output ports
            ctx.connect(convert.getOutput(), ZScore.this.output);
        }

    }

    private String outputFieldName() {
        return fieldName + "_zscore";
    }
}
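The following is a hypothetical usage sketch of the ZScore operator. ReadDelimitedText and LogRows are standard DataFlow operators, but the file path, field name, and reader configuration (schema, header settings, and so on) are assumptions and are elided here:

// Hypothetical usage sketch: file path and field name are assumptions
LogicalGraph graph = LogicalGraphFactory.newLogicalGraph("zscoreExample");

// read the input data (reader schema/configuration elided)
ReadDelimitedText reader = graph.add(new ReadDelimitedText("data/incomes.txt"));

// compute the z-score of the "income" field
ZScore zscore = graph.add(new ZScore("income"));

// log the resulting rows
LogRows logger = graph.add(new LogRows(1));

graph.connect(reader.getOutput(), zscore.getInput());
graph.connect(zscore.getOutput(), logger.getInput());

// compile and execute the graph
graph.run();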