Processing Rows Using User-defined Scripts
Using the RunScript Operator
The RunScript operator processes rows using user-defined scripts. Calls to third-party script engines are made through the javax.script APIs of JSR 223. The script engine must support the Compilable interface.
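To check which engines on a given classpath meet the Compilable requirement, the standard javax.script API can be queried directly. The following is a minimal sketch that does not involve the RunScript operator; the class name ListCompilableEngines and the method describeEngines are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import javax.script.Compilable;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineFactory;
import javax.script.ScriptEngineManager;

// Hypothetical helper, not part of the product API.
class ListCompilableEngines {

    /** Returns one descriptive line per JSR 223 engine visible on the classpath. */
    static List<String> describeEngines() {
        List<String> lines = new ArrayList<>();
        for (ScriptEngineFactory factory : new ScriptEngineManager().getEngineFactories()) {
            ScriptEngine engine = factory.getScriptEngine();
            // Only engines that implement Compilable can precompile scripts.
            boolean compilable = engine instanceof Compilable;
            lines.add(factory.getEngineName() + " " + factory.getEngineVersion()
                    + " (" + factory.getLanguageName() + " " + factory.getLanguageVersion() + ")"
                    + " names=" + factory.getNames()
                    + " compilable=" + compilable);
        }
        return lines;
    }

    public static void main(String[] args) {
        List<String> lines = describeEngines();
        if (lines.isEmpty()) {
            System.out.println("No JSR 223 script engines found on the classpath.");
        }
        for (String line : lines) {
            System.out.println(line);
        }
    }
}
```

The list may be empty on JDKs that do not bundle a script engine. The names reported for each engine are the values accepted by ScriptEngineManager.getEngineByName.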
Operation is as follows:
1. Bind variables into the script’s context.
2. Compile all scripts.
3. Run the script provided by the beforeFirstRecordScript property (if given).
4. For every row of input:
a. Move data from input ports to script context.
b. Run the script provided by the onEveryRecordScript property.
c. Move data from script context to output ports.
5. Run the script provided by the afterLastRecordScript property (if given).
Binding and retrieval are done using the names given to the input and output fields from their associated types.
Note that this operator supports running in a distributed context, whether it is locally partitioned or within a cluster. Each instance of the operator executes the operation sequence above. This implies that the before and after scripts only affect the scripting environment within which they are run. The environments are not shared across instances.
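The per-instance behavior can be modeled in plain Java. The sketch below uses neither the RunScript operator nor a script engine: OperatorInstance, its context map, and the three callbacks are hypothetical stand-ins for an operator instance, its scripting environment, and the three scripts. It only illustrates that state set up by a "before" step is private to the instance that ran it.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical model of the lifecycle; not the operator's implementation.
class LifecycleModel {

    /** Stand-in for one operator instance with its own private script context. */
    static final class OperatorInstance {
        private final Map<String, Object> context = new HashMap<>();

        Map<String, Object> run(List<String> rows,
                                Consumer<Map<String, Object>> beforeFirst,
                                Consumer<Map<String, Object>> onEvery,
                                Consumer<Map<String, Object>> afterLast) {
            beforeFirst.accept(context);      // run the "before first record" step
            for (String row : rows) {
                context.put("row", row);      // move input data into the context
                onEvery.accept(context);      // run the "on every record" step
                // moving data from the context to an output port is omitted here
            }
            afterLast.accept(context);        // run the "after last record" step
            return context;
        }
    }

    /** Runs the same three steps in a fresh instance and returns its row count. */
    static int countRows(List<String> partition) {
        Map<String, Object> ctx = new OperatorInstance().run(
                partition,
                c -> c.put("count", 0),
                c -> c.put("count", (Integer) c.get("count") + 1),
                c -> c.put("done", true));
        return (Integer) ctx.get("count");
    }

    public static void main(String[] args) {
        // Two instances, each with its own partition and its own counter.
        System.out.println(countRows(Arrays.asList("a", "b", "c"))); // prints 3
        System.out.println(countRows(Arrays.asList("d", "e")));      // prints 2
    }
}
```

Each instance reports only the rows it saw. No instance observes a total of five, because the counters live in separate contexts.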
Code Example
The example below reads an ARFF file that contains weather information. We will assume there is one row of data per day. The weather data includes the fields outlook, temperature, humidity, windy, and play. The goal is to inject the previous day’s outlook into each day’s record. The first day has no previous day, so its value should be null.
The beforeFirstRecordScript property is set to a code fragment that initializes a previousOutlook variable. This variable is then used in the onEveryRecordScript script to set the previous day’s outlook. Note that the parallelism of the application is set to 1. This is required since the usage of the RunScript operator in this case introduces a data dependency. Any sort of partitioning of the data would break that data dependency. Setting the parallelism to 1 enforces that no data partitioning will take place.
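The effect of partitioning on this data dependency can be seen in a small plain-Java sketch. It mirrors the logic of the two scripts without using the operator. The class and method names are hypothetical, and the way the rows are split into two partitions is an assumption made only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration; not the operator's implementation.
class PartitionDependency {

    /** Mirrors the two scripts: previousOutlook starts as null, then trails the input by one row. */
    static List<String> injectPrevious(List<String> outlooks) {
        List<String> yesterdays = new ArrayList<>();
        String previousOutlook = null;           // beforeFirstRecordScript
        for (String outlook : outlooks) {
            yesterdays.add(previousOutlook);     // yesterdaysOutlook = previousOutlook
            previousOutlook = outlook;           // previousOutlook = outlook
        }
        return yesterdays;
    }

    /** Processes two partitions independently, as two operator instances would. */
    static List<String> injectPartitioned(List<String> outlooks, int splitAt) {
        List<String> result = new ArrayList<>(injectPrevious(outlooks.subList(0, splitAt)));
        result.addAll(injectPrevious(outlooks.subList(splitAt, outlooks.size())));
        return result;
    }

    public static void main(String[] args) {
        List<String> days = Arrays.asList("sunny", "sunny", "overcast", "rainy");
        System.out.println(injectPrevious(days));       // [null, sunny, sunny, overcast]
        System.out.println(injectPartitioned(days, 2)); // [null, sunny, null, overcast]
    }
}
```

With two partitions, the third day loses its previous outlook because the second instance starts with its own null previousOutlook.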
JavaScript is used by setting the engine name to Rhino. The version of the engine is also set. The version can be important if a particular feature of the language or engine is needed.
Using the RunScript operator in Java
import com.pervasive.dataflow.graphs.EngineConfig;
import com.pervasive.dataflow.graphs.LogicalGraph;
import com.pervasive.dataflow.graphs.LogicalGraphFactory;
import com.pervasive.dataflow.io.FileClient;
import com.pervasive.dataflow.operators.io.textfile.ReadARFF;
import com.pervasive.dataflow.operators.scripting.RunScript;
import com.pervasive.dataflow.operators.sink.LogRows;
import com.pervasive.dataflow.types.RecordTokenType;
import com.pervasive.dataflow.types.TokenTypeConstant;
import com.pervasive.dataflow.types.TypeUtil;
/**
 * Read in the weather data to inject yesterday's weather into each record.
 * The assumption is that each row of data is a single day's weather summary.
 * Inject the previous day's outlook into the current row.
 */
public class YesterdaysWeather {
    public static void main(String[] args) {
        // Create an empty logical graph
        LogicalGraph graph = LogicalGraphFactory.newLogicalGraph("ScriptAccounts");

        // Create an ARFF reader for the weather data
        ReadARFF reader = graph.add(new ReadARFF("weather.arff"));

        // Get the metadata of the weather data to discover the record type
        RecordTokenType inputType = reader.discoverMetadata(FileClient.basicClient()).getSchema().getTokenType();

        // Create the script output type. Use the input type merged with
        // a single new field: yesterdaysOutlook.
        RecordTokenType scriptOutputType = TypeUtil.merge(
                inputType,
                TokenTypeConstant.record(TokenTypeConstant.STRING("yesterdaysOutlook")));

        // Use RunScript to execute a JavaScript snippet to inject yesterday's outlook
        RunScript runScript = graph.add(new RunScript());
        runScript.setEngineName("Rhino");     // Use Rhino JavaScript engine
        runScript.setLanguageVersion("1.6");  // Use language version 1.6
        runScript.setOutputType(scriptOutputType);
        runScript.setBeforeFirstRecordScript("var previousOutlook = null;");
        runScript.setOnEveryRecordScript("yesterdaysOutlook = previousOutlook; previousOutlook = outlook;");

        // Connect the reader and the script runner
        graph.connect(reader.getOutput(), runScript.getInput());

        // Log the output of the script run
        LogRows logger = graph.add(new LogRows(1));
        graph.connect(runScript.getOutput(), logger.getInput());

        // Set parallelism to 1 due to data dependency (on previous day's outlook)
        graph.compile(EngineConfig.engine().parallelism(1)).run();
    }
}
The output of the sample application is captured as follows. Note that the output contains a new field: yesterdaysOutlook. The output of the log rows operator shows that the value of yesterdaysOutlook is null for the first record as expected. Subsequent rows contain the previous row’s value as expected.
INFO: Executing phase 0 graph: {[AssignSplits, LogRows, ParseSplits, RunScript]}
INFO: Assigned 1 total splits across 1 nodes, with 1 non-local
INFO: Input type is {"type":"record","representation":"DENSE_BASE_NULL","fields":[{"outlook":{"type":"enum","values":["sunny","overcast","rainy"]}},{"temperature":{"type":"double"}},{"humidity":{"type":"double"}},{"windy":{"type":"enum","values":["TRUE","FALSE"]}},{"play":{"type":"enum","values":["yes","no"]}},{"yesterdaysOutlook":{"type":"string"}}]}
INFO: 0: row 1 is [sunny, 85.0, 85.0, FALSE, no, ]
INFO: 0: row 2 is [sunny, 80.0, 90.0, TRUE, no, sunny]
INFO: 0: row 3 is [overcast, 83.0, 86.0, FALSE, yes, sunny]
INFO: 0: row 4 is [rainy, 70.0, 96.0, FALSE, yes, overcast]
INFO: 0: row 5 is [rainy, 68.0, 80.0, FALSE, yes, rainy]
INFO: 0: row 6 is [rainy, 65.0, 70.0, TRUE, no, rainy]
INFO: 0: row 7 is [overcast, 64.0, 65.0, TRUE, yes, rainy]
INFO: 0: row 8 is [sunny, 72.0, 95.0, FALSE, no, overcast]
INFO: 0: row 9 is [sunny, 69.0, 70.0, FALSE, yes, sunny]
INFO: 0: row 10 is [rainy, 75.0, 80.0, FALSE, yes, sunny]
INFO: 0: row 11 is [sunny, 75.0, 70.0, TRUE, yes, rainy]
INFO: 0: row 12 is [overcast, 72.0, 90.0, TRUE, yes, sunny]
INFO: 0: row 13 is [overcast, 81.0, 75.0, FALSE, yes, overcast]
INFO: 0: row 14 is [rainy, 71.0, 91.0, TRUE, no, overcast]
INFO: Counted 14 rows
INFO: Phase 0 graph: {[AssignSplits, LogRows, ParseSplits, RunScript]} completed in 617 milliseconds
Using the RunJavaScript operator in RushScript
importClass(com.pervasive.datarush.types.TokenTypeConstant);
importClass(com.pervasive.datarush.types.TypeUtil);
importClass(com.pervasive.datarush.io.FileClient);
// Create an ARFF reader for the weather data
var reader = dr.readARFF({source:"weather.arff"});
var scriptOutputType = TokenTypeConstant.record(TokenTypeConstant.STRING("yesterdaysOutlook"));

// Use RunJavaScript to execute a JavaScript snippet to inject yesterday's outlook
var runJavaScript = dr.runJavaScript(reader, {
    languageVersion:160,
    outputType:scriptOutputType,
    beforeFirstRecordScript:'var previousOutlook = null;',
    onEveryRecordScript:'yesterdaysOutlook = previousOutlook; previousOutlook = outlook;'});

// Log the output of the script run
var logRows = dr.logRows(runJavaScript, {logFrequency:1});

// Set parallelism to 1 due to data dependency (on previous day's outlook)
dr.parallelism(1);
dr.execute();
Properties
The RunScript operator supports the following properties.
Ports
The RunScript operator provides a single input port.
The RunScript operator provides a single output port.
Using the RunJavaScript Operator
The RunJavaScript operator processes rows using user-defined scripts written in JavaScript. The Rhino JavaScript engine is used to compile and execute the scripts. The execution of the operator is as follows:
1. Compile all scripts.
2. Bind input fields into the script’s context. Input field names may have to be transformed to valid script variable names.
3. Run the beforeFirstRecordScript.
4. For every row of input:
a. Move data from input ports to script context.
b. Run the onEveryRecordScript.
c. Move data from script context to output ports.
5. Run the afterLastRecordScript.
Binding and retrieval are done using the names given to the input and output fields from their associated types. Native JavaScript types are supported where possible.
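The source does not state which rule the operator uses to turn input field names into valid script variable names. As a purely hypothetical illustration of what such a transformation could look like, the sketch below replaces characters that are not allowed in a simple JavaScript identifier with underscores and prefixes names that start with a digit. The class ScriptNames and the method toScriptName are invented for this example and are not part of the product API.

```java
// Hypothetical illustration only; the operator's actual naming rule may differ.
class ScriptNames {

    /** Maps an arbitrary field name to a simple ASCII JavaScript identifier. */
    static String toScriptName(String fieldName) {
        if (fieldName.isEmpty()) {
            return "_";
        }
        // Keep ASCII letters, digits, '_' and '$'; replace everything else.
        String name = fieldName.replaceAll("[^A-Za-z0-9_$]", "_");
        // An identifier cannot start with a digit.
        if (Character.isDigit(name.charAt(0))) {
            name = "_" + name;
        }
        return name;
    }

    public static void main(String[] args) {
        System.out.println(toScriptName("outlook"));     // outlook
        System.out.println(toScriptName("wind speed"));  // wind_speed
        System.out.println(toScriptName("2ndReading"));  // _2ndReading
    }
}
```

This sketch does not handle JavaScript reserved words or collisions between transformed names. When in doubt, check the variable names the operator actually binds.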
Note that this operator supports running in a distributed context, whether it is locally partitioned or within a cluster. Each instance of the operator executes the operation sequence above. This implies that the before and after scripts only affect the scripting environment within which they are run. The environments are not shared across operator instances.
By default, the highest level of optimization is used for compiling the scripts, which provides the most efficient script execution. Setting the optimization level to -1 turns on interpreted mode, which gives up the performance gains of compiled scripts. Normally there is no need to lower the optimization level. However, compiled scripts can sometimes type variables differently than interpreted scripts do. If this causes an issue, use interpreted mode by setting the optimization level to -1.
By default, the JavaScript language version is set to 1.8. As of this writing, this is the highest language version supported by the Rhino engine. Set the language version lower to disable newer language features as desired.
Code Examples
The following example demonstrates using the RunJavaScript operator. For more information about this example and to see an alternate implementation, see Processing Rows Using User-defined Scripts.
Using the RunJavaScript operator in Java
import com.pervasive.dataflow.graphs.EngineConfig;
import com.pervasive.dataflow.graphs.LogicalGraph;
import com.pervasive.dataflow.graphs.LogicalGraphFactory;
import com.pervasive.dataflow.io.FileClient;
import com.pervasive.dataflow.operators.io.textfile.ReadARFF;
import com.pervasive.dataflow.operators.scripting.RunJavaScript;
import com.pervasive.dataflow.operators.sink.LogRows;
import com.pervasive.dataflow.types.RecordTokenType;
import com.pervasive.dataflow.types.TokenTypeConstant;
import com.pervasive.dataflow.types.TypeUtil;
/**
 * Read in the weather data to inject yesterday's weather into each record.
 * The assumption is that each row of data is a single day's weather summary.
 * Inject the previous day's outlook into the current row.
 */
public class YesterdaysWeather {
    public static void main(String[] args) {
        // Create an empty logical graph
        LogicalGraph graph = LogicalGraphFactory.newLogicalGraph("ScriptAccounts");

        // Create an ARFF reader for the weather data
        ReadARFF reader = graph.add(new ReadARFF("weather.arff"));

        // Get the metadata of the weather data to discover the record type
        RecordTokenType inputType = reader.discoverMetadata(FileClient.basicClient()).getSchema().getTokenType();

        // Create the script output type. Use the input type merged with
        // a single new field: yesterdaysOutlook.
        RecordTokenType scriptOutputType = TypeUtil.merge(
                inputType,
                TokenTypeConstant.record(TokenTypeConstant.STRING("yesterdaysOutlook")));

        // Use RunJavaScript to execute a JavaScript snippet to inject yesterday's outlook
        RunJavaScript runJavaScript = graph.add(new RunJavaScript());
        runJavaScript.setLanguageVersion(160);  // Use language version 1.6
        runJavaScript.setOutputType(scriptOutputType);
        runJavaScript.setBeforeFirstRecordScript("var previousOutlook = null;");
        runJavaScript.setOnEveryRecordScript("yesterdaysOutlook = previousOutlook; previousOutlook = outlook;");

        // Connect the reader and the script runner
        graph.connect(reader.getOutput(), runJavaScript.getInput());

        // Log the output of the script run
        LogRows logger = graph.add(new LogRows(1));
        graph.connect(runJavaScript.getOutput(), logger.getInput());

        // Set parallelism to 1 due to data dependency (on previous day's outlook)
        graph.compile(EngineConfig.engine().parallelism(1)).run();
    }
}
Properties
The RunJavaScript operator supports the following properties.
Ports
The RunJavaScript operator provides a single input port.
The RunJavaScript operator provides a single output port.