Processing Rows Using User-defined Scripts
Using the RunScript Operator
The RunScript operator processes rows using user-defined scripts. Calls to third-party script engines are made through the javax.script API defined by JSR 223. The script engine must implement the javax.script.Compilable interface.
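As a rough illustration of the Compilable requirement, the sketch below uses only the standard javax.script API to look up an engine by name and check whether it implements Compilable. It is not the operator's own check. The engine name "rhino" is an assumption, and whether any JavaScript engine is registered at all depends on the JARs on the classpath (recent JDKs no longer bundle one).

```java
import javax.script.Compilable;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;

public class CompilableCheck {

    /** Returns true only if an engine is registered under engineName and implements Compilable. */
    static boolean isCompilableEngine(ScriptEngineManager manager, String engineName) {
        // getEngineByName returns null when no engine is registered under that name,
        // and "null instanceof Compilable" evaluates to false
        ScriptEngine engine = manager.getEngineByName(engineName);
        return engine instanceof Compilable;
    }

    public static void main(String[] args) {
        ScriptEngineManager manager = new ScriptEngineManager();
        // "rhino" is an assumed registration name; pass another name as the first argument
        String engineName = args.length > 0 ? args[0] : "rhino";
        System.out.println(engineName + " registered and Compilable: "
                + isCompilableEngine(manager, engineName));
    }
}
```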
Operation is as follows:
1. Bind variables into the script’s context.
2. Compile all scripts.
3. Run the script provided by the beforeFirstRecordScript property (if given).
4. For every row of input:
a. Move data from input ports to script context.
b. Run the script provided by the onEveryRecordScript property.
c. Move data from script context to output ports.
5. Run the script provided by the afterLastRecordScript property (if given).
Binding and retrieval are done using the names given to the input and output fields from their associated types.
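The sequence and name-based binding above can be modeled in plain Java. This is a conceptual sketch, not the operator's implementation: each script is replaced by a hypothetical Java lambda over a javax.script.SimpleBindings object, and the compilation step is skipped. The usage in main mirrors the previous-outlook scripts from the code example that follows.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import javax.script.Bindings;
import javax.script.SimpleBindings;

public class ScriptLifecycle {

    /** Models the RunScript sequence for one operator instance; lambdas stand in for compiled scripts. */
    static List<Map<String, Object>> run(List<Map<String, Object>> rows,
                                         List<String> outputFields,
                                         Map<String, Object> variables,
                                         Consumer<Bindings> beforeFirstRecord,
                                         Consumer<Bindings> onEveryRecord,
                                         Consumer<Bindings> afterLastRecord) {
        Bindings context = new SimpleBindings();
        context.putAll(variables);                        // 1. bind variables
        // 2. compilation is not modeled: the lambdas are already "compiled"
        if (beforeFirstRecord != null) {
            beforeFirstRecord.accept(context);            // 3. run once before the first row
        }
        List<Map<String, Object>> output = new ArrayList<>();
        for (Map<String, Object> row : rows) {
            context.putAll(row);                          // 4a. input fields bound by field name
            onEveryRecord.accept(context);                // 4b. per-record script
            Map<String, Object> outRow = new LinkedHashMap<>();
            for (String field : outputFields) {           // 4c. output fields read back by field name
                outRow.put(field, context.get(field));
            }
            output.add(outRow);
        }
        if (afterLastRecord != null) {
            afterLastRecord.accept(context);              // 5. run once after the last row
        }
        return output;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> rows = List.of(
                Map.<String, Object>of("outlook", "sunny"),
                Map.<String, Object>of("outlook", "overcast"),
                Map.<String, Object>of("outlook", "rainy"));
        List<Map<String, Object>> result = run(rows, List.of("outlook", "yesterdaysOutlook"), Map.of(),
                ctx -> ctx.put("previousOutlook", null),
                ctx -> {
                    ctx.put("yesterdaysOutlook", ctx.get("previousOutlook"));
                    ctx.put("previousOutlook", ctx.get("outlook"));
                },
                null);
        System.out.println(result);
    }
}
```

Note that previousOutlook never reaches the output rows: only fields named in the output list are read back, which is the same reason the real operator needs an output type.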
Note that this operator supports running in a distributed context, whether locally partitioned or within a cluster. Each instance of the operator executes the operation sequence above. As a result, the before and after scripts affect only the scripting environment in which they run; the environments are not shared across instances.
Code Example
The example below reads an ARFF file that contains weather information. We assume there is one row of data per day. The weather data includes the fields outlook, temperature, humidity, windy, and play. The goal is to inject the previous day's outlook into each day's record. The first day has no previous day's outlook, so its value should be null.
The beforeFirstRecordScript property is set to a code fragment that initializes a previousOutlook variable. The onEveryRecordScript script then uses this variable to set the previous day's outlook. Note that the parallelism of the application is set to 1. This is required because, in this case, the RunScript operator introduces a data dependency between consecutive rows. Any partitioning of the data would break that dependency. Setting the parallelism to 1 ensures that no data partitioning takes place.
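To see why partitioning breaks the dependency, the sketch below applies the same previous-outlook logic independently to each partition, as separate operator instances would. It is a plain-Java model, not the operator itself. The outlook values are taken from the sample output later in this section; the two-way split is an arbitrary assumption. With a single partition the result matches the logged yesterdaysOutlook column, while the split version produces a spurious null at the start of the second partition.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionedLag {

    /** Applies the previous-outlook logic per partition; state never crosses partition boundaries. */
    static List<String> lag(List<List<String>> partitions) {
        List<String> result = new ArrayList<>();
        for (List<String> partition : partitions) {
            String previousOutlook = null;        // beforeFirstRecordScript runs once per instance
            for (String outlook : partition) {
                result.add(previousOutlook);      // yesterdaysOutlook = previousOutlook
                previousOutlook = outlook;        // previousOutlook = outlook
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> outlooks = List.of("sunny", "sunny", "overcast", "rainy", "rainy", "rainy", "overcast",
                "sunny", "sunny", "rainy", "sunny", "overcast", "overcast", "rainy");
        List<String> single = lag(List.of(outlooks));
        List<String> split = lag(List.of(outlooks.subList(0, 7), outlooks.subList(7, 14)));
        // Row 8 (index 7): the single partition sees row 7's outlook, the split run sees nothing
        System.out.println(single.get(7) + " vs " + split.get(7));
    }
}
```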
JavaScript is used by setting the engine name to Rhino. The language version is also set, here to 1.6. The version can be important if a particular feature of the language or engine is needed.
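The engine-related properties correspond naturally to javax.script.ScriptEngineFactory metadata (getEngineName, getEngineVersion, getLanguageName, getLanguageVersion). The sketch below shows one plausible way to choose a factory by engine name and language version. This matching rule is an assumption for illustration, not documented RunScript behavior.

```java
import java.util.List;
import java.util.Optional;
import javax.script.ScriptEngineFactory;
import javax.script.ScriptEngineManager;

public class EngineSelection {

    /**
     * Hypothetical selection rule: match the engine name (or one of the factory's short names),
     * and, when a language version is given, require an exact match on it as well.
     */
    static Optional<ScriptEngineFactory> select(List<ScriptEngineFactory> factories,
                                                String engineName,
                                                String languageVersion) {
        return factories.stream()
                .filter(f -> f.getEngineName().equalsIgnoreCase(engineName)
                        || f.getNames().contains(engineName))
                .filter(f -> languageVersion == null || languageVersion.equals(f.getLanguageVersion()))
                .findFirst();
    }

    public static void main(String[] args) {
        List<ScriptEngineFactory> factories = new ScriptEngineManager().getEngineFactories();
        // List what the current classpath provides; Rhino appears only if a JSR 223 adapter for it is present
        for (ScriptEngineFactory f : factories) {
            System.out.printf("%s %s / %s %s, names=%s%n",
                    f.getEngineName(), f.getEngineVersion(),
                    f.getLanguageName(), f.getLanguageVersion(), f.getNames());
        }
        System.out.println("Rhino with language 1.6 found: " + select(factories, "Rhino", "1.6").isPresent());
    }
}
```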
Using the RunScript operator in Java
import com.pervasive.dataflow.graphs.EngineConfig;
import com.pervasive.dataflow.graphs.LogicalGraph;
import com.pervasive.dataflow.graphs.LogicalGraphFactory;
import com.pervasive.dataflow.io.FileClient;
import com.pervasive.dataflow.operators.io.textfile.ReadARFF;
import com.pervasive.dataflow.operators.scripting.RunScript;
import com.pervasive.dataflow.operators.sink.LogRows;
import com.pervasive.dataflow.types.RecordTokenType;
import com.pervasive.dataflow.types.TokenTypeConstant;
import com.pervasive.dataflow.types.TypeUtil;

/**
 * Read the weather data and inject yesterday's outlook into each record.
 * The assumption is that each row of data is a single day's weather summary.
 */
public class YesterdaysWeather {
    public static void main(String[] args) {
        
        // Create an empty logical graph
        LogicalGraph graph = LogicalGraphFactory.newLogicalGraph("ScriptAccounts");
        
        // Create an ARFF reader for the weather data
        ReadARFF reader = graph.add(new ReadARFF("weather.arff"));
        
        // Get the meta-data of the weather data to discover the record type
        RecordTokenType inputType = reader.discoverMetadata(FileClient.basicClient()).getSchema().getTokenType();
        
        // Create the script output type. Use the input type merged with
        // a single new field: yesterdaysOutlook.
        RecordTokenType scriptOutputType = TypeUtil.merge(
                inputType, 
                TokenTypeConstant.record(TokenTypeConstant.STRING("yesterdaysOutlook")));
        
        // Use RunScript to execute a JavaScript snippet that injects yesterday's outlook
        RunScript runScript = graph.add(new RunScript());
        runScript.setEngineName("Rhino");                   // Use Rhino JavaScript engine
        runScript.setLanguageVersion("1.6");                // Use language version 1.6
        runScript.setOutputType(scriptOutputType);
        runScript.setBeforeFirstRecordScript("var previousOutlook = null;");
        runScript.setOnEveryRecordScript("yesterdaysOutlook = previousOutlook; previousOutlook = outlook;");
        
        // Connect the reader and the script runner
        graph.connect(reader.getOutput(), runScript.getInput());
        
        // Log the output of the script run
        LogRows logger = graph.add(new LogRows(1));
        graph.connect(runScript.getOutput(), logger.getInput());
        
        // Set parallelism to 1 due to the data dependency (on the previous day's outlook)
        graph.compile(EngineConfig.engine().parallelism(1)).run();
    }
}
The output of the sample application follows. Note that the output contains a new field, yesterdaysOutlook. The output of the LogRows operator shows that yesterdaysOutlook is null (printed as an empty value) for the first record, as expected, and each subsequent row contains the previous row's outlook.
INFO: Executing phase 0 graph: {[AssignSplits, LogRows, ParseSplits, RunScript]}
INFO: Assigned 1 total splits across 1 nodes, with 1 non-local
INFO: Input type is {"type":"record","representation":"DENSE_BASE_NULL","fields":[{"outlook":{"type":"enum","values":["sunny","overcast","rainy"]}},{"temperature":{"type":"double"}},{"humidity":{"type":"double"}},{"windy":{"type":"enum","values":["TRUE","FALSE"]}},{"play":{"type":"enum","values":["yes","no"]}},{"yesterdaysOutlook":{"type":"string"}}]}
INFO: 0: row 1 is [sunny, 85.0, 85.0, FALSE, no, ]
INFO: 0: row 2 is [sunny, 80.0, 90.0, TRUE, no, sunny]
INFO: 0: row 3 is [overcast, 83.0, 86.0, FALSE, yes, sunny]
INFO: 0: row 4 is [rainy, 70.0, 96.0, FALSE, yes, overcast]
INFO: 0: row 5 is [rainy, 68.0, 80.0, FALSE, yes, rainy]
INFO: 0: row 6 is [rainy, 65.0, 70.0, TRUE, no, rainy]
INFO: 0: row 7 is [overcast, 64.0, 65.0, TRUE, yes, rainy]
INFO: 0: row 8 is [sunny, 72.0, 95.0, FALSE, no, overcast]
INFO: 0: row 9 is [sunny, 69.0, 70.0, FALSE, yes, sunny]
INFO: 0: row 10 is [rainy, 75.0, 80.0, FALSE, yes, sunny]
INFO: 0: row 11 is [sunny, 75.0, 70.0, TRUE, yes, rainy]
INFO: 0: row 12 is [overcast, 72.0, 90.0, TRUE, yes, sunny]
INFO: 0: row 13 is [overcast, 81.0, 75.0, FALSE, yes, overcast]
INFO: 0: row 14 is [rainy, 71.0, 91.0, TRUE, no, overcast]
INFO: Counted 14 rows
INFO: Phase 0 graph: {[AssignSplits, LogRows, ParseSplits, RunScript]} completed in 617 milliseconds
Using the RunJavaScript operator in RushScript
importClass(com.pervasive.datarush.types.TokenTypeConstant);
importClass(com.pervasive.datarush.types.TypeUtil);
importClass(com.pervasive.datarush.io.FileClient);
// Create an ARFF reader for the weather data
var reader = dr.readARFF({source:"weather.arff"});
var scriptOutputType = TokenTypeConstant.record(TokenTypeConstant.STRING("yesterdaysOutlook"));
// Use RunJavaScript to execute a JavaScript snippet that injects yesterday's outlook
var runJavaScript = dr.runJavaScript(reader, {languageVersion:160,
    outputType: scriptOutputType,
    beforeFirstRecordScript:'var previousOutlook = null;',
    onEveryRecordScript:'yesterdaysOutlook = previousOutlook; previousOutlook = outlook;'});
// Log the output of the script run
var logRows = dr.logRows(runJavaScript,{logFrequency:1});
// Set parallelism to 1 due to the data dependency (on the previous day's outlook)
dr.parallelism(1);
dr.execute();
Properties
The RunScript operator supports the following properties.
Name | Type | Description
afterLastRecordScript | String | The script to use for clean-up processing. This script is executed after every record within the input flow has been processed.
beforeFirstRecordScript | String | The script to use for initialization processing. This script is executed before the first record of the input data is processed.
engineName | String | The name of the script engine to use.
engineVersion | String | The version of the script engine to use.
languageName | String | The name of the script language to use.
languageVersion | String | The version of the script language to use.
onEveryRecordScript | String | The script to execute for every input record. Changes made to output variables by this script are pushed to the output of the operator.
outputType | RecordTokenType | The type of the output port of the operator. The output type must include all variables created by the script being executed.
stderrFileName | String | The pathname of the file that captures the standard error output of the script execution environment. If this is not specified, the error output is not captured.
stdoutFileName | String | The pathname of the file that captures the standard output of the script execution environment. If this is not specified, the output is not captured.
variables | Map<String,Object> | The collection of variables to incorporate into the script environment before any of the given script source is executed. The map keys are used as variable names, and the corresponding map values become the variable values within the script environment.
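The variables, stdoutFileName, and stderrFileName properties resemble standard javax.script.ScriptContext features: engine-scope bindings and the context's writer and error writer. The sketch below builds a SimpleScriptContext along those lines. It illustrates that assumed correspondence and is not the operator's code; the variable name threshold is hypothetical.

```java
import java.io.FileWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import javax.script.ScriptContext;
import javax.script.SimpleScriptContext;

public class ContextSetup {

    /**
     * Places variables in engine scope and redirects stdout/stderr to files only when
     * file names are given. The caller is responsible for closing any writers opened here.
     */
    static ScriptContext buildContext(Map<String, ?> variables, String stdoutFileName, String stderrFileName) {
        SimpleScriptContext context = new SimpleScriptContext();
        // Map keys become variable names; map values become the variable values
        context.getBindings(ScriptContext.ENGINE_SCOPE).putAll(variables);
        try {
            if (stdoutFileName != null) {
                context.setWriter(new FileWriter(stdoutFileName));
            }
            if (stderrFileName != null) {
                context.setErrorWriter(new FileWriter(stderrFileName));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return context;
    }

    public static void main(String[] args) {
        // "threshold" is a hypothetical variable name used only for illustration
        ScriptContext ctx = buildContext(Map.of("threshold", 75.0), null, null);
        System.out.println(ctx.getAttribute("threshold"));
    }
}
```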
Ports
The RunScript operator provides a single input port.
Name | Type | Get Method | Description
input | RecordPort | getInput() | Input data.
The RunScript operator provides a single output port.
Name | Type | Get Method | Description
output | RecordPort | getOutput() | Results of the script execution.
Using the RunJavaScript Operator
The RunJavaScript operator processes rows using user-defined scripts written in JavaScript. The Rhino JavaScript engine is used to compile and execute the scripts. The execution of the operator is as follows:
1. Compile all scripts.
2. Bind input fields into the script’s context. Input field names may have to be transformed to valid script variable names.
3. Run the beforeFirstRecordScript.
4. For every row of input:
a. Move data from input ports to script context.
b. Run the onEveryRecordScript.
c. Move data from script context to output ports.
5. Run the afterLastRecordScript.
Binding and retrieval are done using the names given to the input and output fields from their associated types. Native JavaScript types are supported where possible.
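The exact rule the operator uses to turn field names into valid script variable names is not described here. The sketch below shows one hypothetical rule: characters that cannot appear in a JavaScript identifier are replaced with underscores, and a leading digit gets an underscore prefix. It ignores reserved words such as var, which a real implementation would also need to handle. The field names in main other than outlook are hypothetical.

```java
public class ScriptNames {

    /** Hypothetical field-name-to-identifier rule; not the operator's documented behavior. */
    static String toScriptIdentifier(String fieldName) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < fieldName.length(); i++) {
            char c = fieldName.charAt(i);
            // Keep letters, digits, '_' and '$'; replace anything else with '_'
            sb.append(Character.isLetterOrDigit(c) || c == '_' || c == '$' ? c : '_');
        }
        // Identifiers may not start with a digit (or be empty)
        if (sb.length() == 0 || Character.isDigit(sb.charAt(0))) {
            sb.insert(0, '_');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        for (String name : new String[] {"outlook", "temp (F)", "2ndReading"}) {
            System.out.println(name + " -> " + toScriptIdentifier(name));
        }
    }
}
```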
Note that this operator supports running in a distributed context, whether locally partitioned or within a cluster. Each instance of the operator executes the above operation sequence. As a result, the before and after scripts affect only the scripting environment in which they run; the environments are not shared across operator instances.
By default, the highest level of optimization is used for compiling the scripts, which provides the most efficient script execution. Setting the optimization level to -1 turns on interpreted mode, which disables the performance gains of compiled scripts. There should normally be no need to lower the optimization level. However, compiled scripts can at times type variables differently. If this causes issues, use interpreted mode by setting the optimization level to -1.
By default, the JavaScript language version is set to 1.8, which at the time of writing was the highest language version supported by the Rhino engine. Set a lower language version to disable newer language features as desired.
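Rhino identifies language versions by integer constants (for example, Context.VERSION_1_6 is 160 and Context.VERSION_1_8 is 180), which appears to be why the RunJavaScript examples pass 160 where the RunScript example passes "1.6". A small helper converting between the two forms might look like this; it assumes a major.minor version with a single-digit minor part, which holds for JavaScript 1.0 through 1.8.

```java
public class RhinoVersions {

    /** Converts a dotted version such as "1.6" into Rhino's integer form (160). */
    static int toRhinoVersion(String dotted) {
        String[] parts = dotted.trim().split("\\.");
        if (parts.length != 2) {
            throw new IllegalArgumentException("expected major.minor: " + dotted);
        }
        int major = Integer.parseInt(parts[0]);   // NumberFormatException is an IllegalArgumentException
        int minor = Integer.parseInt(parts[1]);
        if (minor < 0 || minor > 9) {
            throw new IllegalArgumentException("minor version out of range: " + dotted);
        }
        return major * 100 + minor * 10;
    }

    public static void main(String[] args) {
        for (String v : new String[] {"1.6", "1.7", "1.8"}) {
            System.out.println(v + " -> " + toRhinoVersion(v));
        }
    }
}
```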
Code Examples
The following example demonstrates using the RunJavaScript operator. For more information about this example and to see an alternate implementation, see Processing Rows Using User-defined Scripts.
Using the RunJavaScript operator in Java
import com.pervasive.dataflow.graphs.EngineConfig;
import com.pervasive.dataflow.graphs.LogicalGraph;
import com.pervasive.dataflow.graphs.LogicalGraphFactory;
import com.pervasive.dataflow.io.FileClient;
import com.pervasive.dataflow.operators.io.textfile.ReadARFF;
import com.pervasive.dataflow.operators.scripting.RunJavaScript;
import com.pervasive.dataflow.operators.sink.LogRows;
import com.pervasive.dataflow.types.RecordTokenType;
import com.pervasive.dataflow.types.TokenTypeConstant;
import com.pervasive.dataflow.types.TypeUtil;

/**
 * Read the weather data and inject yesterday's outlook into each record.
 * The assumption is that each row of data is a single day's weather summary.
 */
public class YesterdaysWeather {
    public static void main(String[] args) {
        
        // Create an empty logical graph
        LogicalGraph graph = LogicalGraphFactory.newLogicalGraph("ScriptAccounts");
        
        // Create an ARFF reader for the weather data
        ReadARFF reader = graph.add(new ReadARFF("weather.arff"));
        
        // Get the meta-data of the weather data to discover the record type
        RecordTokenType inputType = reader.discoverMetadata(FileClient.basicClient()).getSchema().getTokenType();
        
        // Create the script output type. Use the input type merged with
        // a single new field: yesterdaysOutlook.
        RecordTokenType scriptOutputType = TypeUtil.merge(
                inputType, 
                TokenTypeConstant.record(TokenTypeConstant.STRING("yesterdaysOutlook")));
        
        // Use RunJavaScript to execute a JavaScript snippet that injects yesterday's outlook
        RunJavaScript runJavaScript = graph.add(new RunJavaScript());
        runJavaScript.setLanguageVersion(160);                // Use language version 1.6
        runJavaScript.setOutputType(scriptOutputType);
        runJavaScript.setBeforeFirstRecordScript("var previousOutlook = null;");
        runJavaScript.setOnEveryRecordScript("yesterdaysOutlook = previousOutlook; previousOutlook = outlook;");
        
        // Connect the reader and the script runner
        graph.connect(reader.getOutput(), runJavaScript.getInput());
        
        // Log the output of the script run
        LogRows logger = graph.add(new LogRows(1));
        graph.connect(runJavaScript.getOutput(), logger.getInput());
        
        // Set parallelism to 1 due to the data dependency (on the previous day's outlook)
        graph.compile(EngineConfig.engine().parallelism(1)).run();
    }
}
Properties
The RunJavaScript operator supports the following properties.
Name | Type | Description
afterLastRecordScript | String | The script to use for clean-up processing. This script is executed after every record within the input flow has been processed.
beforeFirstRecordScript | String | The script to use for initialization processing. This script is executed before the first record of the input data is processed.
enforceType | boolean | Whether an exception should be raised if a JavaScript value cannot be converted into the output field type. Defaults to false, in which case the field is set to null when a conversion cannot be performed.
languageVersion | int | The version of the JavaScript language to use, expressed as an integer (for example, 160 for version 1.6, as in the code example above).
onEveryRecordScript | String | The script to execute for every input record. Changes made to output variables by this script are pushed to the output of the operator.
outputType | RecordTokenType | The type of the output port of the operator. The output type must include all variables created by the script being executed.
variables | Map<String,Object> | The collection of variables to incorporate into the JavaScript environment before any of the given script source is executed. The map keys are used as variable names, and the corresponding map values become the variable values within the script environment.
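The enforceType behavior can be pictured with a hypothetical converter for a double-valued output field. The conversion rule here (only numeric values convert) is an assumption made for illustration; the operator's actual conversion rules are not documented in this section.

```java
public class FieldConversion {

    /**
     * Hypothetical model of enforceType for a double-valued output field.
     * Assumption: only numeric script values convert; anything else is a conversion failure.
     */
    static Double toDoubleField(Object scriptValue, boolean enforceType) {
        if (scriptValue == null) {
            return null;                                  // null stays null
        }
        if (scriptValue instanceof Number) {
            return ((Number) scriptValue).doubleValue();  // numeric values convert directly
        }
        if (enforceType) {
            // enforceType = true: a failed conversion raises an exception
            throw new IllegalArgumentException(
                    "Cannot convert " + scriptValue.getClass().getSimpleName() + " value to double");
        }
        return null;                                      // enforceType = false (default): field set to null
    }

    public static void main(String[] args) {
        System.out.println(toDoubleField(85, false));      // numeric value: converted
        System.out.println(toDoubleField("windy", false)); // non-numeric value: null
    }
}
```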
Ports
The RunJavaScript operator provides a single input port.
Name | Type | Get Method | Description
input | RecordPort | getInput() | Input data.
The RunJavaScript operator provides a single output port.
Name | Type | Get Method | Description
output | RecordPort | getOutput() | Results of the script execution.