Advanced Graph Execution Operators
When executing a graph in DataFlow, various parameters often must be adjusted to ensure the graph uses the correct source or sink. Usually this is accomplished with graph overrides provided before the graph is run. In some cases, however, this information is not available until runtime, typically because the data is created by the graph itself or is otherwise dynamically generated. In these situations it may be useful to use a SubJobExecutor.
For more information, refer to the following topics:
SubJobExecutor Operator
The SubJobExecutor operator can be used to execute JSON-serialized subgraphs within the current workflow. This allows you to dynamically run a subgraph within the currently executing graph with alternative parameters or configuration. Within the subgraph itself, two special operators can be used to dynamically configure the graph or to extract the data the subgraph produces: a MockableExternalRecordSource explicitly named 'Start Node' and a MockableExternalRecordSink explicitly named 'Stop Node'.
The input port should receive a record containing one or more string fields, where the first field always contains the path to the JSON-serialized graph to execute. Any remaining fields in the input are treated as override properties for the 'Start Node', with the field name used as the key and the field contents used as the value.
The override port can be used to override any applicable value within the graph and additionally supports several special overrides for altering JDBC connections. The first field should contain the keys and the second field should contain the associated values. These work the same as override values defined for a graph on the command line interface.
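As an illustration, each record arriving on the override port carries a key in its first field and the new value in its second. The key below targets the dropUnderivedFields property of the Derive Fields operator from the CharCount example later in this topic; its exact spelling is an assumption modeled on the command line override format, not taken from the product documentation:

```
key                                  value
Derive Fields.dropUnderivedFields    false
```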
Code Example
The example below executes a simple self-contained graph that performs a character count on the input properties. To generate a serialized graph, use the com.pervasive.datarush.json.JSON class on a LogicalGraph to obtain its serialized form, or use KNIME to export a .dr file that contains the JSON graph and, optionally, the engine configuration.
Using the SubJobExecutor operator in Java
import static com.pervasive.datarush.types.TokenTypeConstant.INT;
import static com.pervasive.datarush.types.TokenTypeConstant.record;

import com.pervasive.datarush.graphs.LogicalGraph;
import com.pervasive.datarush.graphs.LogicalGraphFactory;
import com.pervasive.datarush.operators.graph.SubJobExecutor;
import com.pervasive.datarush.operators.io.textfile.ReadDelimitedText;
import com.pervasive.datarush.operators.sink.LogRows;

public class SubJobCharCount {
    public static void main(String[] args) {
        // Create an empty logical graph
        LogicalGraph graph = LogicalGraphFactory.newLogicalGraph("SubJobCharCount");

        // Read the file that specifies the path of the serialized graph and any associated properties
        ReadDelimitedText reader = graph.add(new ReadDelimitedText("inputfilepath.txt"));

        // Use the SubJobExecutor to calculate the values and pass them back to the main graph
        SubJobExecutor executor = graph.add(new SubJobExecutor());
        executor.setOutputType(record(INT("charCount")));

        // Log the output of the subjob
        LogRows logger = graph.add(new LogRows());

        // Connect the operators and run the graph
        graph.connect(reader.getOutput(), executor.getInput());
        graph.connect(executor.getOutput(), logger.getInput());
        graph.run();
    }
}
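The inputfilepath.txt read by the graph above is not shown in the original example. A minimal sketch, assuming a comma-delimited file with a header row, might look like the following; the first field holds the path to the serialized graph, and the body field overrides the 'Start Node' property of the same name in CharCount.dr:

```
graphPath,body
CharCount.dr,Counting these characters
```

The header name graphPath is illustrative; only the position of the first field matters for the graph path, while the names of any remaining fields are used as the override keys.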
Example serialized CharCount.dr file for SubJobCharCount
{
  "settings" : {
    "name" : "CharCount",
    "relativePath" : "CharCount"
  },
  "operators" : {
    "Start Node" : {
      "operator" : {
        "@type" : "mockableExternalRecordSource",
        "properties" : {
          "body" : "\"Orange is the new Black\""
        }
      }
    },
    "Derive Fields" : {
      "operator" : {
        "@type" : "deriveFields",
        "derivedFields" : [ {
          "@type" : "field",
          "name" : "charCount",
          "function" : {
            "function" : {
              "description" : {
                "name" : "Strings.length",
                "bounds" : [ {
                  "type" : "string"
                } ],
                "typer" : {
                  "@class" : "com.pervasive.datarush.functions.FixedType",
                  "type" : {
                    "type" : "int"
                  }
                },
                "factory" : {
                  "@class" : "com.pervasive.datarush.functions.ReflectiveFactory",
                  "evaluator" : "com.pervasive.datarush.functions.evaluators.Length"
                }
              },
              "arguments" : [ [ "com.pervasive.datarush.functions.ScalarFunctionInstance", {
                "description" : {
                  "name" : "Conversions.toString",
                  "typer" : {
                    "@class" : "com.pervasive.datarush.functions.FixedType",
                    "type" : {
                      "type" : "string"
                    }
                  },
                  "factory" : {
                    "@class" : "com.pervasive.datarush.functions.ReflectiveFactory",
                    "evaluator" : "com.pervasive.datarush.functions.evaluators.ConvertToString"
                  }
                },
                "arguments" : [ [ "com.pervasive.datarush.functions.FieldReference", "body" ] ]
              } ] ]
            }
          }
        }, {
          "@type" : "field",
          "name" : "testInt",
          "function" : {
            "constant" : [ "int", 1 ]
          }
        }, {
          "@type" : "field",
          "name" : "testNumeric",
          "function" : {
            "constant" : [ "double", 3.141592653589793 ]
          }
        } ],
        "dropUnderivedFields" : true
      }
    },
    "Stop Node" : {
      "operator" : {
        "@type" : "mockableExternalRecordSink"
      }
    }
  },
  "connections" : [ {
    "from" : "CharCount.Start Node.outputs.output",
    "to" : "CharCount.Derive Fields.inputs.input"
  }, {
    "from" : "CharCount.Derive Fields.outputs.output",
    "to" : "CharCount.Stop Node.inputs.input"
  } ]
}
Properties
The SubJobExecutor operator supports the following properties.
Ports
The SubJobExecutor operator provides the following input ports.
The SubJobExecutor operator provides a single output port.
MockableExternalRecordSource Operator
The MockableExternalRecordSource operator can be used with the SubJobExecutor to insert parameter data. When used with a SubJobExecutor, this node should be explicitly named 'Start Node'.
Additionally, this operator can be used to generate key-value properties for use by connected operators. These can be provided directly on the input port or in a file that contains the property values; the two methods cannot be used together. The operator checks for input from the port and, if none is available, uses the properties provided either from a file or programmatically. The properties are emitted on the operator's output port for use by a SubJobExecutor or any operator that requires property values.
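A hedged sketch of such a property file follows, assuming a conventional key=value layout; the exact file format expected by the operator, and both property names below, are assumptions for illustration only. The body entry mirrors the override used in the CharCount example:

```
body="Orange is the new Black"
maxRows=100
```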
Properties
The MockableExternalRecordSource operator supports the following properties.
Ports
The MockableExternalRecordSource operator provides a single input port.
The MockableExternalRecordSource operator provides a single output port.
MockableExternalRecordSink Operator
The MockableExternalRecordSink operator can be used with the SubJobExecutor to pass data back to the launching graph. When used with a SubJobExecutor, this node should be explicitly named 'Stop Node', and there should only ever be a single Stop Node in the executed workflow. The operator passes any data it receives directly to its output and attempts to preserve the distribution and ordering of the data.
Ports
The MockableExternalRecordSink operator provides a single input port.
The MockableExternalRecordSink operator provides a single output port.
Last modified date: 03/10/2025