Customizing and Extending DataFlow Functionality : Writing an Operator : Writing an Executable Operator
 
Share this page                  
Writing an Executable Operator
Overview
ExecutableOperator are the worker operators. All other operators, whether CompositeOperator, IterativeOperator, or DeferredCompositeOperators, ultimately use ExecutableOperator to process or transform data.
Like all operators, ExecutableOperator defines ports and properties and must be JSON serializable (see General Requirements and Best Practices).
ExecutableOperators have a composition component and a run time component. Their composition component, computeMetadata(), specifies certain attributes about their metadata. The metadata specifies type information, parallelizability, data sort order, and data distribution. This metadata is used at compilation time to optimize the execution plan. It also allows operators to validate that their inputs are of the correct types so that misconfigurations are detected early at compilation time rather than at execution time. Finally, it allows the operator to declare whether it can be replicated.
The run time component, the execute method, generally consists of reading data from the input ports and writing data to the output ports.
The overall life cycle is as follows:
1. computeMetadata(): Is invoked to compute metadata. This is always performed locally and is done up front prior to execution, so this is the appropriate time to perform any necessary validation.
2. cloneForExecution(): Is performed to clone the operator. This is performed once for each thread of execution. This guarantees that, if the operator modifies any internal states as part of its execution, those changes will not be visible to other threads. Thus, we provide a safeguard to guarantee thread safety. This step is skipped if the operator is non-parallelizable.
3. execute(): Is invoked on each copy that was returned by calling cloneForExecution to perform the work.
Implementing Metadata
Implementations of computeMetadata() must adhere to the following contracts:
General
Regardless of input/output port types, all implementations must do the following:
1. Validation: Validation of configuration should always be performed first.
2. Declare parallelizability: Implementations must declare max parallelism by invoking StreamingMetadataContext.parallelize.
Input Record Ports
Implementations with input record ports must declare the following:
1. Required data ordering: Implementations that have data ordering requirements must declare them by calling RecordPort.setRequiredDataOrdering(); otherwise, data may arrive in any order.
2. Required data distribution (only applies to parallelizable operators): Implementations that have data distribution requirements must declare them by calling RecordPort.setRequiredDataDistribution(); otherwise, data will arrive in an unspecified distribution.
Note that if the upstream operator’s output distribution and ordering is compatible with those required, we avoid a re-sort/re-distribution, which is generally a large savings from a performance standpoint. In addition, some operators may choose to query the upstream output distribution and ordering by calling RecordPort.getSourceDataDistribution() and RecordPort.getSourceDataOrdering(). These should be viewed as hints to help choose a more efficient algorithm. In such cases, though, operators must still declare data ordering and data distribution requirements; otherwise there is no guarantee that the data will arrive sorted or distributed as required. See Managing the Port Metadata for additional information about handling input requirements.
Output Record Ports
Implementations with output record ports must declare the following:
1. Type: Implementations must declare their output type by calling RecordPort.setType().
Implementations with output record ports may declare the following:
1. Output data ordering: Implementations that can make guarantees as to their output ordering may do so by calling RecordPort.setOutputDataOrdering().
2. Output data distribution (only applies to parallelizable operators): Implementations that can make guarantees as to their output distribution may do so by calling RecordPort.setOutputDataDistribution().
Note that both of these properties are optional; if unspecified, performance may suffer since the framework may unnecessarily re-sort or re-distribute the data. See Managing the Port Metadata for additional information about handling output guarantees.
Input Model Ports
In general, there is nothing special to declare for input model ports. Models are implicitly duplicated to all partitions when going from non-parallel to parallel operators. The case of a model going from a parallel to a non-parallel node is a special case of a "model reducer" operator. In the case of a model reducer, the downstream operator must declare the following:
1. Merge handler: Model reducers must declare a merge handler by calling AbstractModelPort.setMergeHandler().
Note that MergeModel is a convenient, reusable model reducer, parameterized with a merge-handler.
Output Model Ports
SimpleModelPorts have no associated metadata and therefore there is never any output metadata to declare. On the other hand, PMMLPorts do have associated metadata. For all PMMLPorts, implementations must declare the following:
1. pmmlModelSpec: Implementations must declare the PMML model spec by calling PMMLPort.setPMMLModelSpec() and passing a PMMLModelSpec.
Execution Method
The "work" of an executable operator is performed within the execute() method. The execute() method is provided an ExecutionContext. The primary purpose of the ExecutionContext is to provide bindings from logical ports to physical ports. For more information, see Ports section in Application Model.
In general, the execute method consists of the following steps:
1. Obtain physical ports for each logical port.
2. Obtain handles to input and output fields.
3. Step over the inputs:
a. process input rows, possibly pushing to output.
4. Call pushEndOfData() on the outputs.
Implementations of execute() must adhere to the following contracts:
1. Following execution, all input ports must be at end-of-data. If the method does not consume all input data it must explicitly close the port by calling RecordInput.detach(). In the case of a model port, the implementation must invoke AbstractModelPort.getModel() at least once. See Physical Port States and End of Data for additional information.
2. Following execution, all output ports must be at end-of-data. Output ports must always be explicitly closed by calling RecordOutput.pushEndOfData(). In the case of a model port, the implementation must invoke AbstractModelPort.setModel() exactly once. See Physical Port States and End of Data for additional information.
See the following sections for more information about execution:
Control Flow Patterns for Execution
Exception Handling in ExecutableOperators
Working with Sparse Records
Example
The following example is the AdderExecutable class, which can be found in the training materials. This is an example of an ExecutableOperator that takes a record input and produces an output that includes the original data plus a sum of two of the fields. Note that this example is purely for illustrative purposes; see Writing a CompositeOperator for an example of how to accomplish the same thing using existing operators.
Example ExecutableOperator
public final class AdderExecutable extends ExecutableOperator implements RecordPipelineOperator {

