Writing a DeferredCompositeOperator
Overview
The DeferredCompositeOperator is similar to the CompositeOperator in that it is defined by using other operators to build out a subsection of the graph. The key distinction is that composition is deferred until execution time: CompositeOperators are composed once per graph, whereas DeferredCompositeOperators are composed once per partition.
Per-partition composition gives implementations both more control and more responsibility than standard CompositeOperators:
1. Parallelizability of the component operators is ignored. The parallelizable declaration of the DeferredCompositeOperator controls whether its compose() method is invoked multiple times, and thus whether multiple copies of the component operators are created.
2. Partitioning requirements of the component operators are ignored. The DeferredCompositeOperator must assume the responsibility of declaring partitioning requirements for all of its component operators.
3. By implication, all of the component operators must be equivalently partitioned.
4. Because composition does not occur until execution time, output type information cannot be derived from the component operators; DeferredCompositeOperators must therefore declare the output types of their ports.
5. Similarly, DeferredCompositeOperators must declare output partitioning and sort order.
The primary use cases for DeferredCompositeOperators are:
• Cases where the composition depends on partitioning information. The following example is one such use case.
• Converting an otherwise non-parallel operation into a parallel operation. The following example does this as well: the non-parallel sequence function is used to generate a parallel sequence.
• Defining a "block" of equivalently partitioned operations.
That said, it should be noted that use cases for DeferredCompositeOperators are extremely rare, and they come with a number of caveats; implementers should generally prefer CompositeOperators (see Writing a CompositeOperator). In addition to the extra burden of defining metadata (CompositeOperators do not need to declare metadata), implementers must be aware of the following restrictions:
• If any component operators declare metadata requirements, those requirements are treated as assertions (rather than being automatically fulfilled by the framework). Thus, it is the responsibility of the DeferredCompositeOperator to ensure that any metadata requirements are fulfilled.
Like all operators, DeferredCompositeOperators define ports and properties and must be JSON serializable (see General Requirements and Best Practices).
Using Metadata
Implementations of computeMetadata() must adhere to the same contracts specified for executable operators (see Writing an Executable Operator). In addition, they bear the responsibility of declaring metadata for their component operators.
General
Regardless of input/output port types, all implementations must do the following:
1. Validation: Validating configuration should always be performed first.
2. Declare parallelizability: Implementations must declare parallelizability by calling StreamingMetadataContext.parallelize.
Input Record Ports
Implementations with input record ports must declare the following:
1. Required data ordering: Implementations that have data ordering requirements must declare them by calling RecordPort.setRequiredDataOrdering(); otherwise, data may arrive in any order. DeferredCompositeOperators have the additional responsibility of satisfying the ordering requirements of their component operators. Failure to do so will result in a runtime exception.
2. Required data distribution (only applies to parallelizable operators): Implementations that have data distribution requirements must declare them by calling RecordPort.setRequiredDataDistribution(); otherwise, data will arrive in an unspecified distribution. DeferredCompositeOperators must determine a distribution strategy appropriate for their component operators, since the default partitioning requirements of the components are ignored.
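To make the distribution requirement concrete, the following standalone sketch models how a hash-based distribution conceptually routes records to partitions. This is an illustration of the general technique only, not the framework's implementation; the partitionFor method is hypothetical.

```java
import java.util.Objects;

public class HashDistributionSketch {
    // Illustrative only: routes a record to a partition by hashing its key,
    // which is conceptually what a hash-based data distribution does.
    static int partitionFor(Object key, int partitionCount) {
        // Math.floorMod keeps the result non-negative even for negative hash codes
        return Math.floorMod(Objects.hashCode(key), partitionCount);
    }

    public static void main(String[] args) {
        int partitions = 4;
        int p = partitionFor("customer-42", partitions);
        // Equal keys always route to the same partition, so component
        // operators that share this distribution see consistent data.
        assert p == partitionFor("customer-42", partitions);
        assert p >= 0 && p < partitions;
        System.out.println("customer-42 -> partition " + p);
    }
}
```

Because the components' own partitioning requirements are ignored, the DeferredCompositeOperator must pick one such strategy that is valid for every component it composes.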
Output Record Ports
Implementations with output record ports must declare the following:
1. Type: Implementations must declare their output type by calling RecordPort.setType(). DeferredCompositeOperators have the additional responsibility of specifying what the output type of the component operators will be. A mismatch in type will be detected and will result in a runtime exception.
Implementations with output record ports may declare the following:
1. Output data ordering: Implementations that can make guarantees as to their output ordering may do so by calling RecordPort.setOutputDataOrdering().
2. Output data distribution (only applies to parallelizable operators): Implementations that can make guarantees as to their output distribution may do so by calling RecordPort.setOutputDataDistribution().
Note that both of these properties are optional; if unspecified, performance may suffer since the framework may unnecessarily re-sort or re-distribute the data.
Composition
The compose() method is invoked once per partition and is passed a DeferredCompositionContext, which does the following:
• Provides a place to add operators and define connections for the subsection of the graph.
• Provides metadata resolution for input ports. The input metadata allows validation, and it also drives composition decisions. For example, a compose() method may conditionally choose one implementation over another if the input is already sorted.
• Provides partition ID information. This allows composition to depend on partition information, as you will see in the following example.
Example
This example is the AssignUniqueIDs class that can also be found in the training material. This example appends an "id" field to each row where the ID is guaranteed to be unique across the cluster. Each partition outputs a sequence of values where the sequence starts with each partition's partitionID and is always incremented by the total number of partitions. For example, if there are two partitions, partition 0 will output a sequence of even numbers and partition 1 will output a sequence of odd numbers.
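The uniqueness guarantee follows from simple arithmetic: partition p emits p, p + N, p + 2N, ... where N is the partition count, so no two partitions can ever emit the same value. The following standalone sketch (illustrative only; in the actual operator the values are produced by the Sequences.sequence derivation) demonstrates the scheme:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class PartitionedSequenceSketch {
    // IDs emitted by one partition: start at partitionID, step by partitionCount
    static List<Long> idsForPartition(int partitionID, int partitionCount, int rowCount) {
        List<Long> ids = new ArrayList<>();
        for (int row = 0; row < rowCount; row++) {
            ids.add((long) partitionID + (long) row * partitionCount);
        }
        return ids;
    }

    public static void main(String[] args) {
        // With two partitions: partition 0 emits evens, partition 1 emits odds
        System.out.println(idsForPartition(0, 2, 4)); // [0, 2, 4, 6]
        System.out.println(idsForPartition(1, 2, 4)); // [1, 3, 5, 7]

        // No collisions across partitions: 3 partitions x 100 rows = 300 unique IDs
        Set<Long> all = new HashSet<>();
        for (int p = 0; p < 3; p++) {
            all.addAll(idsForPartition(p, 3, 100));
        }
        System.out.println(all.size()); // 300
    }
}
```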
Using the DeferredCompositeOperator
public final class AssignUniqueIDs extends DeferredCompositeOperator implements RecordPipelineOperator {

    //required: declare any input ports
    private final RecordPort input = newRecordInput("input");
    //required: declare any output ports
    private final RecordPort output = newRecordOutput("output");
    //required: declare any properties
    private String fieldName = "id";

    /**
     * Default constructor, uses a fieldName of "id" by default
     */
    //required: all operators must have a default constructor
    public AssignUniqueIDs() {
    }

    /**
     * Create a new AssignUniqueIDs, specifying the name of the id field to output
     * @param fieldName the name of the id output field
     */
    //optional: convenience constructor
    public AssignUniqueIDs(String fieldName) {
        setFieldName(fieldName);
    }

    ///////////////////////////////////////////////////
    //
    // Required: getters and setters for each property
    //
    ///////////////////////////////////////////////////

    /**
     * Returns the name of the id output field
     * @return the name of the id output field
     */
    public String getFieldName() {
        return fieldName;
    }

    /**
     * Sets the name of the id output field
     * @param fieldName the name of the id output field
     */
    public void setFieldName(String fieldName) {
        this.fieldName = fieldName;
    }

    ///////////////////////////////////////////////////
    //
    // Required: getters for each port
    //
    ///////////////////////////////////////////////////

    @Override
    public RecordPort getInput() {
        return input;
    }

    @Override
    public RecordPort getOutput() {
        return output;
    }

    @Override
    protected void computeMetadata(StreamingMetadataContext ctx) {
        //best practice: perform validation first
        PropertyUtil.checkNotEmpty("fieldName", fieldName);

        //required: declare parallelizability
        // in this case we set our parallelism based on the source
        ctx.parallelize(NEGOTIATE_BASED_ON_SOURCE);

        //required: declare output type
        // in this case our output type is the input type plus an additional field
        // containing the id
        RecordTokenType outputType = TypeUtil.mergeTypes(input.getType(ctx), record(LONG(fieldName)));
        output.setType(ctx, outputType);

        //best practice: define output ordering/distribution
        // in this case we are appending a field to the source so
        // ordering and distribution are preserved
        output.setOutputDataOrdering(ctx, input.getSourceDataOrdering(ctx));
        output.setOutputDataDistribution(ctx, input.getSourceDataDistribution(ctx));
    }

    @Override
    protected void compose(DeferredCompositionContext ctx) {
        //get the partitionID information
        //(available at execution time and at deferred composition time)
        PartitionInstanceInfo info = ctx.getAssignment().getPartitionInfo();
        //define startValue to be my partitionID
        long startValue = info.getPartitionID();
        //define stepSize to be the total number of partitions
        long stepSize = info.getPartitionCount();
        //derive a sequence with the given startValue and stepSize
        FieldDerivation derivation = FieldDerivation.derive(fieldName,
                Sequences.sequence(startValue, stepSize));
        //add the DeriveFields operator to the graph, connecting our input
        DeriveFields df = ctx.add(new DeriveFields(derivation));
        ctx.connect(input, df.getInput());
        //required: connect all output ports
        // in this case we're connecting the output from derive fields to our output
        ctx.connect(df.getOutput(), output);
    }
}