Writing a Function
Overview
While composing operators together is one means of implementing complex functionality in DataFlow, it is not the only mechanism. DataFlow also has the concept of functions—given a record, a function produces a scalar value. Any operator that produces exactly one output record for each input record and the output record depends only on the current input record can be alternately expressed as a collection of functions, one for each output field.
Using functions can be more efficient than using operators, as they avoid communication overheads associated with operators. Because of this, there are a number of operators which use functions to provide pluggable functionality. The two most notable examples of this are
Using the FilterRows Operator to Filter by Predicate, which takes a predicate function for filtering the input records, and
Using the DeriveFields Operator to Compute New Fields, which applies one or more functions to the input records.
A function consists of two pieces:
A definition
Describes the function: how many and what type of arguments are allowed, and the data type of the value returned. This information can be used to determine whether a function is valid in a given context by checking if all the fields required by the function are present in the context, or if the expected types are compatible with those in the context.
An evaluator
Implements the function; the actual code used to compute the value. Users of the function will acquire an evaluator bound to a given context for computing the function's result.
For the purposes of illustrating the basics of writing new functions, the remainder of this section will walk through the creation of a simple function which adds two integer values together. For information about writing more complex functions refer to
Advanced Function Techniques, which builds upon the example here.
Implementing the Evaluator
The evaluator class for a function must implement the
FunctionEvaluator interface. Implementations are expected to have a single constructor. The first argument of the constructor is always the result buffer. This buffer will be some subclass of
ScalarSettable. Subsequent arguments are the inputs of the function. Any inputs which represent a variable—for example, our adder takes two variables, the values to be added together—need to be subclasses of
ScalarValued. Arguments which represent constants (of which we have none in our example) can be of any type.
Returning to our evaluator, since we are writing an adder for integers, we expect the result to be an integer value. Thus the first argument, the result buffer, is an
IntSettable. As noted earlier we also have two variable arguments which are expected to be integers. Therefore the second and third arguments are
IntValued.
Creating an evaluator class
import com.pervasive.dataflow.functions.FunctionEvaluator;
import com.pervasive.dataflow.tokens.scalar.IntSettable;
import com.pervasive.dataflow.tokens.scalar.IntValued;
/* Compute x+y */
public class MyAdder implements FunctionEvaluator {
private final IntSettable result;
private final IntValued left;
private final IntValued right;
public MyIntAdder(IntSettable out, IntValued x, IntValued y) {
result = out;
left = x;
right = y;
}
...
}
The key piece of an evaluator is the evaluate() method defined in the
FunctionEvaluator interface. This is where the logic of the function is placed. Some important points to remember about the evaluate() method:
• It may be called several times. Each time it is called, all
TokenValued arguments may have a different value from the previous call.
• On exit, it is expected that the result of the function has been written to the output buffer.
In our example, the implementation is straightforward:
Adding the function logic
@Override
public void evaluate() {
if (left.isNull() || right.isNull()) {
result.setNull();
}
else {
result.set(left.asInt() + right.asInt());
}
}
Remember that values in DataFlow may be NULL, representing an unknown or missing value. Implementers of functions need to determine the appropriate behavior when one or more of the arguments are null valued. Typically the best behavior is to yield a NULL result, as demonstrated in the case above.
Non-deterministic Evaluation
Some functions are non-deterministic. That is, for the exact same input values, they may return a different result. Examples of functions like this would be:
• A function returning the current time. Each invocation returns a different result since each invocation is at a different point in time.
• A function returning random data.
The evaluator class for a function like this needs to implement the
NondeterministicFunctionEvaluator interface. This indicates to the DataFlow framework that the function is non-deterministic and is therefore not eligible for certain optimizations. Failure to do so may yield unexpected results under some situations.
Defining the Function
Writing an evaluator is only the first step. It is also necessary to provide a description of the function so that consumers may use the function. This associates the evaluator with its arguments, both variable and constant. To assist in the construction of these descriptions, the
ScalarFunctionDescriptor class is provided. The static
ScalarFunctionDescriptor.define() method creates an instance of a function which can be used.
Continuing the example adder of two integers, we write a factory method to create the function definition. Each call to this method defines a new instance of the function. Best practice is to use static factory methods for defining functions. In this way, you can implement any required argument checking while also hiding the details of the definition and evaluator.
Creating a definition
import static com.pervasive.dataflow.functions.ScalarFunctionDescriptor.define;
import static com.pervasive.dataflow.functions.ScalarFunctionDescriptor.arg;
public static ScalarValuedFunction add(ScalarValuedFunction leftExpr, ScalarValuedFunction rightExpr) {
return define("MyMath.add", TokenTypeConstant.INT, MyAdder.class,
arg(leftExpr, TokenTypeConstant.INT), arg(rightExpr, TokenTypeConstant.INT));
}
The above code defines a description of a new function instance and declares the following:
• The function is called "MyMath.add"; this is the name which will be printed out in messages related to the function. It is recommended that this name refer back to the factory method to make the error message more usable.
• The function returns an integer value. The evaluator expects an
IntSettable object to use as the result buffer.
• The evaluator is implemented by the MyAdder class. Whenever an evaluator is required for some context, an instance of this class will be constructed and passed a result buffer and the values.
• The function takes two arguments which must evaluate to integer values in the evaluation context. When an evaluator is constructed, these arguments will be passed to the constructor in the order specified here, starting after the result buffer. Only arguments which are themselves functions and have a restriction on the type to which they evaluate require use of the arg() static method. Otherwise, arguments can be passed directly.
Note that we have declared the arguments to our factory method to be functions, not value types such as int or
IntValued. Generally this is a good idea as it provides more flexibility. Instead of having to write a new function for every desired combination, functions can be passed to other functions to create even more complex expressions. Unfortunately, the generic nature of the function-based signature also makes usage more difficult, as all callers need to construct functions for even the simplest cases. To address this, define additional convenience signatures to simplify use in common cases. The
ConstantReference and
FieldReference classes define static methods for constructing functions which return a constant and the value of a field respectively.
Referencing fields and constants
import static com.pervasive.dataflow.functions.ConstantReference.constant;
import static com.pervasive.dataflow.functions.FieldReference.value;
public static ScalarValuedFunction addConstant(ScalarValuedFunction v, int c) {
if (c < 1) {
throw new IllegalArgumentException("Only positive constants can be added");
}
return add(v, constant(c));
}
public static ScalarValuedFunction addFields(String l, String r) {
return add(value(l), value(r));
}
Above are examples of convenience factory methods for common cases. The first adds a positive value to the result of an expression. The second adds the values of two fields in the input record. Both convert the input arguments into functions so that the user of the function isn't forced to do so. Also notice that the constraint that the constant be positive is enforced in the factory method itself. Checks on the validity of any arguments which are not functions must be handled by the function implementer. Checking in the factory method is a good idea as it catches errors early, at the first point the problem is known, as well as before the graph is executed. Checks in the evaluator will not occur until later, when the graph is executed. This can partially mask the source of the error, as it is raised far from the original call passing the invalid argument.
Using the Function
Using the newly created function is simple; just call the factory method to create an instance. This can then be used like any other function:
ScalarValuedFunction f = MyMath.addFields("width","height");
DeriveFields calcTotal = new DeriveFields(FieldDerivation.derive("perimeterHalved", f));