    //required: declare input ports
    private final RecordPort input = newRecordInput("input");
    //required: declare output ports
    private final RecordPort output = newRecordOutput("output");
    //required: declare properties
    private String inputFieldName1 = "field1";
    private String inputFieldName2 = "field2";
    private String sumFieldName = "sum";

    /**
     * Default constructor
     */
    //required: all operators must have a default constructor
    public AdderExecutable() {
    }

    /**
     * Creates an adder for the specified fields
     * @param inputFieldName1 the first input field
     * @param inputFieldName2 the second input field
     * @param sumFieldName the output field that will contain the sum
     */
    //optional: convenience constructor
    public AdderExecutable(String inputFieldName1, String inputFieldName2, String sumFieldName) {
        setInputFieldName1(inputFieldName1);
        setInputFieldName2(inputFieldName2);
        setSumFieldName(sumFieldName);
    }

    ///////////////////////////////////////////////////
    //
    // Required: getters and setters for each property
    //
    ///////////////////////////////////////////////////
    /**
     * Returns the name of the first input field to be added
     * @return the name of the first input field
     */
    public String getInputFieldName1() {
        return inputFieldName1;
    }
   
    /**
     * Sets the name of the first input field to be added
     * @param inputFieldName1 the name of the first input field
     */
    public void setInputFieldName1(String inputFieldName1) {
        this.inputFieldName1 = inputFieldName1;
    }
   
    /**
     * Returns the name of the second input field to be added
     * @return the name of the second input field
     */
    public String getInputFieldName2() {
        return inputFieldName2;
    }
    
    /**
     * Sets the name of the second input field to be added
     * @param inputFieldName2 the name of the second input field
     */
    public void setInputFieldName2(String inputFieldName2) {
        this.inputFieldName2 = inputFieldName2;
    }
    
    /**
     * Returns the name of the output field that will contain the sum of the two fields.
     * @return the name of the output field that will contain the sum of the two fields.
     */
    public String getSumFieldName() {
        return sumFieldName;
    }
    
    /**
     * Sets the name of the output field that will contain the sum of the two fields.
     * @param sumFieldName the name of the output field that will contain the sum of the two fields.
     */
    public void setSumFieldName(String sumFieldName) {
        this.sumFieldName = sumFieldName;
    }
    
    ///////////////////////////////////////////////////
    //
    // Required: getters for each port
    //
    ///////////////////////////////////////////////////
    /**
     * Returns the input port containing the data to add
     */
    @Override
    public RecordPort getInput() {
        return input;
    }

