Building DataFlow Applications : Customizing and Extending DataFlow Functionality : Writing a Function : Advanced Function Techniques
 
Share this page                  
Advanced Function Techniques
In Writing a Function, a simple adder taking two integers was used to illustrate how to implement a function. Unfortunately, not all functions are quite so simple. In fact, the simple adder itself can be made more complex.
Dynamic Return Types and Complex Type Constraints
Suppose we wanted to extend the adder to accept any numeric type, from double to int, just as it does in Java. In this case, the result is no longer always an integer. While the result could always be treated as a double, we would like to exhibit the same behavior as the Java language—adding two ints yields an int, adding two doubles yields a double, and an int and a double yields a double. One way to do this is to define multiple factory methods, each defining slightly different functions for each possible output type. However, forcing the end user to specify the desired result type every time makes the function harder to use. Ideally, we'd like to determine the output type automatically from the input types.
To handle the case where the result type may depend on the arguments, DataFlow defines the FunctionTyper interface to use when defining a function. There are two different types which are needed for a function:
An upper bound on the result type. This is not necessarily the final type, but rather a promise that the final type will be assignable to this type. If no bound can be provided, TokenTypeConstant.SCALAR should be returned. The upper bound is important for early detection of incompatible types when using a function as an argument to another function. When the upper bound is computed for a function, only the upper bounds of its arguments are known.
The actual result type. This type must be a concrete type; TokenTypeConstant.SCALAR is not a valid result type. When the result type is computed for a function, the result types of its arguments are known.
Dynamically typing a function
public class AdderTyper implements FunctionTyper {

    @Override
    public ScalarTokenType computeUpperBound(FunctionContext ctx) {
        return TokenTypeConstant.DOUBLE;
    }
 
    @Override
    public ScalarTokenType computeResultType(FunctionContext ctx) {
        return TypeUtil.widestType(ctx.getArgumentType(0), ctx.getArgumentType(1));
    }
}

public static ScalarValuedFunction add(ScalarValuedFunction l, ScalarValuedFunction r) {
    return define("MyMath.add", new AdderTyper(), MyAdder.class,
        arg(leftExpr, TokenTypeConstant.DOUBLE), arg(rightExpr, TokenTypeConstant.DOUBLE));
}
Above, since we know the output type will always be a number, we can safely return TokenTypeConstant.DOUBLE as the upper bound. The real result may be a narrower type—for instance, if we add two ints—but it will be no wider than a double. When determining the actual result type, we know it will be the wider of the two input types, as we widen the narrower type to match the other.
As it turns out, this widening behavior is fairly common so there is already a class in DataFlow which can do the same thing called WidestInputTyper. Referring back to our original class we could have specified the upper bound as follows:
public static ScalarValuedFunction add(ScalarValuedFunction l, ScalarValuedFunction r) {
    return define("MyMath.add", new WidestInputTyper(TokenTypeConstant.DOUBLE), MyAdder.class,
        arg(leftExpr, TokenTypeConstant.DOUBLE), arg(rightExpr, TokenTypeConstant.DOUBLE));
}
Although our example did not require it, a FunctionTyper can also be used to perform complex type checks on arguments. For instance, we might have wanted to restrict the arguments to be of the same type. While this is not particularly meaningful for our adder, it might be useful if we were implementing an equals predicate. Unfortunately the type checking specified in the definition only considers each argument individually, so it cannot enforce such a constraint. However, due to the fact that the methods specified by FunctionTyper are provided the types of all the arguments, it is the ideal place to put any additional logic to enforce these more complex constraints. If a constraint is violated, the method should throw an exception indicating the error.
Type-specific Evaluators
In the previous section, we never addressed the issue of how we would perform the addition. We could rewrite the evaluator to treat the inputs as doubles and truncate the result to the correct type. However, this approach is undesirable for a number of reasons. For performance and correctness it is better to write an evaluator for each possible return type.
To support the ability to choose different evaluators based on arguments, DataFlow defines the EvaluatorFactory interface to use when defining a function. The EvaluatorFactory is passed all the arguments which are intended for the evaluator's constructor and returns a new evaluator.
Choosing an appropriate evaluator
public class AdderFactory implements EvaluatorFactory {
    @Override
    public FunctionEvaluator contructEvaluator(ScalarSettable result, Object[] arguments) {
        ScalarTokenType resultType = result.getType();
        if (TokenTypeConstant.DOUBLE.equals(resultType)) {
            return new MyDoubleAdder((DoubleSettable)result, (DoubleValued)arguments[0], (DoubleValued)arguments[1]);
        }
        else if (TokenTypeConstant.FLOAT.equals(resultType)) {
            return new MyFloatAdder((FloatSettable)result, (FloatValued)arguments[0], (FloatValued)arguments[1]);
        }
        else if (TokenTypeConstant.LONG.equals(resultType)) {
            return new MyLongAdder((LongSettable)result, (LongValued)arguments[0], (LongValued)arguments[1]);
        }
        else if (TokenTypeConstant.INT.equals(resultType)) {
            return new MyIntAdder((IntSettable)result, (IntValued)arguments[0], (IntValued)arguments[1]);
        }
        throw new RuntimeException("Unsupported result type!");
    }
}
 
