Writing a DeferredCompositeOperator
Overview
The DeferredCompositeOperator is similar to the CompositeOperator in that it is defined by using other operators to build out a subsection of the graph. The key distinction is that composition is deferred until execution time. CompositeOperators are composed once per graph. However, DeferredCompositeOperators are composed once per partition.
Per-partition composition gives implementations both more control and more responsibility than standard CompositeOperators:
1. Parallelizability of the component operators is ignored. The parallelizable declaration of the DeferredCompositeOperator controls whether its compose() method is invoked multiple times and thus controls whether multiple copies of the component operators are created.
2. Partitioning requirements of the component operators are ignored. The DeferredCompositeOperator must assume the responsibility of declaring partitioning requirements for all of its component operators.
3. By implication, all of the component operators must be equivalently partitioned.
4. Because output type information is not known until execution time, DeferredCompositeOperators must declare the output types of their ports.
5. Similarly, DeferredCompositeOperators must declare output partitioning and sort order.
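The effect of the parallelizable declaration on how often compose() runs can be pictured with a small toy model. This is only a sketch under stated assumptions: the class `DeferredCompositionSketch`, the method `composeDeferred`, and the use of strings to stand in for composed subgraphs are all hypothetical and are not part of the DataFlow API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Toy model only: strings stand in for composed subgraphs.
// Nothing here is DataFlow API; all names are hypothetical.
final class DeferredCompositionSketch {

    // Invokes compose once per partition when parallelizable, otherwise exactly once.
    // The function receives (partitionId, partitionCount), mirroring the idea that
    // deferred composition can see partition information.
    static List<String> composeDeferred(boolean parallelizable, int partitionCount,
                                        BiFunction<Integer, Integer, String> compose) {
        int copies = parallelizable ? partitionCount : 1;
        List<String> subgraphs = new ArrayList<>();
        for (int partitionId = 0; partitionId < copies; partitionId++) {
            subgraphs.add(compose.apply(partitionId, copies));
        }
        return subgraphs;
    }

    public static void main(String[] args) {
        BiFunction<Integer, Integer, String> compose =
                (id, count) -> "sequence(start=" + id + ", step=" + count + ")";
        // Parallel: three invocations, each with its own partition ID.
        System.out.println(composeDeferred(true, 3, compose));
        // Non-parallel: a single invocation.
        System.out.println(composeDeferred(false, 3, compose));
    }
}
```

In this model each copy can differ because it is built with its own partition ID, which is the capability a once-per-graph composition does not have.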
The primary use cases for DeferredCompositeOperators are:
Cases where the composition depends on partitioning information. The following example is one such use case.
Converting an otherwise non-parallel operation into a parallel operation. This is true in the case of the following example since the sequence function (non-parallel) is used to generate a parallel sequence.
Defining a "block" of equivalently partitioned operations.
That said, use cases for DeferredCompositeOperators are extremely rare. Implementers should generally prefer CompositeOperators (see Writing a CompositeOperator), since DeferredCompositeOperators come with a number of caveats. In addition to the extra burden of defining metadata (CompositeOperators do not need to declare metadata), implementers must be aware of the following restrictions:
A DeferredCompositeOperator must not contain any iterative operators.
A DeferredCompositeOperator must not contain any operators that force staging.
If any component operators declare metadata requirements, they are treated as assertions (rather than being automatically fulfilled by the framework). Thus, it is the responsibility of DeferredCompositeOperators to ensure that any metadata requirements are fulfilled.
Like all operators, DeferredCompositeOperators define ports and properties and must be JSON serializable (see General Requirements and Best Practices).
Using Metadata
Implementations of computeMetadata() must adhere to the same contracts specified for executable operators. (See Writing an Executable Operator.) In addition, they must bear the responsibility of declaring metadata for their component operators.
General
Regardless of input/output port types, all implementations must do the following:
1. Validation: Validating configuration should always be performed first.
2. Declare parallelizability: Implementations must declare parallelizability by calling StreamingMetadataContext.parallelize().
Input Record Ports
Implementations with input record ports must declare the following:
1. Required data ordering: Implementations that have data ordering requirements must declare them by calling RecordPort.setRequiredDataOrdering(); otherwise, data may arrive in any order. DeferredCompositeOperators have the additional responsibility of satisfying ordering requirements for the component operators. Failure to do so results in a runtime exception.
2. Required data distribution (only applies to parallelizable operators): Implementations that have data distribution requirements must declare them by calling RecordPort.setRequiredDataDistribution(); otherwise, data will arrive in an unspecified distribution. DeferredCompositeOperators must determine a distribution strategy that is appropriate for their component operators since the default partitioning requirements of the components will be ignored.
Output Record Ports
Implementations with output record ports must declare the following:
1. Type: Implementations must declare their output type by calling RecordPort.setType(). DeferredCompositeOperators have the additional responsibility of specifying what the output type of the component operators will be. A type mismatch is detected at execution time and results in a runtime exception.
Implementations with output record ports may declare the following:
1. Output data ordering: Implementations that can make guarantees as to their output ordering may do so by calling RecordPort.setOutputDataOrdering().
2. Output data distribution (only applies to parallelizable operators): Implementations that can make guarantees as to their output distribution may do so by calling RecordPort.setOutputDataDistribution().
Note that both of these properties are optional; if unspecified, performance may suffer since the framework may unnecessarily re-sort or re-distribute the data.
Composition
Like CompositeOperators, DeferredCompositeOperators perform their composition within a compose() method (see Writing a CompositeOperator). DeferredCompositeOperators are passed a DeferredCompositionContext which provides the same capabilities as a CompositionContext:
Provides a place to add operators and define connections for the subsection of the graph.
Thus, it is similar to a LogicalGraph (see Composing an Application).
Provides metadata resolution for input ports.
The input metadata allows validation.
The input metadata also drives composition decisions. For example, a compose() method may conditionally choose one implementation over another if the input is already sorted.
In addition, DeferredCompositionContext provides:
Partition ID information. This allows composition to depend on partition information as you will see in the following example.
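The idea of input metadata driving a composition decision can be sketched in plain Java. This is a toy model, not DataFlow code: the class `ComposeDecisionSketch`, the `Plan` enum, and the representation of a sort order as a list of field names are assumptions made for illustration, and sort direction is ignored for simplicity.

```java
import java.util.List;

// Toy model only: a list of field names stands in for sort-order metadata.
// All names are hypothetical; none of this is DataFlow API.
final class ComposeDecisionSketch {

    enum Plan { USE_INPUT_AS_IS, SORT_FIRST }

    // Data sorted by (a, b) is also sorted by (a), so a required ordering is
    // satisfied when it is a prefix of the ordering the input already has.
    static boolean satisfies(List<String> actual, List<String> required) {
        return required.size() <= actual.size()
                && actual.subList(0, required.size()).equals(required);
    }

    // Picks the cheaper plan when the input ordering already meets the requirement.
    static Plan choose(List<String> actual, List<String> required) {
        return satisfies(actual, required) ? Plan.USE_INPUT_AS_IS : Plan.SORT_FIRST;
    }

    public static void main(String[] args) {
        System.out.println(choose(List.of("state", "city"), List.of("state"))); // USE_INPUT_AS_IS
        System.out.println(choose(List.of("city"), List.of("state")));          // SORT_FIRST
    }
}
```

A real compose() method would read the ordering from the context's metadata for the input port and add different operators for each branch; the decision logic has the same shape.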
Example
This example is the AssignUniqueIDs class that can also be found in the training material. This example appends an "id" field to each row where the ID is guaranteed to be unique across the cluster. Each partition outputs a sequence of values where the sequence starts with each partition's partitionID and is always incremented by the total number of partitions. For example, if there are two partitions, partition 0 will output a sequence of even numbers and partition 1 will output a sequence of odd numbers.
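The arithmetic behind the uniqueness guarantee can be checked independently of DataFlow. The following is a minimal plain-Java sketch; the class `PartitionSequenceSketch` and its methods are hypothetical helpers written for this illustration and do not use the Sequences or DeriveFields classes from the operator below.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java model of the per-partition sequence; no DataFlow classes are used.
final class PartitionSequenceSketch {

    // First `count` IDs emitted by one partition:
    // start at partitionId and step by partitionCount.
    static List<Long> idsFor(int partitionId, int partitionCount, int count) {
        List<Long> ids = new ArrayList<>();
        long next = partitionId;
        for (int i = 0; i < count; i++) {
            ids.add(next);
            next += partitionCount;
        }
        return ids;
    }

    // True if no ID is produced by more than one partition.
    static boolean allUnique(int partitionCount, int countPerPartition) {
        Set<Long> seen = new HashSet<>();
        for (int p = 0; p < partitionCount; p++) {
            for (long id : idsFor(p, partitionCount, countPerPartition)) {
                if (!seen.add(id)) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(idsFor(0, 2, 4));   // [0, 2, 4, 6]
        System.out.println(idsFor(1, 2, 4));   // [1, 3, 5, 7]
        System.out.println(allUnique(3, 100)); // true
    }
}
```

Every ID produced by partition p is congruent to p modulo the partition count, so two different partitions can never emit the same value.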
Using the DeferredCompositeOperator
public final class AssignUniqueIDs extends DeferredCompositeOperator implements RecordPipelineOperator {

    //required: declare any input ports
    private final RecordPort input= newRecordInput("input");
    //required: declare any output ports
    private final RecordPort output= newRecordOutput("output");
    //required: declare any properties
    private String fieldName= "id";

    /**
     * Default constructor, uses a fieldName of "id" by default
     */
    //required: all operators must have a default constructor    
    public AssignUniqueIDs() {
    }

    /**
     * Create a new AssignUniqueIDs, specifying the name of the id field to output
     * @param fieldName the name of the id output field
     */
    //optional: convenience constructor
    public AssignUniqueIDs(String fieldName) {
        setFieldName(fieldName);
    }

    ///////////////////////////////////////////////////
    //
    // Required: getters and setters for each property
    //
    ///////////////////////////////////////////////////

    /**
     * Returns the name of the id output field
     * @return the name of the id output field
     */
    public String getFieldName() {
        return fieldName;
    }

    /**
     * Sets the name of the id output field
     * @param fieldName the name of the id output field
     */
    public void setFieldName(String fieldName) {
        this.fieldName = fieldName;
    }

    ///////////////////////////////////////////////////
    //
    // Required: getters for each port
    //
    ///////////////////////////////////////////////////
 
    @Override
    public RecordPort getInput() {
        return input;
    }

    @Override
    public RecordPort getOutput() {
        return output;
    }

    @Override
    protected void computeMetadata(StreamingMetadataContext ctx) {

        //best practice: perform validation first
        PropertyUtil.checkNotEmpty("fieldName", fieldName);

        //required: declare parallelizability
        //  in this case we set our parallelism based on the source
        ctx.parallelize(NEGOTIATE_BASED_ON_SOURCE);

        //required: declare output type
        //  in this case our output type is the input type plus an additional field
        //  containing the id
        RecordTokenType outputType= TypeUtil.mergeTypes(input.getType(ctx), record(LONG(fieldName))); 
        output.setType(ctx, outputType);

        //best practice: define output ordering/distribution
        //  in this case we are appending a field to the source so
        //  ordering and distribution are preserved
        output.setOutputDataOrdering(ctx, input.getSourceDataOrdering(ctx));        
        output.setOutputDataDistribution(ctx, input.getSourceDataDistribution(ctx));
    }

    @Override
    protected void compose(DeferredCompositionContext ctx) {

        //get the partitionID information 
        //(available at execution time and at deferred composition time)
        PartitionInstanceInfo info= ctx.getAssignment().getPartitionInfo();

        //define startValue to be my partitionID
        long startValue= info.getPartitionID();

        //define stepSize to be the total number of partitions 
        long stepSize= info.getPartitionCount();

        //derive a sequence with the given startValue and stepSize
        FieldDerivation derivation= FieldDerivation.derive(fieldName, 
                Sequences.sequence(startValue, stepSize));

        //add the DeriveFields operator to the graph, connecting our input
        DeriveFields df= ctx.add(new DeriveFields(derivation));
        ctx.connect(input, df.getInput());

        //required: connect all output ports
        //  in this case we're connecting the output from derive fields to our output
        ctx.connect(df.getOutput(), output);
    }

}