    /**
     * Returns the output port containing the original data from the input
     * plus an additional field containing the sum
     */
    @Override
    public RecordPort getOutput() {
        return output;
    }
    
    private void validateInput(MetadataContext ctx) {

        //make sure specified properties are not empty
        PropertyUtil.checkNotEmpty("inputFieldName1", inputFieldName1);
        PropertyUtil.checkNotEmpty("inputFieldName2", inputFieldName2);
        PropertyUtil.checkNotEmpty("sumFieldName", sumFieldName);

        //make sure inputFieldName1 and inputFieldName2 are defined in the input
        RecordTokenType inputType = input.getType(ctx);
        inputType.verifyNames(inputFieldName1,inputFieldName2);
    }

    @Override
    protected void computeMetadata(StreamingMetadataContext ctx) {

        //best practice: perform any input validation: should be done first
        validateInput(ctx);

        //required: declare our parallelizability. 
        //  in this case we use source parallelism as a hint for our parallelism.
        ctx.parallelize(NEGOTIATE_BASED_ON_SOURCE);

        //required: declare output type 
        //  in this case our output type is the input type plus an additional field
        //  containing the sum
        RecordTokenType outputType = TypeUtil.mergeTypes(input.getType(ctx), record(DOUBLE(sumFieldName))); 
        output.setType(ctx, outputType);                

        //best practice: define output ordering/distribution
        //  in this case we are appending a field to the source so
        //  ordering and distribution are preserved
        output.setOutputDataOrdering(ctx, input.getSourceDataOrdering(ctx));
        output.setOutputDataDistribution(ctx, input.getSourceDataDistribution(ctx));
    }

