Capturing Data
When you compose and execute an application using DataFlow, the application is self-contained: the code that executes the application does not "touch" the data being processed.
At times, the controlling code needs direct access to the data within an application. For example, an application may be executed to aggregate a set of data, and the aggregation results then determine the next steps in processing the data (that is, how to compose the next application to run). Programmatic access to the data lets the controlling program use the results of one application to decide how to compose the next.
Likewise, at times it is also required to inject data into an application programmatically. This could be the result of gathering the data in-memory and then wanting to push that data into a DataFlow application. Either way, several operators are available to help with capturing and emitting data for DataFlow applications.
The same holds for model data. A common use case involves executing a DataFlow application to build a predictive model. The code controlling the application execution accesses the produced model to determine if the model meets certain criteria. If the model does not meet the constraints, another model can be built using a different set of properties or combination of fields. In this process, being able to access the model after the application execution is crucial.
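The build, check, and rebuild cycle described above can be sketched in plain Java. This is only an illustration of the control flow, not DataFlow code: buildAndScore is a hypothetical stand-in for composing a graph, running it, and reading back the captured model, and the candidate settings, scoring formula, and threshold are made up for the sketch.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.IntToDoubleFunction;

public class ModelSelectionSketch {

    /**
     * Tries each candidate setting in order and returns the first one whose
     * score meets the threshold, or empty if none does.
     */
    static Optional<Integer> firstAcceptable(List<Integer> candidates,
                                             IntToDoubleFunction buildAndScore,
                                             double threshold) {
        for (int candidate : candidates) {
            // In a real controller this call would compose and run a graph,
            // then inspect the model captured from it (hypothetical here).
            double score = buildAndScore.applyAsDouble(candidate);
            if (score >= threshold) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Made-up scoring: pretend that larger settings give better models.
        IntToDoubleFunction fakeBuild = setting -> setting / 10.0;
        Optional<Integer> chosen =
                firstAcceptable(Arrays.asList(2, 5, 8, 9), fakeBuild, 0.75);
        System.out.println("chosen setting: "
                + chosen.map(String::valueOf).orElse("none"));
    }
}
```

With these made-up values the loop stops at the third candidate and prints "chosen setting: 8". The point is only that the controlling code must be able to read the model after each run in order to decide whether another run is needed.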
The following sections describe how to access record and model data directly. See each section for specifics on how to use these operators.
Covered Operations
Using the CollectRecords Operator to Capture Records Data
The CollectRecords operator gathers input data into an in-memory token list. This operator should be used only in contexts where the data is known to be small because all the data will be loaded into memory.
After this operator is executed within a dataflow graph, the resulting data can be accessed using the getOutput() method. The result is a RecordTokenList, which supports iteration and can be used to step through each resulting row. Invoking the getOutput() method before execution of the graph completes gives an undefined result.
This operator is non-parallel.
WARNING!  All input data to this operator is collected into memory. Only use this operator when the data it will consume is known to be of a reasonable size to fit in memory.
Code Example
The code example reads in movie rating data, aggregating the data using the Group operator. The output from the Group operator is captured using the CollectRecords operator. The captured output is accessed after the graph execution has completed successfully. The output records are iterated, dumping out the user identifier (group key) and the average rating.
Using the CollectRecords operator in Java
import com.pervasive.datarush.graphs.LogicalGraph;
import com.pervasive.datarush.graphs.LogicalGraphFactory;
import com.pervasive.datarush.operators.group.Group;
import com.pervasive.datarush.operators.io.textfile.ReadDelimitedText;
import com.pervasive.datarush.operators.sink.CollectRecords;
import com.pervasive.datarush.sequences.record.RecordTokenList;
import com.pervasive.datarush.tokens.TokenUtils;
import com.pervasive.datarush.tokens.record.RecordValued;

/**
 * Use the Group operator to aggregate values in the ratings data.
 *
 */
public class AggregateRatingsLikeSQL {

    public static void main(String[] args) {
        // Create an empty logical graph
        LogicalGraph graph = LogicalGraphFactory.newLogicalGraph("GroupAndCollect");

        // Create a delimited text reader for the "ratings.txt" file
        ReadDelimitedText reader = graph.add(new ReadDelimitedText("data/ratings.txt"));
        reader.setFieldSeparator("::");
        reader.setHeader(true);
        
        // Create a group operator and set aggregations to perform.
        // Note the use of "as" to specify the output name of an aggregation.
        Group groupRatings = graph.add(new Group());
        groupRatings.setKeys(new String[] {"userID"});
        groupRatings.setAggregations(
                "count(rating) as ratingCount, " +
                "min(rating) as minRating, " +
                "max(rating) as maxRating, " +
                "avg(rating) as avgRating, " +
                "stddev(rating) as stddevRating");
        
        // Connect reader to group
        graph.connect(reader.getOutput(), groupRatings.getInput());
        
        // Create the record collector
        CollectRecords collector = graph.add(new CollectRecords());
        collector.setInitialBufferSize(50);
        
        // Connect group to collector to collect the aggregation results
        graph.connect(groupRatings.getOutput(), collector.getInput());
        
        // Compile and run the graph
        graph.run();
        
        // Step through the aggregation results
        RecordTokenList aggResults = collector.getOutput();
        for (int i = 0; i < aggResults.size(); i++) {
            RecordValued row = aggResults.getToken(i);
            // Print the group key and its average rating on a single line
            System.out.println("userID:" + TokenUtils.asString(row.getField("userID"))
                    + "; average rating:" + TokenUtils.asString(row.getField("avgRating")));
        }
    }
}
The output of the application is captured as follows. Note the logs from the application execution are followed by the print output from the application execution.
INFO: Executing phase 0 graph: {[AssignSplits, CollectRecords, ParseSplits, ReducePartialAggregates, UnsortedPartialAggregator]}
INFO: Assigned 1 total splits across 8 nodes, with 1 non-local
INFO: Phase 0 graph: {[AssignSplits, CollectRecords, ParseSplits, ReducePartialAggregates, UnsortedPartialAggregator]} completed in 1159 milliseconds
userID:1; average rating:4.188679245283019
userID:2; average rating:3.7131782945736433
userID:3; average rating:3.9019607843137254
userID:4; average rating:4.190476190476191
userID:5; average rating:3.1464646464646466
userID:6; average rating:3.9014084507042255
userID:7; average rating:4.32258064516129
userID:8; average rating:3.884892086330935
userID:9; average rating:3.7358490566037736
userID:10; average rating:4.074626865671642
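For readers less familiar with grouped aggregation, the following plain Java sketch shows the kind of per-key summary that the captured rows contain. It is an analogy built on the standard streams API, not DataFlow code: the Rating class and the sample rows are made up, and the standard deviation is omitted because IntSummaryStatistics does not provide it.

```java
import java.util.Arrays;
import java.util.IntSummaryStatistics;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class GroupAnalogy {

    /** One rating row: the user who rated and the rating given. */
    static final class Rating {
        final int userID;
        final int rating;

        Rating(int userID, int rating) {
            this.userID = userID;
            this.rating = rating;
        }
    }

    /** Groups ratings by userID and summarizes each group (count, min, max, average). */
    static Map<Integer, IntSummaryStatistics> summarize(List<Rating> ratings) {
        return ratings.stream().collect(
                Collectors.groupingBy(r -> r.userID, TreeMap::new,
                        Collectors.summarizingInt(r -> r.rating)));
    }

    public static void main(String[] args) {
        // Made-up sample rows, not taken from ratings.txt
        List<Rating> ratings = Arrays.asList(
                new Rating(1, 5), new Rating(1, 3), new Rating(2, 4));
        summarize(ratings).forEach((userID, stats) ->
                System.out.println("userID:" + userID
                        + "; average rating:" + stats.getAverage()));
    }
}
```

For the three sample rows this prints one line per user, each with an average of 4.0. As with CollectRecords, the whole result is held in memory, which is reasonable here because there is only one row per group key.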
Using the CollectRecords operator in RushScript
var collector = dr.collectRecords(data, {initialBufferSize:50});
Properties
The CollectRecords operator provides one property.
| Name | Type | Description |
| --- | --- | --- |
| initialBufferSize | int | The initial size, in tokens, to use for the list collecting input data. The list will automatically grow to accommodate additional tokens, if the initial size is insufficient. |
Ports
The CollectRecords operator provides a single input port.
| Name | Type | Get Method | Description |
| --- | --- | --- | --- |
| input | | getInput() | Input data to be collected. |
The CollectRecords operator provides a single output port.
| Name | Type | Get Method | Description |
| --- | --- | --- | --- |
| output | | getOutput() | The RecordTokenList produced by the input data. |
Using the GetModel Operator to Capture Model Data
The GetModel operator updates an in-memory reference to a model object. This is typically used when a graph is embedded within a larger context where you need to obtain a handle to a model object.
This operator is always implicitly non-parallel; if the source is parallel, a merge must be performed, and thus a merge handler property must be specified.
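To make the idea of merging model fragments concrete, here is a minimal plain Java sketch. It does not use the DataFlow merge handler API: mergeAll and the partial counts are hypothetical, and the sketch assumes a simple count-style model for which merging two fragments means adding them.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

public class MergeSketch {

    /**
     * Folds the partial models produced by each partition into one model
     * using the supplied merge function.
     */
    static <T> T mergeAll(List<T> partials, BinaryOperator<T> merge) {
        if (partials.isEmpty()) {
            throw new IllegalArgumentException("at least one partial model is required");
        }
        T merged = partials.get(0);
        for (T partial : partials.subList(1, partials.size())) {
            merged = merge.apply(merged, partial);
        }
        return merged;
    }

    public static void main(String[] args) {
        // Made-up partial row counts from three partitions
        List<Long> partialCounts = Arrays.asList(10L, 25L, 7L);
        long total = mergeAll(partialCounts, Long::sum);
        System.out.println("merged count: " + total);
    }
}
```

The sketch prints "merged count: 42". The merge rule depends on the model type, which is presumably why a parallel source requires a merge handler to be supplied rather than inferred.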
Code Example
The example code fragment creates a GetModel operator and connects it with an upstream PMML model producer. After graph execution, the PMML model is accessed by invoking the getModel() method on the operator instance. An alternative is to use a Reference to the model that can be collected later. This can be used instead of holding on to a reference to the GetModel operator.
Using the GetModel operator in Java
// Create the get model operator for a PMML model object
GetModel<PMMLModel> modelOp = graph.add(new GetModel<PMMLModel>(PMMLPort.FACTORY));

// Connect the source of the model to the GetModel operator
graph.connect(modelPort, modelOp.getInput());

// Execute the graph
graph.run();

// Access the created model
PMMLModel model = modelOp.getModel();
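The reference-based alternative mentioned above follows a common holder pattern, sketched here in plain Java. DataFlow's own Reference type is not shown: java.util.concurrent.atomic.AtomicReference is used as a stand-in, and graphThatPublishesTo and the String model are hypothetical placeholders for a graph run and its model.

```java
import java.util.concurrent.atomic.AtomicReference;

public class ModelReferenceSketch {

    /**
     * Stand-in for a graph run: "builds" a model and stores it in the holder
     * supplied by the caller. The String model is only a placeholder.
     */
    static Runnable graphThatPublishesTo(AtomicReference<String> holder) {
        return () -> holder.set("model built during run");
    }

    public static void main(String[] args) {
        // The caller keeps only the holder, not the operator that fills it.
        AtomicReference<String> modelHolder = new AtomicReference<>();
        Runnable graph = graphThatPublishesTo(modelHolder);

        System.out.println("before run: " + modelHolder.get());
        graph.run();
        System.out.println("after run: " + modelHolder.get());
    }
}
```

The holder is empty (null) before the run and contains the model afterwards, so code that never sees the operator instance can still collect the result.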
Properties
The GetModel operator provides the following properties.
| Name | Type | Description |
| --- | --- | --- |
| mergeHandler | | The merge handler to be used to merge model fragments into a single model. This value must be specified if the upstream operator is parallel. |
| modelReference | | For advanced use only; specifies a reference location where the model is to be set. |
Ports
The GetModel operator provides a single input port.
| Name | Type | Get Method | Description |
| --- | --- | --- | --- |
| input | Model | getInput() | Input model port. |
The GetModel operator provides a single output port.
| Name | Type | Get Method | Description |
| --- | --- | --- | --- |
| output | T | getModel() | Outputs an in-memory model object. |
Using the EmitRecords Operator to Emit Record Data
The EmitRecords operator pushes the given set of records to its output record port. This makes the records available to downstream operators.
Use this operator when a small number of records that fit in memory need to be used as a data source. Use conventional I/O operators if the data can be contained in a supported data source.
The data must be collected into a RecordTokenList. The captured data is passed to the operator as a property. When the operator executes, it will emit each record to its output port. The output port takes on the schema of the given RecordTokenList.
Code Example
The following example creates a record token list and then populates the list with data. An application is created using an EmitRecords operator to push the data for downstream consumption.
Using the EmitRecords operator in Java
import com.pervasive.datarush.graphs.EngineConfig;
import com.pervasive.datarush.graphs.LogicalGraph;
import com.pervasive.datarush.graphs.LogicalGraphFactory;
import com.pervasive.datarush.operators.sink.LogRows;
import com.pervasive.datarush.operators.source.EmitRecords;
import com.pervasive.datarush.sequences.record.RecordTokenList;
import com.pervasive.datarush.tokens.record.RecordToken;
import com.pervasive.datarush.tokens.scalar.IntToken;
import com.pervasive.datarush.tokens.scalar.StringToken;
import com.pervasive.datarush.types.RecordTokenType;
import com.pervasive.datarush.types.TokenTypeConstant;

/**
 * An example of building an in-memory data set and emitting the data set
 * within an application graph for other operators to consume.
 */
public class EmitRecordData {
    public static void main(String[] args) {
        
        // Define the record token type
        RecordTokenType dataType = 
            TokenTypeConstant.record(
                    TokenTypeConstant.STRING("forecast"), 
                    TokenTypeConstant.INT("temperature"));
        
        // Create the record token list
        RecordTokenList data = new RecordTokenList(dataType, 50);
        
        // Append data to the list
        data.append(new RecordToken(dataType, StringToken.parse("sunny"), IntToken.parse("78")));
        data.append(new RecordToken(dataType, StringToken.parse("cloudy"), IntToken.parse("67")));
        data.append(new RecordToken(dataType, StringToken.parse("rain"), IntToken.parse("64")));
        data.append(new RecordToken(dataType, StringToken.parse("sunny"), IntToken.parse("74")));
        
        // Create a new logical graph
        LogicalGraph graph = LogicalGraphFactory.newLogicalGraph("EmitData");
        
        // Create the emit records operator and set the data to emit
        EmitRecords emitRecs = graph.add(new EmitRecords());
        emitRecs.setInput(data);
        
        // Create a log rows instance to capture the data
        LogRows logger = graph.add(new LogRows(1));
        
        // Connect emit records to the logger
        graph.connect(emitRecs.getOutput(), logger.getInput());
        
        // Execute the graph in a single stream to reduce output from the logger
        graph.compile(EngineConfig.engine().parallelism(1)).run();
    }
}
The output from the sample application is captured as follows. Note that all four created records were emitted correctly and processed by the downstream LogRows operator.
INFO: Executing phase 0 graph: {[EmitRecords, LogRows]}
INFO: Input type is {"type":"record","representation":"DENSE_BASE_NULL","fields":[{"forecast":{"type":"string"}},{"temperature":{"type":"int"}}]}
INFO: 0: row 1 is [sunny, 78]
INFO: 0: row 2 is [cloudy, 67]
INFO: 0: row 3 is [rain, 64]
INFO: 0: row 4 is [sunny, 74]
INFO: Counted 4 rows
INFO: Phase 0 graph: {[EmitRecords, LogRows]} completed in 420 milliseconds
Using the EmitRecords operator in RushScript
var emitter = dr.emitRecords(data);
Properties
The EmitRecords operator provides one property.
| Name | Type | Description |
| --- | --- | --- |
| input | | The list of records that will be produced on the output of this operator. |
Ports
The EmitRecords operator provides a single input port.
| Name | Type | Get Method | Description |
| --- | --- | --- | --- |
| input | | getInput() | The list of records that will be produced. |
The EmitRecords operator provides a single output port.
| Name | Type | Get Method | Description |
| --- | --- | --- | --- |
| output | | getOutput() | Contains the output data. |
Using the PutModel Operator to Emit Model Data
The PutModel operator provides a way to inject an in-memory reference to a model object into a graph. This is typically used when a graph is embedded within a larger context where we already have a reference to a model object. This operator is always implicitly non-parallel; if the target is parallel, a scatter will be performed, sending a copy of the model to all partitions.
Code Example
The following code fragment demonstrates creating a PutModel operator within an application. The model is handed to the operator. During execution, the operator will push the model to its output port. Downstream operators will then have access to the model instance.
Using the PutModel operator in Java
// Create a simple model
Long model = Long.valueOf(42L);

// Create the PutModel operator and set the model to emit
PutModel<Long> modelDist = graph.add(new PutModel<Long>(CountPort.FACTORY));
modelDist.setModel(model);

// Connect the PutModel output to a downstream consumer
graph.connect(modelDist.getOutput(), targetPort);
Properties
The PutModel operator provides the following properties.
| Name | Type | Description |
| --- | --- | --- |
| model | T | The model instance to emit. |
| modelReference | | For advanced use only; specifies a reference location where the model is to be obtained. |
Ports
The PutModel operator provides a single output port.
| Name | Type | Get Method | Description |
| --- | --- | --- | --- |
| output | | getOutput() | Output port that will transmit the model during graph execution. |
Using the LogRows Operator to Log Record Data
The LogRows operator logs information about the input data from a flow. The record type of the flow is logged. A logFrequency property is used to determine which data rows are logged. A data row is logged by outputting the value of each input field within the row. A format can be provided that specifies the output format for each row log.
Code Example
The code fragment shows how to construct a LogRows operator and connect its input port to a source port.
Using the LogRows operator in Java
// Create a log rows instance to capture the data
LogRows logger = graph.add(new LogRows(1));

// Connect source to the logger
graph.connect(source, logger.getInput());
Using the LogRows operator in RushScript
dr.logRows(data, {logFrequency:1});
Properties
The LogRows operator provides the following properties.
| Name | Type | Description |
| --- | --- | --- |
| format | String | The format to use when logging rows. Reference java.text.MessageFormat for more information about the syntax of formats. Two variables are passed to the format: the row count and the row contents. Default: "row {0} is {1}". |
| logFrequency | int | The frequency of rows to log. Setting the frequency to 1 will log every row. A value of 2 logs every other row and so on. Default: 0 (turns off row logging). |
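The interaction of the two properties can be tried out with java.text.MessageFormat directly. The sketch below is not the LogRows implementation: formatRows is a hypothetical helper, and it assumes that a row is logged when its 1-based count is a multiple of the frequency, which the description above does not state explicitly.

```java
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

public class LogFormatSketch {

    /**
     * Formats every logFrequency-th row with the given MessageFormat pattern.
     * {0} receives the 1-based row count and {1} the row contents.
     * A frequency of 0 (or less) produces no lines.
     */
    static List<String> formatRows(List<String> rows, String pattern, int logFrequency) {
        List<String> lines = new ArrayList<>();
        if (logFrequency <= 0) {
            return lines;
        }
        MessageFormat format = new MessageFormat(pattern, Locale.ROOT);
        for (int i = 0; i < rows.size(); i++) {
            int rowCount = i + 1;
            // Assumption: log a row when its count is a multiple of the frequency.
            if (rowCount % logFrequency == 0) {
                lines.add(format.format(new Object[] {rowCount, rows.get(i)}));
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList(
                "[sunny, 78]", "[cloudy, 67]", "[rain, 64]", "[sunny, 74]");
        formatRows(rows, "row {0} is {1}", 1).forEach(System.out::println);
    }
}
```

With the default pattern and a frequency of 1, the first line printed is "row 1 is [sunny, 78]", the same text that follows the log prefix in the EmitRecords output shown earlier. Because {0} is formatted as a number, large row counts may be rendered with grouping separators depending on the locale.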
Ports
The LogRows operator provides a single input port.
| Name | Type | Get Method | Description |
| --- | --- | --- | --- |
| input | | getInput() | Data to be logged. |
Last modified date: 06/14/2024