Capturing Data
When you compose and execute applications using DataFlow, the application is self-contained: the code that executes the application does not "touch" the data being processed.
At times, however, the program that executes an application needs to access the data within that application directly. For example, an application may be executed to aggregate a set of data, and the aggregation results are then used to determine the next steps in processing the data (that is, to compose the next application to run). Being able to access the data programmatically allows the controlling program to easily use the results of one application to determine how to compose the next.
Likewise, it is sometimes necessary to inject data into an application programmatically, for example when data has been gathered in memory and must then be pushed into a DataFlow application. In either case, several operators are available to help with capturing and emitting data for DataFlow applications.
The same holds for model data. A common use case involves executing a DataFlow application to build a predictive model. The code controlling the application execution accesses the produced model to determine if the model meets certain criteria. If the model does not meet the constraints, another model can be built using a different set of properties or combination of fields. In this process, being able to access the model after the application execution is crucial.
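The build, check, and rebuild loop described above can be sketched in plain Java. This is a minimal sketch under stated assumptions: the hypothetical buildModel function stands in for composing and running a DataFlow application and reading back its model, and the field sets and percentage scores are made up. No DataFlow API is used.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

/**
 * Hypothetical sketch of a controlling program that builds candidate models
 * until one meets an acceptance criterion. In a real application each call
 * to buildModel would run a DataFlow graph and capture the resulting model.
 */
final class ModelSearch {
    /** Tries each candidate configuration in order; returns the first acceptable model. */
    static <C, M> Optional<M> firstAcceptable(List<C> candidates,
                                              Function<C, M> buildModel,
                                              Predicate<M> meetsCriteria) {
        for (C config : candidates) {
            M model = buildModel.apply(config);   // one application execution per candidate
            if (meetsCriteria.test(model)) {
                return Optional.of(model);
            }
        }
        return Optional.empty();                  // no candidate met the constraints
    }

    public static void main(String[] args) {
        // Hypothetical candidates: field combinations to try, in order.
        List<List<String>> fieldSets = Arrays.asList(
                Arrays.asList("age"),
                Arrays.asList("age", "income"),
                Arrays.asList("age", "income", "region"));
        // Stand-in "model": a made-up quality score in percent.
        Optional<Integer> accepted = firstAcceptable(
                fieldSets,
                fields -> 50 + 10 * fields.size(),
                score -> score >= 70);
        System.out.println(accepted.map(s -> "accepted score " + s)
                .orElse("no model met the criteria"));
    }
}
```

Passing the build step and the acceptance test as functions keeps the search loop independent of how a model is actually produced.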
The following sections provide details on how to access record and model data directly. Refer to each section for specifics on how to use these operators.
Covered Operations
Using the CollectRecords Operator to Capture Records Data
The CollectRecords operator gathers input data into an in-memory token list. This operator should be used only in contexts where the data is known to be small because all the data will be loaded into memory.
After this operator is executed within a dataflow graph, the resulting data can be accessed using the getOutput() method. The result is a RecordTokenList, which supports iteration and so can be used to step through each resulting row. The result of invoking the getOutput() method before execution of the graph completes is undefined.
This operator is non-parallel.
WARNING! All input data to this operator is collected into memory. Only use this operator when the data it will consume is known to be of a reasonable size to fit in memory.
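To make the in-memory collection contract concrete, the following plain-Java class is a hypothetical analogue of a collecting sink, not the CollectRecords implementation. The class name InMemoryCollector and its methods are invented for illustration. Where the text says the result before completion is undefined, this sketch chooses to fail fast with an IllegalStateException.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical analogue of a collecting sink: rows are buffered in memory,
 * and the buffered list is only handed out once the producer signals
 * completion. Not the DataFlow implementation.
 */
final class InMemoryCollector<T> {
    private final List<T> buffer;
    private boolean complete = false;

    InMemoryCollector(int initialBufferSize) {
        // The initial size is only a capacity hint; the list grows as needed.
        this.buffer = new ArrayList<>(initialBufferSize);
    }

    /** Called once per input row while the "graph" is running. */
    void accept(T row) {
        if (complete) throw new IllegalStateException("input already closed");
        buffer.add(row);   // every row is held in memory
    }

    /** Called when the upstream producer has no more rows. */
    void close() {
        complete = true;
    }

    /** Results are only well defined after completion, so fail fast before that. */
    List<T> getOutput() {
        if (!complete) throw new IllegalStateException("graph has not completed");
        return Collections.unmodifiableList(buffer);
    }

    public static void main(String[] args) {
        InMemoryCollector<String> collector = new InMemoryCollector<>(50);
        for (String row : new String[] {"a", "b", "c"}) {
            collector.accept(row);
        }
        collector.close();
        System.out.println(collector.getOutput()); // [a, b, c]
    }
}
```

Because every row is retained, memory use grows linearly with the input, which is why the warning above restricts this pattern to small data.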
Code Example
The code example reads in movie rating data and aggregates it using the Group operator. The output from the Group operator is captured using the CollectRecords operator. After the graph execution has completed successfully, the captured output is accessed and the output records are iterated, printing the user identifier (group key) and the average rating for each user.
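For reference, the per-user statistics that the Group aggregation computes can be reproduced in plain Java on a small in-memory sample. The sample rows and the RatingStats class are hypothetical, and stddev is omitted because the text does not say whether Group uses the sample or the population definition. Unlike the DataFlow graph, whose logs show partial aggregation steps, this sketch is single-threaded.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Plain-Java sketch of per-user aggregation (count, min, max, and average
 * of rating per userID) over made-up in-memory rows.
 */
final class RatingStats {
    long count;
    int min = Integer.MAX_VALUE;
    int max = Integer.MIN_VALUE;
    long sum;

    /** Folds one rating into the running statistics for a group. */
    void add(int rating) {
        count++;
        sum += rating;
        min = Math.min(min, rating);
        max = Math.max(max, rating);
    }

    double avg() {
        return (double) sum / count;
    }

    /** Groups (userID, rating) pairs by userID, keeping keys in ascending order. */
    static Map<Integer, RatingStats> groupByUser(int[][] rows) {
        Map<Integer, RatingStats> groups = new TreeMap<>();
        for (int[] row : rows) {
            groups.computeIfAbsent(row[0], k -> new RatingStats()).add(row[1]);
        }
        return groups;
    }

    public static void main(String[] args) {
        int[][] rows = { {1, 5}, {1, 3}, {2, 4}, {1, 4}, {2, 2} };
        for (Map.Entry<Integer, RatingStats> e : groupByUser(rows).entrySet()) {
            RatingStats s = e.getValue();
            System.out.println("userID:" + e.getKey() + "; average rating:" + s.avg()
                    + " (count " + s.count + ", min " + s.min + ", max " + s.max + ")");
        }
    }
}
```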
Using the CollectRecords operator in Java
import com.pervasive.datarush.graphs.LogicalGraph;
import com.pervasive.datarush.graphs.LogicalGraphFactory;
import com.pervasive.datarush.operators.group.Group;
import com.pervasive.datarush.operators.io.textfile.ReadDelimitedText;
import com.pervasive.datarush.operators.sink.CollectRecords;
import com.pervasive.datarush.sequences.record.RecordTokenList;
import com.pervasive.datarush.tokens.TokenUtils;
import com.pervasive.datarush.tokens.record.RecordValued;
/**
 * Use the Group operator to aggregate values in the ratings data.
 */
public class AggregateRatingsLikeSQL {
    public static void main(String[] args) {
        // Create an empty logical graph
        LogicalGraph graph = LogicalGraphFactory.newLogicalGraph("GroupAndCollect");
        // Create a delimited text reader for the "ratings.txt" file
        ReadDelimitedText reader = graph.add(new ReadDelimitedText("data/ratings.txt"));
        reader.setFieldSeparator("::");
        reader.setHeader(true);
        // Create a group operator and set aggregations to perform.
        // Note the use of "as" to specify the output name of an aggregation.
        Group groupRatings = graph.add(new Group());
        groupRatings.setKeys(new String[] {"userID"});
        groupRatings.setAggregations(
            "count(rating) as ratingCount, " +
            "min(rating) as minRating, " +
            "max(rating) as maxRating, " +
            "avg(rating) as avgRating, " +
            "stddev(rating) as stddevRating");
        // Connect reader to group
        graph.connect(reader.getOutput(), groupRatings.getInput());
        // Create the record collector with an initial buffer size of 50
        CollectRecords collector = graph.add(new CollectRecords());
        collector.setInitialBufferSize(50);
        // Connect group to collector to collect the aggregation results
        graph.connect(groupRatings.getOutput(), collector.getInput());
        // Compile and run the graph
        graph.run();
        // Step through the aggregation results, printing one line per user
        RecordTokenList aggResults = collector.getOutput();
        for (int i = 0; i < aggResults.size(); i++) {
            RecordValued row = aggResults.getToken(i);
            System.out.println("userID:" + TokenUtils.asString(row.getField("userID"))
                + "; average rating:" + TokenUtils.asString(row.getField("avgRating")));
        }
    }
}
The output of the application is as follows. Note that the log messages from the application execution are followed by the output printed by the application.
INFO: Executing phase 0 graph: {[AssignSplits, CollectRecords, ParseSplits, ReducePartialAggregates, UnsortedPartialAggregator]}
INFO: Assigned 1 total splits across 8 nodes, with 1 non-local
INFO: Phase 0 graph: {[AssignSplits, CollectRecords, ParseSplits, ReducePartialAggregates, UnsortedPartialAggregator]} completed in 1159 milliseconds
userID:1; average rating:4.188679245283019
userID:2; average rating:3.7131782945736433
userID:3; average rating:3.9019607843137254
userID:4; average rating:4.190476190476191
userID:5; average rating:3.1464646464646466
userID:6; average rating:3.9014084507042255
userID:7; average rating:4.32258064516129
userID:8; average rating:3.884892086330935
userID:9; average rating:3.7358490566037736
userID:10; average rating:4.074626865671642
Using the CollectRecords operator in RushScript
var collector = dr.collectRecords(data, {initialBufferSize:50});
Properties
The CollectRecords operator provides one property.
Ports
The CollectRecords operator provides a single input port.
The CollectRecords operator provides a single output port.
Using the GetModel Operator to Capture Model Data
The GetModel operator updates an in-memory reference to a model object. This is typically used when a graph is embedded within a larger context where you need to obtain a handle to a model object.
This operator is always implicitly non-parallel; if the source is parallel, a merge must be performed, and thus a merge handler property must be specified.
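A merge step of the kind mentioned above can be sketched as a fold over per-partition partial models. This is a hypothetical illustration: the names PartialModelMerge and mergeAll are invented, the partial "models" are simple row counts, and the merge function is a plain java.util.function.BinaryOperator rather than DataFlow's merge handler type.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

/**
 * Hypothetical sketch: each parallel partition yields a partial model, and a
 * merge function folds the partials into the single model that a
 * non-parallel consumer receives.
 */
final class PartialModelMerge {
    /** Folds per-partition partial models into one, using the supplied merge function. */
    static <M> M mergeAll(List<M> partials, BinaryOperator<M> mergeHandler) {
        if (partials.isEmpty()) throw new IllegalArgumentException("no partitions");
        M merged = partials.get(0);
        for (int i = 1; i < partials.size(); i++) {
            merged = mergeHandler.apply(merged, partials.get(i));
        }
        return merged;
    }

    public static void main(String[] args) {
        // Row counts produced by four parallel partitions (made-up values).
        List<Long> partialCounts = Arrays.asList(120L, 95L, 130L, 110L);
        long total = mergeAll(partialCounts, Long::sum);
        System.out.println(total); // 455
    }
}
```

For the result to be independent of how rows were partitioned, the merge function should be associative, as summing counts is.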
Code Example
The example code fragment creates a GetModel operator and connects it to an upstream PMML model producer. After graph execution, the PMML model is accessed by invoking the getModel() method on the operator instance. Alternatively, a Reference to the model can be supplied and collected later, instead of holding on to a reference to the GetModel operator.
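The reference-based alternative can be sketched with java.util.concurrent.atomic.AtomicReference standing in for DataFlow's Reference type, whose API is not shown in this section. The runGraph method is a hypothetical stand-in for graph execution that publishes the finished model into the holder, so the caller never needs to keep the operator instance.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/**
 * Hypothetical sketch of the reference pattern: the caller hands over a
 * holder, and execution fills it in with the finished model.
 * AtomicReference stands in for DataFlow's own Reference type.
 */
final class ModelReferenceDemo {
    /** Stand-in for graph execution: computes a model and publishes it to the holder. */
    static void runGraph(Consumer<String> publishModel) {
        String model = "model-v1";   // hypothetical result of the modeling step
        publishModel.accept(model);
    }

    public static void main(String[] args) {
        AtomicReference<String> modelRef = new AtomicReference<>();
        runGraph(modelRef::set);     // the operator instance itself is not retained
        System.out.println(modelRef.get()); // model-v1
    }
}
```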
Using the GetModel operator in Java
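// Assumes graph is an existing LogicalGraph and modelPort is the output port
// of an upstream PMML model producer; both are defined elsewhere.
// Create the get model operator for a PMML model object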
GetModel<PMMLModel> modelOp = graph.add(new GetModel<PMMLModel>(PMMLPort.FACTORY));
// Connect the source of the model to the GetModel operator
graph.connect(modelPort, modelOp.getInput());
// Execute the graph
graph.run();
// Access the created model
PMMLModel model = modelOp.getModel();
Properties
The GetModel operator provides the following properties.
Ports
The GetModel operator provides a single input port.
The GetModel operator provides a single output port.
Using the EmitRecords Operator to Emit Record Data
The EmitRecords operator pushes the given set of records to its output record port, making the records available to downstream operators.
Use this operator when a small number of records that fit in memory must serve as a data source. If the data is held in a supported data source, use the conventional I/O operators instead.
The data must be collected into a RecordTokenList. The captured data is passed to the operator as a property. When the operator executes, it will emit each record to its output port. The output port takes on the schema of the given RecordTokenList.
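As a rough plain-Java analogy, the following hypothetical InMemoryRecords class shows the two ideas in this paragraph: the list carries its own schema (here just field names, an assumed simplification of a record token type) and rejects rows that do not match it, and emitting pushes each row, in order, to a downstream consumer. None of these names are DataFlow APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical sketch: an in-memory record list that carries its schema
 * and emits its rows, in insertion order, to a downstream consumer.
 */
final class InMemoryRecords {
    final List<String> schema;
    private final List<Object[]> rows = new ArrayList<>();

    InMemoryRecords(String... fieldNames) {
        this.schema = Collections.unmodifiableList(Arrays.asList(fieldNames));
    }

    /** Appends one row; its arity must match the schema. */
    void append(Object... values) {
        if (values.length != schema.size()) {
            throw new IllegalArgumentException("expected " + schema.size() + " fields");
        }
        rows.add(values.clone());
    }

    /** Pushes every buffered row downstream in insertion order. */
    void emit(Consumer<List<Object>> downstream) {
        for (Object[] row : rows) {
            downstream.accept(Arrays.asList(row));
        }
    }

    public static void main(String[] args) {
        InMemoryRecords data = new InMemoryRecords("forecast", "temperature");
        data.append("sunny", 78);
        data.append("cloudy", 67);
        System.out.println("schema: " + data.schema);
        data.emit(row -> System.out.println("row: " + row));
    }
}
```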
Code Example
The following example creates a record token list and then populates the list with data. An application is created using an EmitRecords operator to push the data for downstream consumption.
Using the EmitRecords operator in Java
import com.pervasive.datarush.graphs.EngineConfig;
import com.pervasive.datarush.graphs.LogicalGraph;
import com.pervasive.datarush.graphs.LogicalGraphFactory;
import com.pervasive.datarush.operators.sink.LogRows;
import com.pervasive.datarush.operators.source.EmitRecords;
import com.pervasive.datarush.sequences.record.RecordTokenList;
import com.pervasive.datarush.tokens.record.RecordToken;
import com.pervasive.datarush.tokens.scalar.IntToken;
import com.pervasive.datarush.tokens.scalar.StringToken;
import com.pervasive.datarush.types.RecordTokenType;
import com.pervasive.datarush.types.TokenTypeConstant;
/**
 * An example of building an in-memory data set and emitting the data set
 * within an application graph for other operators to consume.
 */
public class EmitRecordData {
    public static void main(String[] args) {
        // Define the record token type
        RecordTokenType dataType =
            TokenTypeConstant.record(
                TokenTypeConstant.STRING("forecast"),
                TokenTypeConstant.INT("temperature"));
        // Create the record token list
        RecordTokenList data = new RecordTokenList(dataType, 50);
        // Append data to the list
        data.append(new RecordToken(dataType, StringToken.parse("sunny"), IntToken.parse("78")));
        data.append(new RecordToken(dataType, StringToken.parse("cloudy"), IntToken.parse("67")));
        data.append(new RecordToken(dataType, StringToken.parse("rain"), IntToken.parse("64")));
        data.append(new RecordToken(dataType, StringToken.parse("sunny"), IntToken.parse("74")));
        // Create a new logical graph
        LogicalGraph graph = LogicalGraphFactory.newLogicalGraph("EmitData");
        // Create the emit records operator and set the data to emit
        EmitRecords emitRecs = graph.add(new EmitRecords());
        emitRecs.setInput(data);
        // Create a log rows instance to capture the data
        LogRows logger = graph.add(new LogRows(1));
        // Connect emit records to the logger
        graph.connect(emitRecs.getOutput(), logger.getInput());
        // Execute the graph with a parallelism of 1 (a single stream) to reduce the output from the logger
        graph.compile(EngineConfig.engine().parallelism(1)).run();
    }
}
The output from the sample application is captured as follows. Note that all four created records were emitted correctly and processed by the downstream LogRows operator.
INFO: Executing phase 0 graph: {[EmitRecords, LogRows]}
INFO: Input type is {"type":"record","representation":"DENSE_BASE_NULL","fields":[{"forecast":{"type":"string"}},{"temperature":{"type":"int"}}]}
INFO: 0: row 1 is [sunny, 78]
INFO: 0: row 2 is [cloudy, 67]
INFO: 0: row 3 is [rain, 64]
INFO: 0: row 4 is [sunny, 74]
INFO: Counted 4 rows
INFO: Phase 0 graph: {[EmitRecords, LogRows]} completed in 420 milliseconds
Using the EmitRecords operator in RushScript
var emitter = dr.emitRecords(data);
Properties
The EmitRecords operator provides one property.
Ports
The EmitRecords operator has no input port; the records to emit are supplied through its input property (set with setInput() in the Java example).
The EmitRecords operator provides a single output port.
Using the PutModel Operator to Emit Model Data
The PutModel operator provides a way to inject an in-memory reference to a model object into a graph. This is typically used when a graph is embedded within a larger context where you already have a reference to a model object. This operator is always implicitly non-parallel; if the target is parallel, a scatter is performed, sending a copy of the model to all partitions.
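The scatter behavior can be sketched as handing the same model to every partition of a parallel consumer. In this hypothetical sketch the partition count is a plain parameter and all partitions share one immutable instance; the text says DataFlow sends a copy to each partition, and sharing an immutable object is the in-process equivalent.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of scattering one in-memory model to every partition
 * of a parallel consumer, so each partition works with the same model.
 */
final class ModelScatter {
    /** Returns one entry per partition, each holding the same model. */
    static <M> List<M> scatter(M model, int partitions) {
        List<M> perPartition = new ArrayList<>(partitions);
        for (int p = 0; p < partitions; p++) {
            perPartition.add(model);   // every partition receives the model
        }
        return perPartition;
    }

    public static void main(String[] args) {
        Long model = 42L;                          // a simple stand-in model
        List<Long> copies = scatter(model, 4);     // e.g. a consumer running with 4 partitions
        System.out.println(copies);                // [42, 42, 42, 42]
    }
}
```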
Code Example
The following code fragment demonstrates creating a PutModel operator within an application. The model is handed to the operator, and during execution the operator pushes the model to its output port. Downstream operators then have access to the model instance.
Using the PutModel operator in Java
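// Assumes graph is an existing LogicalGraph and targetPort is a model input port
// of a downstream operator; both are defined elsewhere in the application.
// Create a simple model
Long model = Long.valueOf(42L);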
// Create the PutModel operator and set the model to emit
PutModel<Long> modelDist = graph.add(new PutModel<Long>(CountPort.FACTORY));
modelDist.setModel(model);
// Connect the PutModel output to a downstream consumer
graph.connect(modelDist.getOutput(), targetPort);
Properties
The PutModel operator provides the following properties.
Ports
The PutModel operator provides a single output port.
Using the LogRows Operator to Log Record Data
The LogRows operator logs information about the input data from a flow. The record type of the flow is logged. A logFrequency property determines how often data rows are logged; with a frequency of 1, every row is logged. A data row is logged by outputting the value of each input field within the row. A format can be provided that specifies the output format for each logged row.
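Frequency-based row logging can be sketched as follows. The sampling rule is an ASSUMPTION: the sketch logs rows whose 1-based position is a multiple of logFrequency, which agrees with the sample output for a frequency of 1 but may differ from the actual LogRows rule. The message text imitates the sample log lines, and messages are returned in a list rather than written to a logger.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical sketch of frequency-based row logging.
 * ASSUMPTION: frequency n logs rows whose 1-based position is a multiple of n.
 */
final class FrequencyLogger {
    static List<String> logRows(List<List<Object>> rows, int logFrequency) {
        if (logFrequency < 1) throw new IllegalArgumentException("logFrequency must be >= 1");
        List<String> messages = new ArrayList<>();
        long rowNumber = 0;
        for (List<Object> row : rows) {
            rowNumber++;
            if (rowNumber % logFrequency == 0) {
                messages.add("row " + rowNumber + " is " + row);
            }
        }
        messages.add("Counted " + rowNumber + " rows"); // every row is counted, logged or not
        return messages;
    }

    public static void main(String[] args) {
        List<List<Object>> rows = Arrays.asList(
                Arrays.<Object>asList("sunny", 78),
                Arrays.<Object>asList("cloudy", 67),
                Arrays.<Object>asList("rain", 64),
                Arrays.<Object>asList("sunny", 74));
        logRows(rows, 2).forEach(System.out::println);
    }
}
```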
Code Example
The code fragment shows how to construct a LogRows operator and connect its input port to a source port.
Using the LogRows operator in Java
// Assumes graph is an existing LogicalGraph and source is an upstream record output port.
// Create a log rows instance to capture the data
LogRows logger = graph.add(new LogRows(1));
// Connect source to the logger
graph.connect(source, logger.getInput());
Using the LogRows operator in RushScript
dr.logRows(data, {logFrequency:1});
Properties
The LogRows operator provides the following properties.
Ports
The LogRows operator provides a single input port.