    @Override
    protected void execute(ExecutionContext ctx) {

        //get physical input/output ports that correspond to logical input/output ports
        RecordInput input = this.input.getInput(ctx);
        RecordOutput output = this.output.getOutput(ctx);

        //best practice: get handles to input fields and cache in local variable rather than performing in inner loop
        //  inputField1 is a handle to the first input field to add
        //  inputField2 is a handle to the second input field to add
        //  allInputs is an array of all input fields
        DoubleValued inputField1 = (DoubleValued)input.getField(this.inputFieldName1);
        DoubleValued inputField2 = (DoubleValued)input.getField(this.inputFieldName2);
        ScalarValued[] allInputs = input.getFields();

        //best practice: get handle to output fields
        //  sumField is a handle to the sum output field
        //  outputs is an array of all outputs *except* for sumField; corresponds by index to allInputs
        DoubleSettable sumField = (DoubleSettable)output.getField(this.sumFieldName);
        ScalarSettable[] outputs = TokenUtils.selectFields(output, input.getType().getNames());

        //required: step until input is drained
        while ( input.stepNext() ) {

            //copy original fields as is into the current row buffer
            TokenUtils.transfer(allInputs, outputs);

            //set the sum field equal to the sum of the two input fields
            double val1 = inputField1.asDouble();
            double val2 = inputField2.asDouble();
            sumField.set(val1 + val2);

            //output the current row buffer
            output.push();
        }

        //required: signal end-of-data on output
        output.pushEndOfData();
    } 
}
Managing the Port Metadata
Overview
As the metadata provided by ExecutableOperators is used by the DataFlow framework to plan execution, the accuracy of this information is of significant importance. Incorrect declarations can not only impact performance but also can lead to incorrect results. Following certain guidelines can help avoid errors of this sort, maximizing performance while ensuring correctness.
Input Requirements
By default, no guarantees are made about the data delivered to an operator’s input. However, an operator may need certain guarantees on the data. By declaring requirements on the inputs, the framework can be aware of these needs and ensure they are fulfilled. Best practice is to always declare the requirements of an operator, even if they are "don't care". Being explicit about the requirements provides documentation and can avoid unexpected behaviors.
Managing the Requirement
Sometimes input requirements are not absolute, but instead may be open to negotiation. Allowing for some flexibility in requirements can provide performance improvements as the framework can avoid work which was not strictly required. For example, an operator may want data to be ordered by fields A and B, but does not care whether a given field is in ascending or descending order. Suppose this operator explicitly requested ascending order on A and B. If the data was already sorted on A and B, but in descending order, the framework would re-sort all the data to meet the requirement. By negotiating the required order (to be descending order on A and B), this sort can be avoided.
The MetadataUtil class provides a number of static methods for performing these sorts of negotiations. Aside from the non-specific ordering as discussed above, another common case is grouping. Grouping is a data requirement that all records with the same values k1, k2, ... kn for group key fields be assigned to the same partition. Unlike standard data distribution requirements, grouping does not consider how the data is spread across partitions to be important. Any key-based partitioning is sufficient, as long as desired groups are not split across partitions. Generally this is combined with non-specific ordering on the group keys so that all records in a group are adjacent to one another.
If an operator has no specific requirement but wants to use the input metadata reported by RecordPort.getSourceDataDistribution() or RecordPort.getSourceDataOrdering() to make decisions, the operator should explicitly set its requirement to the values returned by these methods.
Changes to Metadata Declaration
In earlier versions of DataFlow, if an operator did not have input requirements, then it was safe to declare the output metadata as a function of the input metadata. Here, the parallelism was fixed and the sort order always matched the order of the input. Currently, parallelism can change and when it is changed, the sort order is not preserved unless the operator has specified the sort order requirements.
To handle this case the framework provides a new method called as RecordPort.getCombinedMetadata that considers the modifying parallelism.
Operators that declare output metadata must migrate from using getSourceMetadata to getCombinedMetadata. If the old metadata is used, the result will be incorrect.
For the case using DerivedFields it declares the output metadata same as the input with the exception of the fields that it will modify.
For DeriveFields, use the following method within the compute metadata method:
RecordMetadata outputMetadata=
input.getCombinedMetadata(ctx).mutating(new ArrayList<String>(modified));
output.setOutputDataOrdering(ctx, outputMetadata.getDataOrdering());
output.setOutputDataDistribution(ctx, outputMetadata.getDataDistribution());
Distributing the Full Data
Occasionally, an operator may require the data from all partitions. A special data distribution requirement is defined for exactly this case: FullDataDistribution. The operator can still be run in parallel; each copy will receive exactly the same data on the input. Full data distribution should be used sparingly and only for small sdata sets, as it requires exchanging data across all partitions.
This distribution option is only valid as an input requirement; no operator can use this distribution as an output guarantee.
Output Guarantees
By default, no assumptions are made about the ordering and distribution of an operator’s output. Although this may lead to performance hits to ensure the requirements of downstream operators, it is always a safe choice. A false guarantee about the ordering or partitioning of output data can cause the framework to make assumptions that lead to violations of input requirements of operators. If in doubt about a guarantee, do not declare it in the output metadata!
Notes:
Never use the methods RecordPort.getSourceDataDistribution() and RecordPort.getSourceDataOrdering() when making output guarantees.
Instead, use the methods RecordPort.getCombinedDataOrdering() and RecordPort.getCombinedDataDistribution().
The former do not take into account how changes in parallelism impact the actual data ordering. The latter compute the actual data ordering and data distribution, taking into account:
1. Source parallelism
2. Target parallelism
3. Source metadata
4. Target metadata
Setting Parallelism
The parallelism can vary from one operator to the next within the same graph. Instead of a binary setting there is a max parallelism for each operator. This modification impacts the best practices for declaring the parallelizability. This also impacts the method an operator declares its output metadata.
Example: Nonparallelizable operator
For nonparallelizable operators, use the following method by which within in the compute metadata method.
@Override
protected void
computeMetadata(StreamingMetadataContext ctx)
{
ctx.parallelize(ParallelismStrategy.NON_PARALLELIZABLE)
...
}
Example: Parallelizable operator with at least one input
Most of the operators have at least one record input. In earlier versions they had unconstrained parallelism. However, they must have a parallelism that matches the source operator if the source is also parallel.
For parallelizable operators with at least one input, use the following method within in the compute metadata method.
@Override
protected void
computeMetadata(StreamingMetadataContext ctx)
{
//use source parallelism if source is parallel; otherwise, don't constrain parallelism
ctx.parallelize(ParallelismStrategy.NEGOTIATE_ BASED_ON_SOURCE);
...
}
Parallelizable operator with at least one input and the operation is simple.
A few operations are computationally inexpensive that it should only be parallelized if the source is parallel (for example, SelectFields). For these, use the MetadataUtil.acceptSourceParallelism method.
For parallelizable operators with at least one input and operation is simple, use the following method within in the compute metadata method.
@Override
protected void
computeMetadata(StreamingMetadataContext ctx)
{
//we're such an inexpensive operation that we always use source parallelism, even if non-parallel
ctx.parallelize(ParallelismStrategy.ACCEPT_SOU RCE);
...
}
Parallelizable Source Operators
The operators that do not have inputs must use the configured parallelism.
For parallelizable source operators without input, use the following method within in the compute metadata method.
@Override
protected void
computeMetadata(StreamingMetadataContext ctx)
{
ctx.parallelize(ParallelismStrategy.CONFIGURED
)
...
}
Physical Port States and End of Data
Overview
ExecutableOperators read tokens from input ports and write tokens to output ports. In general, there are no constraints on the number of input and output ports, the number of tokens read and written using these ports, and the order in which tokens are read and written on them. However, ports do change state, and different methods are available in different states.
Input Port States and APIs
Input ports have three states:
Before the first token
On a token
Detached
At the start of the execute() method, an input port is before the first token. The process should not use methods that access the "current token" (for example, asString()) when the port is in this state.
Calls to stepNext() change input port state. The first step advances the input so that it is on the first token, and each later step advances to the next token until end of data is reached. When the end of data is reached, the input port detaches itself from the queue. If there are zero tokens, the first step reaches end of data and detaches immediately.
The process should use methods that access the current token only when the port is on a token. After an input is detached, the process should make no calls to methods that access its current token, and should make no further calls to stepNext().
There is more than one way for an operator to determine input port state. The boolean return value of stepNext() indicates whether the input is on a token after taking the step. For many operators this is enough. See Control Flow Patterns for Execution for more information. For more complex operators, the isAttached(), isOnToken(), and getPosition() methods allow the state to be queried without advancing the port.
After the end of the execute() method, the framework verifies that all input ports are detached. Operators that do not need to read their entire input may call detach() explicitly to detach an input before it reaches end of data. In the case of a model port, the implementation must invoke AbstractModelPort.getModel() at least once.
Output Port States and APIs
Output ports have only two states:
Ready to push a token
Already pushed end of data
At the start of the execute() method, an output port is ready to push a token (the first token). When the port is in this state, the operator may call the various methods that push a token (including push() and pushNull()) any number of times without causing the port to change state.
After the operator pushes all tokens it wishes, it must call pushEndOfData(). After pushing end of data on an output, operators should not call methods that push a token on that port, and should not call pushEndOfData() again.
After the end of the execute() method, the framework verifies that end of data has been pushed on all output ports. In the case of a model port, the implementation must invoke AbstractModelPort.setModel() exactly once.
Control Flow Patterns for Execution
Overview
The execute() method of an ExecutableOperator reads tokens from input ports and writes tokens to output ports. Most operators have very few ports, and their execute() methods tend to use a few simple control flow patterns that satisfy the rules about port state and follow the conventions of null handling.
Control Flow Patterns for Port State
This section includes stepping a single input, sending the end-of-data on output, pushing data on record outputs, and final blocks.
Stepping a Single Input
Typically a node has one input port and needs to step through all tokens of that input:
protected void execute(ExecutionContext ctx) {
    RecordInput input = this.input.getInput(ctx);
 
    //best practice: obtain handles to fields outside of the main loop
    IntValued id = (IntValued)input.getField("id");
    StringValued name = (StringValued)input.getField("name");
 
    ...
    while (input.stepNext()) {
       int idVal = id.asInt();
       String nameVal = name.asString(); 
       // TODO: add processing for current input token
    }
    ...
}
The first call to stepNext() advances the input so that it is on the first token, and each later step advances to the next token, until end of data is reached. When end of data is reached, the input detaches, stepNext() returns false, and the loop stops. The corner case of zero tokens is handled correctly, because the first call to stepNext() returns false.
Notice that we obtain field handles outside of the loop. This avoids incurring the performance hit of a hash map lookup for each row of input.
Pushing End of Data on Outputs
The pushEndOfData() method should be called once for each output port, after all tokens have been pushed on that output. The simplest solution is to place calls to pushEndOfData() at the bottom of the execute method.
protected void execute(ExecutionContext ctx) {
  ...
  while(input.stepNext()) {
    // TODO: add processing for current input token
  }
  output.pushEndOfData();
}
If an operator finishes with an output port a long time before it reaches the end of the execute method, then it may call pushEndOfData() as soon as it has pushed all tokens for that output. Calling pushEndOfData() early can unblock downstream operators, which might be waiting for tokens that have been pushed on the output but are buffered in the current token batch. Be careful to call pushEndOfData() only once for each output port.
Pushing Data on a RecordOutput
A RecordOutput is associated with a number of field registers—similar to the structure of a RecordInput. However, in this case, these registers are settable. When push() is called on the RecordOutput, the current values of the field registers are used as the field values of the record placed in the flow. After a push(), the field registers of the port are reset to null.
protected void execute(ExecutionContext ctx) {
    RecordOutput output = this.output.getOutput(ctx);
 
    //best practice: obtain field registers once outside of the loop
    IntSettable id = (IntSettable }output.getField("id");
    StringSettable name = (StringSettable)output.getField("name");
 
    Map<Integer,String> employees;
    ...
    for (Map.Entry<Integer,String> employee : employees.entrySet()) {
       id.set(employee.getKey());
       name.set(employee.getValue());
       output.push();
    }
    output.pushEndOfData();
}
Finally Blocks
Avoid calls to port methods in finally blocks. This includes calls to stepNext(), detach(), and pushEndOfData(). See the information about abnormal shutdown under Exception Handling in ExecutableOperators for details.
Control Flow Patterns for Null Handling
For SQL compatibility, all token types support null values. A null value represents missing or unknown information. In most cases, any unknown input to a calculation should lead to an unknown output. The most common patterns for null handling are shown here.
Testing a RecordInput
protected void execute(ExecutionContext ctx) {
  while(input.stepNext()) {
    if(input.containsNull()) {
       output.pushNull();
    }
    else {
      // TODO: add processing for current non-null input tokens
    }
  }
  output.pushEndOfData();
}
Notice the use of containsNull() rather than isNull(). The containsNull() method returns true if any field is null valued, which is usually desirable. The isNull() method returns true if all field values are null, in order to match the behavior of a composite IS NULL expression in SQL.
Detach Propagation
The DataFlow engine has an optimization that improves the performance of graphs that are not fully connected. In the case where a graph does not have a sink or an end node that consumes all tokens, DataFlow will detach that node’s inputs and remove that node. The engine will work backwards, removing operators and edges that do not have consumers attached to them.
This potentially has the side effect of completely removing the graph. The fix is to add a sink node (for example, a file writer) to the end of the graph to guarantee that nodes get executed.
Exception Handling in ExecutableOperators
Overview
One often overlooked area of process development is the handling of exceptions. Operators should be robust enough to deal with unexpected input data, but they must be careful to stop work and exit after being interrupted by the framework.
Unexpected Input Data
The DataFlow framework is often used to process large volumes of data. An unexpected value may appear anywhere in the data, including at the very end. The best way to handle such surprises is usually to keep going and provide default output, rather than halting after processing 99,999,999 of 100,000,000 lines.
Unfortunately, many Java APIs take the opposite approach, and throw an Exception if passed an unexpected parameter value. For example, the sub-string methods of java.lang.String issue an IndexOutOfBoundsException if the string is too short.
If your process calls a method that can issue a data-dependent exception, you should consider how to recover. One common recovery strategy is to push a null in the place of the value you were trying to compute.
Catch and Recover (Not Recommended)
try {
   field.set(str.substring(4,9));
   output.push();
}
catch (IndexOutOfBoundsException e) {
   field.setNull();
   output.push();
}
While this approach works, issuing and catching an exception is extremely expensive. It is usually far more efficient, and at least as easy, to test the data and prevent the exception from being issued:
Test and Recover (Recommended)
if (str.length() >= 9) {
   field.set(str.substring(4,9));
   output.push();
}
else {
   field.setNull();
   output.push();
}
Consider Abnormal Shutdown
ExecutableOperators must be careful to stop work and exit after being interrupted by the framework. Such interruption occurs as a result of an abnormal shutdown.
Abnormal Shutdown
If any operators in the executing dataflow graph throw an exception, the framework begins an abnormal shutdown. During abnormal shutdown, the framework:
1. Sets the Thread.interrupted() flag for all running ExecutableOperator threads.
2. Waits for all running ExecutableOperator to exit their execute() methods.
3. Issues a DROperatorException containing the exceptions that caused the abnormal shutdown, along with information identifying the operators that failed.
Handling Abnormal Shutdown
Most operator developers need only a few simple rules in order to correctly handle abnormal shutdown:
Do not catch ProcessCancellationError or its supertypes Error and Throwable.
If an operator thread has been interrupted, as soon as a call to input.stepNext() or output.push() crosses a token batch boundary, the port issues a ProcessCancellationError. As long as this is not caught, the operator will exit its execute() method. A corollary to this rule is to exit the dataflow operator immediately if you catch an InterruptedException. Often, issuing a ProcessCancellationError in response is a good idea, since it breaks normal control flow, and the framework will properly recognize the situation.
Do not call port methods in finally clauses.
After a port throws a ProcessCancellationError, it may be left in an invalid internal state. Therefore, port methods should not be used in Java finally clauses, because this code will be run during an abnormal shutdown. After an abnormal shutdown, the usual port state rules are not checked, so it is not necessary to detach from input ports or push end of data on output ports.
Working with Sparse Records
Consuming Sparse Data
When records ports are created, they can be optionally sparse or dense. Consumers of sparse data can make use of the various FieldIterator interfaces to avoid iterating over the zero elements.
The following is an example sparse consumer. The code calculates the sum of non-zero double fields:
public class SparseConsumer extends ExecutableOperator{

