Example of Using External Port
The following code sample shows how to use an external source and an external sink. In an application, a single thread should not control all of the external ports; use a separate thread for each external port.
External port example
import com.pervasive.dataflow.ports.record.ExternalRecordSource;
import com.pervasive.dataflow.ports.record.ExternalRecordSink;
...
// Create the graph
LogicalGraph graph = LogicalGraphFactory.newLogicalGraph();
// The type of the external source
RecordTokenType type = record(TokenTypeConstant.DOUBLE("field1"),
        TokenTypeConstant.DOUBLE("field2"));
// Create the external source, adding to the graph
ExternalRecordSource source = graph.add(new ExternalRecordSource(type));
// Add some operators to the graph
op = graph.add(...);
// Add the external sink to the graph
ExternalRecordSink sink = graph.add(new ExternalRecordSink());
// Connect source to another operator in the graph
graph.connect(source.getOutput(), op.getInput());
// Connect the sink to another operator in the graph
graph.connect(op.getOutput(), sink.getInput());
// Compile the graph
LogicalGraphInstance instance = graph.compile();
// Always call start() to run the graph in the background; don't use run(), because run() will deadlock!
instance.start();
// Produce some data in this thread
RecordOutput rout = source.getInput();
while (...) {
    ...
    rout.push();
}
// Push end of data on external source
rout.pushEndOfData();
// Process the sink results in this thread
RecordInput rec = sink.getOutput();
while (rec.stepNext()) {
    ...
}
// After processing all results or calling pushEndOfData,
// join on the graph.
// Don't join before then, since it will deadlock!
instance.join();
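The threading advice at the start of this section (one thread per external port, start the graph in the background, join only at the end) can be illustrated without the DataFlow library. The following is a minimal sketch using only java.util.concurrent: two small bounded queues stand in for the external source and sink, a worker thread stands in for the started graph, and an END_OF_DATA sentinel stands in for pushEndOfData(). The names ExternalPortThreads, runPipeline, and END_OF_DATA are hypothetical and are not part of the DataFlow API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-in for the example above; no DataFlow classes are used.
class ExternalPortThreads {
    // Sentinel playing the role of pushEndOfData(); compared by identity.
    private static final double[] END_OF_DATA = new double[0];

    static List<Double> runPipeline(int rowCount) {
        // Small bounded queues stand in for the external source and sink ports.
        BlockingQueue<double[]> sourcePort = new ArrayBlockingQueue<>(4);
        BlockingQueue<double[]> sinkPort = new ArrayBlockingQueue<>(4);
        List<Double> results = new ArrayList<>();

        // Stand-in for the started graph: adds field1 and field2 of each row.
        Thread graph = new Thread(() -> {
            try {
                for (double[] row = sourcePort.take(); row != END_OF_DATA; row = sourcePort.take()) {
                    sinkPort.put(new double[] {row[0] + row[1]});
                }
                sinkPort.put(END_OF_DATA);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Dedicated thread for the source port: push rows, then end of data.
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < rowCount; i++) {
                    sourcePort.put(new double[] {i, 2.0 * i});
                }
                sourcePort.put(END_OF_DATA);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Dedicated thread for the sink port: drain until end of data.
        Thread consumer = new Thread(() -> {
            try {
                for (double[] row = sinkPort.take(); row != END_OF_DATA; row = sinkPort.take()) {
                    results.add(row[0]);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        graph.start();
        producer.start();
        consumer.start();
        try {
            // Join only after both port threads have finished their work.
            producer.join();
            consumer.join();
            graph.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(runPipeline(100).size());
    }
}
```

Because each queue holds only four rows, a single thread that pushed all 100 rows before reading any results would block on the fifth put once the buffers filled. Giving each port its own thread avoids that stall in this sketch; how much buffering real external ports provide is not stated here, so treat the queue capacity as an assumption.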