public static ScalarValuedFunction add(ScalarValuedFunction l, ScalarValuedFunction r) {
    return define("MyMath.add", new WidestInputTyper(TokenTypeConstant.DOUBLE), new AdderFactory(),
        arg(leftExpr, TokenTypeConstant.DOUBLE), arg(rightExpr, TokenTypeConstant.DOUBLE)); 
In our particular case, we want to choose an evaluator based only on the desired return type. So we get the result type and directly create an evaluator. Note that we cast the arguments to specific types. It is generally better to write evaluators using the most specific type possible so that each call to evaluate() doesn't have to perform a cast. While the cost of the cast is fairly small, over millions of records it can add up quickly.
Since choosing an evaluator based on return type is not uncommon, the library already has a generic factory for just this purpose called ResultMappedFactory. To use ResultMappedFactory requires a list of type-to-evaluator mappings:
public static ScalarValuedFunction add(ScalarValuedFunction l, ScalarValuedFunction r) {
   return define("MyMath.add", new WidestInputTyper(TokenTypeConstant.DOUBLE),
           new ResultMappedFactory(
               use(TokenTypeConstant.DOUBLE, MyDoubleAdder.class),
               use(TokenTypeConstant.FLOAT, MyFloatAdder.class),
               use(TokenTypeConstant.LONG, MyLongAdder.class),
               use(TokenTypeConstant.INT, MyIntAdder.class)),
           arg(leftExpr, TokenTypeConstant.DOUBLE), arg(rightExpr, TokenTypeConstant.DOUBLE));
}
If we had needed to choose based on input types—for example writing a function implementing an equality test—there is a similar generic factory defined in the library called InputMappedFactory.
Variable Numbers of Arguments
The example function we've constructed is a binary function; it takes exactly two arguments. We could extend our adder to take any number of arguments and then add a list of values to get the total sum. DataFlow supports variable length argument lists for functions to provide this functionality. To use one:
The evaluator needs to declare a list or array of elements of some subtype of ScalarValued in the final argument position.
The variable length list must be in the final argument position of the definition. Lists and arrays of ScalarValuedFunction elements are not allowed in any other position.
Use the varargs() method to apply a type constraint to all the arguments.
Below we change our adder to accept any number of arguments. For brevity we omit changes to the logic of the evaluator.
Accepting a variable number of arguments
static import com.pervasive.dataflow.functions.ScalarFunctionDescriptor.varargs;

public class MyDoubleAdder extends FunctionEvaluator {

   public MyDoubleAdder(DoubleSettable result, DoubleValued[] args) {
       ...
   }

   ...
}

public static ScalarValuedFunction add(ScalarValuedFunction... addends) {
    return define("MyMath.add", new WidestInputTyper(TokenTypeConstant.DOUBLE),
           new ResultMappedFactory(
               use(TokenTypeConstant.DOUBLE, MyDoubleAdder.class),
               use(TokenTypeConstant.FLOAT, MyFloatAdder.class),
               use(TokenTypeConstant.LONG, MyLongAdder.class),
               use(TokenTypeConstant.INT, MyIntAdder.class)),
           varargs(Arrays.asList(addends), TokenTypeConstant.DOUBLE));
}
Serialization
The definition of a function needs to be JSON serializable. This means that everything passed to the define() method must also be JSON serializable. The only exceptions to this rule are function evaluators; evaluators do not need to be serializable.
When implementing a FunctionTyper or EvaluatorFactory, remember to make the class serializable. In both of the cases above, since our classes contained no fields and had a no-argument constructor, they were trivially serializable and required no additional work. Refer to Customizing Operator Serialization for more information on making classes JSON serializable.