    //...

    @Override
    protected void execute(ExecutionContext ctx) {
        RecordInput input = this.input.getInput(ctx);
        String [] myDoubleFields = {...};
        DoubleValuedIterator doubleFields =
                     (DoubleValuedIterator)input.newFieldIterator(
                           TokenTypeConstant.DOUBLE,myDoubleFields);
        double sum = 0;
        while ( input.stepNext() ) {
            doubleFields.resetIteration();
            while ( doubleFields.stepNext() ) {
                sum += doubleFields.asDouble();
            }
        }
    }
}
Note that the FieldIterator interface is available regardless of whether the data is sparse or dense. This enables code written for sparse data to operate relatively efficiently on dense data. The converse is not true: code written to the dense interface will not operate efficiently on sparse data because it requires visitation of every field. However, the dense interface is more convenient to use. Also, depending on the application, it may not be possible to process sparse data more efficiently.
Producing Sparse Data
Sparse producers, on the other hand, use the same interface as dense producers. The following is an example of a sparse producer:
public class SparseProducer extends ExecutableOperator {

    private final RecordPort output = newRecordOutput("output");
   
    @Override
    protected void computeMetadata(StreamingMetadataContext ctx) {
        //declare the output to be sparse: that's what makes this a sparse producer
        RecordTokenType myType = TokenTypeConstant.sparseRecord(...);
output.setType(ctx,myType);
    }


    @Override
    protected void execute(ExecutableContext ctx) {
        RecordOutput output = this.output.getOutput(ctx);
        DoubleSettable[] outputs = output.getFields();
        for ( int i = 0; i < 10000; i++ ) {
            outputs[0].set(1);
            outputs[10000].set(4.5);
            output.push();
        }
    }
}
Notice that the output port is constructed in almost the same way as a dense port. The only difference is that the type is "sparseRecord" rather than "record". In the execute() method, we are populating only columns 0 and 10,000; the others are implicitly zero. Internally, because this is a sparse port, we only need to represent the two columns that are populated on each row.
Note:  It is important to note that the unspecified values for sparse ports are implicitly zero. This is different from the dense case, where unspecified values are implicitly null.