Customizing and Extending DataFlow Functionality : Writing an Operator : Writing a CompositeOperator
 
Share this page                  
Writing a CompositeOperator
Overview
A CompositeOperator is a composition-only operator. CompositeOperators are defined by using other operators to build out a subsection of the graph. CompositeOperators do not play a role at execution time; thus they can be thought of as a template.
CompositeOperators are commonly used in cases where an operation may be defined by re-using existing operations.
CompositeOperators must be used in cases where an overall operation consists of a mixture of parallelizable and non-parallelizable operations or a mixture of operations that have different distribution requirements. An example of this parallel/non-parallel mix would be the Group operator: in the case of a key-less aggregation, there is an initial parallel operator where partial aggregations are computed and connected to a non-parallel operator that combines the one row from each partition into a final result.
Like all operators, CompositeOperators define various ports and properties and must be JSON serializable (see General Requirements and Best Practices).
Defining Metadata
Composite operators themselves do not declare metadata directly; rather their metadata is implied by their component metadata. Composite operators that should influence partitioning metadata may do so by the operator PartitionHint. Composite operators that should influence sort metadata may do so by the operator AssertSorted.
Composition
Most of the implementation of a CompositeOperator lives in its compose() method. The compose() method is passed a CompositionContext. The composition context serves the following purposes:
Provides a place to add operators and define connections for the subsection of the graph. Thus, it is similar to a LogicalGraph (see Composing an Application).
Provides metadata resolution for input ports.
The input metadata allows validation.
The input metadata also drives composition decisions. For example, a compose method may conditionally choose one implementation over another if the input is already sorted.
Implementations of the compose() method should do the following:
1. Perform any validation of configuration, input types, and so on.
2. Instantiate and configure suboperators, adding them to the provided context by the method CompositionContext.add().
3. Create any necessary connections through the method CompositionContext.connect(). This includes connections from the composite’s input ports to suboperators, connections between suboperators, and connections from suboperators output ports to the composite’s output ports. Note that it will generate a run time exception if composition fails to connect any output ports.
Example
The following example is the AdderComposite class, which can also be found in the training materials. This is an example of a CompositeOperator that takes a record input and produces an output that includes the original data plus the sum of two of the fields.
Using the CompositeOperator
public final class AdderComposite extends CompositeOperator implements RecordPipelineOperator {

    //required: declare input ports
    private final RecordPort input = newRecordInput("input");
    //required: declare output ports
    private final RecordPort output = newRecordOutput("output");
    //required: declare properties
    private String inputFieldName1 = "field1";
    private String inputFieldName2 = "field2";
    private String sumFieldName = "sum";
    
    /**
     * Default constructor
     */
    //required: all operators must have a default constructor
    public AdderComposite() {
    }
    
    /**
     * Creates an adder for the specified fields
     * @param inputFieldName1 the first input field
     * @param inputFieldName2 the second input field
     * @param sumFieldName the output field that will contain the sum
     */
    //optional: convenience constructor
    public AdderComposite(String inputFieldName1, String inputFieldName2, String sumFieldName) {
        setInputFieldName1(inputFieldName1);
        setInputFieldName2(inputFieldName2);
        setSumFieldName(sumFieldName);
    }
    
    ///////////////////////////////////////////////////
    //
    // Required: getters and setters for each property
    //
    ///////////////////////////////////////////////////
    /**
     * Returns the name of the first input field to be added
     * @return the name of the first input field
     */
    public String getInputFieldName1() {
        return inputFieldName1;
    }
    
    /**
     * Sets the name of the first input field to be added
     * @param inputFieldName1 the name of the first input field
     */
    public void setInputFieldName1(String inputFieldName1) {
        this.inputFieldName1 = inputFieldName1;
    }
    
    /**
     * Returns the name of the second input field to be added
     * @return the name of the second input field
     */
    public String getInputFieldName2() {
        return inputFieldName2;
    }
    
    /**
     * Sets the name of the second input field to be added
     * @param inputFieldName2 the name of the second input field
     */
    public void setInputFieldName2(String inputFieldName2) {
        this.inputFieldName2 = inputFieldName2;
    }
    
    /**
     * Returns the name of the output field that will contain the sum of the two fields.
     * @return the name of the output field that will contain the sum of the two fields.
     */
    public String getSumFieldName() {
        return sumFieldName;
    }
   
    /**
     * Sets the name of the output field that will contain the sum of the two fields.
     * @param sumFieldName the name of the output field that will contain the sum of the two fields.
     */
    public void setSumFieldName(String sumFieldName) {
        this.sumFieldName = sumFieldName;
    }

    ///////////////////////////////////////////////////
    //
    // Required: getters for each port
    //
    ///////////////////////////////////////////////////
    /**
     * Returns the input port containing the data to add
     */
    @Override
    public RecordPort getInput() {
        return input;
    }

    /**
     * Returns the output port containing the original data from the input
     * plus an additional field containing the sum
     */
    @Override
    public RecordPort getOutput() {
        return output;
    }

    private void validateInput(MetadataContext ctx) {

        //make sure specified properties are not empty
        PropertyUtil.checkNotEmpty("inputFieldName1", inputFieldName1);
        PropertyUtil.checkNotEmpty("inputFieldName2", inputFieldName2);
        PropertyUtil.checkNotEmpty("sumFieldName", sumFieldName);

        //make sure inputFieldName1 and inputFieldName2 are defined in the input
        RecordTokenType inputType = input.getType(ctx);
        inputType.verifyNames(inputFieldName1,inputFieldName2);
    }

    @Override
    protected void compose(CompositionContext ctx) {

        //best practice: perform any input validation: should be done first
        validateInput(ctx);

        //define a FieldDerivation to derive a new field equal to the
        //sum of two other fields
        FieldDerivation sum =
            FieldDerivation.derive(sumFieldName, 
                    Arithmetic.add(inputFieldName1, inputFieldName2));

        //add DeriveFields to the graph
        DeriveFields derive = ctx.add(new DeriveFields(sum));     

        //connect the input of this operator to the input of DeriveFields
        ctx.connect(input, derive.getInput());

        //required: compose must connect sources to all output ports
        //  in this case, we connect the output of DeriveFields. 
        ctx.connect(derive.getOutput(), output);
    }
}